Skip to content

Commit

Permalink
Merge pull request #306 from bytewax/release-0.17.2
Browse files Browse the repository at this point in the history
Prepare for 0.17.2 release.
  • Loading branch information
whoahbot committed Oct 5, 2023
2 parents a16c8f9 + 96b15aa commit 10486e6
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 16 deletions.
1 change: 0 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ jobs:
upload:
name: Store wheels in S3
runs-on: ubuntu-20.04
if: "github.ref == 'refs/heads/main'"
needs: [ linux, macos, windows ]
steps:
- uses: actions/download-artifact@v3
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ All notable changes to this project will be documented in this file. For help wi
__Add any extra change notes here and we'll put them in the release
notes on GitHub when we make a new release.__

## v0.17.2

- Fixes error message creation, and updates error messages when
creating recovery partitions.

## v0.17.1

- Adds the `batch` operator to Dataflows. Calling `Dataflow.batch`
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bytewax"
version = "0.17.1"
version = "0.17.2"
edition = "2021"

[lib]
Expand Down
14 changes: 9 additions & 5 deletions apidocs/html/bytewax/connectors/kafka.html
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ <h1 class="api__article-title">Module <strong>bytewax.connectors.kafka</strong><
cluster_metadata = client.list_topics(topic)
topic_metadata = cluster_metadata.topics[topic]
if topic_metadata.error is not None:
msg = f&#34;error listing partitions for Kafka topic `{topic!r}`: &#34;
f&#34;{topic_metadata.error.str()}&#34;
msg = (
f&#34;error listing partitions for Kafka topic `{topic!r}`: &#34;
f&#34;{topic_metadata.error.str()}&#34;
)
raise RuntimeError(msg)
part_idxs = topic_metadata.partitions.keys()
for i in part_idxs:
Expand Down Expand Up @@ -90,9 +92,11 @@ <h1 class="api__article-title">Module <strong>bytewax.connectors.kafka</strong><
break
else:
# Discard all the messages in this batch too
msg = f&#34;error consuming from Kafka topic `{self._topic!r}`: &#34;
f&#34;{msg.error()}&#34;
raise RuntimeError(msg)
err_msg = (
f&#34;error consuming from Kafka topic `{self._topic!r}`: &#34;
f&#34;{msg.error()}&#34;
)
raise RuntimeError(err_msg)
batch.append((msg.key(), msg.value()))
last_offset = msg.offset()

Expand Down
264 changes: 264 additions & 0 deletions apidocs/html/bytewax/connectors/periodic.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
<main class="api__content">
<article class="api__article" id="content">
<header class="api__article-header">
<h1 class="api__article-title">Module <strong>bytewax.connectors.periodic</strong></h1>
</header>
<section class="api__article-intro" id="section-intro">
<p>Periodic interval base class to build custom input sources.</p>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">&#34;&#34;&#34;Periodic interval base class to build custom input sources.&#34;&#34;&#34;
from abc import abstractmethod
from datetime import datetime, timedelta, timezone
from math import ceil
from typing import Any, Optional

from bytewax.inputs import PartitionedInput, StatefulSource

__all__ = [
&#34;SimplePollingInput&#34;,
]


class _SimplePollingSource(StatefulSource):
def __init__(self, interval: timedelta, align_to: Optional[datetime], getter):
now = datetime.now(timezone.utc)
if align_to is not None:
assert now &gt; align_to, &#34;align_to must be in the past&#34;
self._align_to = align_to
# Next awake is the next datetime that is an integer
# number of intervals from the align_to date.
self._next_awake = align_to + (interval * ceil((now - align_to) / interval))
else:
self._align_to = now
self._next_awake = now

self._interval = interval
self._getter = getter

def next_batch(self):
self._next_awake += self._interval
return [self._getter()]

def next_awake(self):
return self._next_awake

def snapshot(self):
return None


class SimplePollingInput(PartitionedInput):
&#34;&#34;&#34;Calls a user defined function at a regular interval.

Subclass this input source and write the `next_item` function
that will be called at the defined interval.

Example:
&gt;&gt;&gt; class URLInput(SimplePollingInput):
... def next_item(self):
... return requests.get(&#34;https://example.com&#34;)
...

Notes:
- The `interval` parameter is capped at a minimum of 10ms, but it can be as
large as needed.
- If you need fast input polling, consider writing a custom input source,
where you can also batch items to avoid doing too much IO.
&#34;&#34;&#34;

def __init__(self, interval: timedelta, align_to: Optional[datetime] = None):
&#34;&#34;&#34;Create a PeriodicInput.

Args:
interval: The interval between awakes. Must be &gt;= 10ms.
align_to: Align awake times to the given datetime.
&#34;&#34;&#34;
if interval &lt; timedelta(milliseconds=10):
msg = &#34;The interval for SimplePollingInput must be &gt;= 10ms&#34;
raise ValueError(msg)
self._interval = interval
self._align_to = align_to

def list_parts(self):
&#34;&#34;&#34;Only emit a single tick.&#34;&#34;&#34;
return [&#34;singleton&#34;]

def build_part(self, for_part, _resume_state): # noqa: D102
assert for_part == &#34;singleton&#34;
# Ignore resume state
return _SimplePollingSource(self._interval, None, self.next_item)

@abstractmethod
def next_item(self) -&gt; Any:
&#34;&#34;&#34;This function will be called at regular inerval.&#34;&#34;&#34;
...</code></pre>
</details>
</section>
<section>
</section>
<section>
</section>
<section>
</section>
<section>
<h2 class="api__article-subtitle" id="header-classes">Classes</h2>
<dl>
<dt id="bytewax.connectors.periodic.SimplePollingInput"><code class="language-python flex name class">
<span>class <span class="ident">SimplePollingInput</span></span>
<span>(</span><span>interval: datetime.timedelta, align_to: Optional[datetime.datetime] = None)</span>
</code></dt>
<dd>
<div class="desc"><p>Calls a user defined function at a regular interval.</p>
<p>Subclass this input source and write the <code>next_item</code> function
that will be called at the defined interval.</p>
<p>Example:</p>
<pre><code class="language-python-repl">&gt;&gt;&gt; class URLInput(SimplePollingInput):
... def next_item(self):
... return requests.get(&quot;https://example.com&quot;)
...
</code></pre>
<p>Notes:
- The <code>interval</code> parameter is capped at a minimum of 10ms, but it can be as
large as needed.
- If you need fast input polling, consider writing a custom input source,
where you can also batch items to avoid doing too much IO.</p>
<p>Create a PeriodicInput.</p>
<h2 id="args">Args</h2>
<dl>
<dt><strong><code>interval</code></strong></dt>
<dd>The interval between awakes. Must be &gt;= 10ms.</dd>
<dt><strong><code>align_to</code></strong></dt>
<dd>Align awake times to the given datetime.</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">class SimplePollingInput(PartitionedInput):
&#34;&#34;&#34;Calls a user defined function at a regular interval.

Subclass this input source and write the `next_item` function
that will be called at the defined interval.

Example:
&gt;&gt;&gt; class URLInput(SimplePollingInput):
... def next_item(self):
... return requests.get(&#34;https://example.com&#34;)
...

Notes:
- The `interval` parameter is capped at a minimum of 10ms, but it can be as
large as needed.
- If you need fast input polling, consider writing a custom input source,
where you can also batch items to avoid doing too much IO.
&#34;&#34;&#34;

def __init__(self, interval: timedelta, align_to: Optional[datetime] = None):
&#34;&#34;&#34;Create a PeriodicInput.

Args:
interval: The interval between awakes. Must be &gt;= 10ms.
align_to: Align awake times to the given datetime.
&#34;&#34;&#34;
if interval &lt; timedelta(milliseconds=10):
msg = &#34;The interval for SimplePollingInput must be &gt;= 10ms&#34;
raise ValueError(msg)
self._interval = interval
self._align_to = align_to

def list_parts(self):
&#34;&#34;&#34;Only emit a single tick.&#34;&#34;&#34;
return [&#34;singleton&#34;]

def build_part(self, for_part, _resume_state): # noqa: D102
assert for_part == &#34;singleton&#34;
# Ignore resume state
return _SimplePollingSource(self._interval, None, self.next_item)

@abstractmethod
def next_item(self) -&gt; Any:
&#34;&#34;&#34;This function will be called at regular inerval.&#34;&#34;&#34;
...</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="bytewax.inputs.PartitionedInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartitionedInput">PartitionedInput</a></li>
<li><a title="bytewax.inputs.Input" href="/apidocs/bytewax.inputs#bytewax.inputs.Input">Input</a></li>
<li>abc.ABC</li>
</ul>
<h3>Methods</h3>
<dl>
<dt id="bytewax.connectors.periodic.SimplePollingInput.list_parts"><code class="language-python name flex">
<span>def <span class="ident">list_parts</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Only emit a single tick.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">def list_parts(self):
&#34;&#34;&#34;Only emit a single tick.&#34;&#34;&#34;
return [&#34;singleton&#34;]</code></pre>
</details>
</dd>
<dt id="bytewax.connectors.periodic.SimplePollingInput.next_item"><code class="language-python name flex">
<span>def <span class="ident">next_item</span></span>(<span>self) ‑> Any</span>
</code></dt>
<dd>
<div class="desc"><p>This function will be called at regular inerval.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">@abstractmethod
def next_item(self) -&gt; Any:
&#34;&#34;&#34;This function will be called at regular inerval.&#34;&#34;&#34;
...</code></pre>
</details>
</dd>
</dl>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code class="language-python"><b><a title="bytewax.inputs.PartitionedInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartitionedInput">PartitionedInput</a></b></code>:
<ul class="hlist">
<li><code class="language-python"><a title="bytewax.inputs.PartitionedInput.build_part" href="/apidocs/bytewax.inputs#bytewax.inputs.PartitionedInput.build_part">build_part</a></code></li>
</ul>
</li>
</ul>
</dd>
</dl>
</section>
<footer class="api__footer" id="footer">
<p class="api__footer-copyright">
Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.0</a>.
</p>
</footer>
</article>
<nav class="api__sidebar" id="sidebar">
<ul class="api__sidebar-nav" id="index">
<li class="api__sidebar-nav-item">
<h3 class="api__sidebar-nav-title">Super-module</h3>
<ul class="api__sidebar-nav-menu">
<li class="api__sidebar-nav-menu-item">
<a title="bytewax.connectors" href="/apidocs/bytewax.connectors/index">bytewax.connectors</a>
</li>
</ul>
</li>
<li class="api__sidebar-nav-item">
<h3 class="api__sidebar-nav-title"><a href="#header-classes">Classes</a></h3>
<ul class="api__sidebar-nav-classes">
<li class="api__sidebar-nav-classes-item">
<h4 class="api__sidebar-nav-classes-title"><a title="bytewax.connectors.periodic.SimplePollingInput" href="/apidocs/bytewax.connectors/periodic#bytewax.connectors.periodic.SimplePollingInput">SimplePollingInput</a></h4>
<ul class="api__sidebar-nav-menu">
<li class="api__sidebar-nav-menu-item"><a title="bytewax.connectors.periodic.SimplePollingInput.list_parts" href="/apidocs/bytewax.connectors/periodic#bytewax.connectors.periodic.SimplePollingInput.list_parts">list_parts</a></li>
<li class="api__sidebar-nav-menu-item"><a title="bytewax.connectors.periodic.SimplePollingInput.next_item" href="/apidocs/bytewax.connectors/periodic#bytewax.connectors.periodic.SimplePollingInput.next_item">next_item</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</main>
13 changes: 11 additions & 2 deletions apidocs/html/bytewax/recovery.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ <h2 id="caveats">Caveats</h2>
processing.</p>
<h2 id="setup">Setup</h2>
<p>Recovery partitions must be pre-initialized before running the
dataflow initially. This is done by executing this module:</p>
dataflow initially. First, create the directory to hold the partitions.</p>
<pre><code>$ mkdir db_dir
</code></pre>
<p>Then create the partitions by executing this module:</p>
<pre><code>$ python -m bytewax.recovery db_dir/ 4
</code></pre>
<p>The second parameter (e.g. <code>4</code>) is the number of recovery partitions
Expand Down Expand Up @@ -191,7 +194,13 @@ <h2 id="backup-and-disaster-recovery">Backup And Disaster Recovery</h2>
-----

Recovery partitions must be pre-initialized before running the
dataflow initially. This is done by executing this module:
dataflow initially. First, create the directory to hold the partitions.

```
$ mkdir db_dir
```

Then create the partitions by executing this module:

```
$ python -m bytewax.recovery db_dir/ 4
Expand Down
18 changes: 12 additions & 6 deletions apidocs/html/bytewax/run.html
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ <h2 id="recovery">Recovery</h2>
# Reraise the ImportError if it occurred within the imported module.
# Determine this by checking whether the trace has a depth &gt; 1.
if sys.exc_info()[2].tb_next:
msg = f&#34;While importing {module_name!r}, an ImportError was raised:\n\n&#34;
f&#34;{traceback.format_exc()}&#34;
msg = (
f&#34;While importing {module_name!r}, an ImportError was raised:\n\n&#34;
f&#34;{traceback.format_exc()}&#34;
)
raise ImportError(msg) from None
else:
msg = f&#34;Could not import {module_name!r}.&#34;
Expand Down Expand Up @@ -222,17 +224,21 @@ <h2 id="recovery">Recovery</h2>
if not _called_with_wrong_args(attr):
raise

msg = f&#34;The factory {dataflow_name!r} in module {module.__name__!r} &#34;
&#34;could not be called with the specified arguments&#34;
msg = (
f&#34;The factory {dataflow_name!r} in module {module.__name__!r} &#34;
&#34;could not be called with the specified arguments&#34;
)
raise TypeError(msg) from e
else:
dataflow = attr

if isinstance(dataflow, Dataflow):
return dataflow

msg = &#34;A valid Bytewax dataflow was not obtained from &#34;
f&#34;&#39;{module.__name__}:{dataflow_name}&#39;&#34;
msg = (
&#34;A valid Bytewax dataflow was not obtained from &#34;
f&#34;&#39;{module.__name__}:{dataflow_name}&#39;&#34;
)
raise RuntimeError(msg)


Expand Down

0 comments on commit 10486e6

Please sign in to comment.