Rename windows #208

Merged
merged 2 commits into from Feb 23, 2023
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -11,7 +11,9 @@ notes on GitHub when we make a new release.__

- Add `SessionWindow` for windowing operators.

-- Add `HoppingWindowConfig` for windowing operators.
+- Add `SlidingWindow` for windowing operators.
+
+- *Breaking change* Rename `TumblingWindowConfig` to `TumblingWindow`

- Add `filter_map` operator.
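
For code that has to run against releases on both sides of the `TumblingWindowConfig` to `TumblingWindow` rename, one option is to resolve the class by name at import time. The sketch below is illustrative only: `resolve_renamed` is a hypothetical helper, not part of bytewax, and the `SimpleNamespace` objects stand in for two versions of `bytewax.window` so the example runs without bytewax installed.

```python
from types import SimpleNamespace


def resolve_renamed(module, new_name, old_name):
    """Return `module.new_name`, falling back to `module.old_name`.

    Hypothetical helper for bridging a rename; not a bytewax API.
    """
    try:
        return getattr(module, new_name)
    except AttributeError:
        return getattr(module, old_name)


# Stand-ins for `bytewax.window` before and after the rename.
# The string values are placeholders for the real classes.
before_rename = SimpleNamespace(TumblingWindowConfig="window class (old name)")
after_rename = SimpleNamespace(TumblingWindow="window class (new name)")

old_cls = resolve_renamed(before_rename, "TumblingWindow", "TumblingWindowConfig")
new_cls = resolve_renamed(after_rename, "TumblingWindow", "TumblingWindowConfig")
print(old_cls)
print(new_cls)
```

With the real package, the first argument would be the imported `bytewax.window` module instead of a stand-in namespace.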

4 changes: 2 additions & 2 deletions README.md
@@ -82,10 +82,10 @@ There are multiple stateful operators available like `reduce`, `stateful_map` an
import datetime
from collections import defaultdict

-from bytewax.window import TumblingWindowConfig, SystemClockConfig
+from bytewax.window import TumblingWindow, SystemClockConfig

cc = SystemClockConfig()
-wc = TumblingWindowConfig(length=datetime.timedelta(seconds=5))
+wc = TumblingWindow(length=datetime.timedelta(seconds=5))


def build():
271 changes: 271 additions & 0 deletions apidocs/html/bytewax/connectors/files.html
@@ -0,0 +1,271 @@
<main class="api__content">
<article class="api__article" id="content">
<header class="api__article-header">
<h1 class="api__article-title">Module <strong>bytewax.connectors.files</strong></h1>
</header>
<section class="api__article-intro" id="section-intro">
<p>Connectors for local text files.</p>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">&#34;&#34;&#34;Connectors for local text files.

&#34;&#34;&#34;
from pathlib import Path

from bytewax.inputs import PartInput


def _stateful_read(path, resume_i):
    with open(path) as f:
        for i, line in enumerate(f):
            # Resume to one after the last completed read.
            if i &lt;= resume_i:
                continue
            yield i, line.strip()


class DirInput(PartInput):
    &#34;&#34;&#34;Read all files in a filesystem directory line-by-line.

    The directory must exist and contain identical data on all
    workers, so either run on a single machine or use a shared mount.

    Individual files are the unit of parallelism. Thus, lines from
    different files are interleaved.

    Args:

        dir: Path to directory.

        glob_pat: Pattern of files to read from the
            directory. Defaults to `&#34;*&#34;` or all files.

    &#34;&#34;&#34;

    def __init__(self, dir: Path, glob_pat: str = &#34;*&#34;):
        if not dir.exists():
            raise ValueError(f&#34;input directory `{dir}` does not exist&#34;)
        if not dir.is_dir():
            raise ValueError(f&#34;input directory `{dir}` is not a directory&#34;)

        self.dir = dir
        self.glob_pat = glob_pat

    def list_parts(self):
        return [
            str(path.relative_to(self.dir)) for path in self.dir.glob(self.glob_pat)
        ]

    def build_part(self, for_part, resume_state):
        path = self.dir / for_part
        resume_i = resume_state or -1

        return _stateful_read(path, resume_i)


class FileInput(PartInput):
    &#34;&#34;&#34;Read a single file line-by-line from the filesystem.

    This file must exist and be identical on all workers.

    There is no parallelism; only one worker will actually read the
    file.

    Args:

        path: Path to file.

    &#34;&#34;&#34;

    def __init__(self, path: Path):
        self.path = path

    def list_parts(self):
        return [str(self.path)]

    def build_part(self, for_part, resume_state):
        # TODO: Warn and return None. Then we could support
        # continuation from a different file.
        assert for_part == str(self.path), &#34;Can&#39;t resume from different file&#34;
        resume_i = resume_state or -1

        return _stateful_read(self.path, resume_i)</code></pre>
</details>
</section>
<section>
</section>
<section>
</section>
<section>
</section>
<section>
<h2 class="api__article-subtitle" id="header-classes">Classes</h2>
<dl>
<dt id="bytewax.connectors.files.DirInput"><code class="language-python flex name class">
<span>class <span class="ident">DirInput</span></span>
<span>(</span><span>dir: pathlib.Path, glob_pat: str = '*')</span>
</code></dt>
<dd>
<div class="desc"><p>Read all files in a filesystem directory line-by-line.</p>
<p>The directory must exist and contain identical data on all
workers, so either run on a single machine or use a shared mount.</p>
<p>Individual files are the unit of parallelism. Thus, lines from
different files are interleaved.</p>
<h2 id="args">Args</h2>
<dl>
<dt><strong><code>dir</code></strong></dt>
<dd>Path to directory.</dd>
<dt><strong><code>glob_pat</code></strong></dt>
<dd>Pattern of files to read from the
directory. Defaults to <code>"*"</code> or all files.</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">class DirInput(PartInput):
    &#34;&#34;&#34;Read all files in a filesystem directory line-by-line.

    The directory must exist and contain identical data on all
    workers, so either run on a single machine or use a shared mount.

    Individual files are the unit of parallelism. Thus, lines from
    different files are interleaved.

    Args:

        dir: Path to directory.

        glob_pat: Pattern of files to read from the
            directory. Defaults to `&#34;*&#34;` or all files.

    &#34;&#34;&#34;

    def __init__(self, dir: Path, glob_pat: str = &#34;*&#34;):
        if not dir.exists():
            raise ValueError(f&#34;input directory `{dir}` does not exist&#34;)
        if not dir.is_dir():
            raise ValueError(f&#34;input directory `{dir}` is not a directory&#34;)

        self.dir = dir
        self.glob_pat = glob_pat

    def list_parts(self):
        return [
            str(path.relative_to(self.dir)) for path in self.dir.glob(self.glob_pat)
        ]

    def build_part(self, for_part, resume_state):
        path = self.dir / for_part
        resume_i = resume_state or -1

        return _stateful_read(path, resume_i)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="bytewax.inputs.PartInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput">PartInput</a></li>
</ul>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code class="language-python"><b><a title="bytewax.inputs.PartInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput">PartInput</a></b></code>:
<ul class="hlist">
<li><code class="language-python"><a title="bytewax.inputs.PartInput.build_part" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput.build_part">build_part</a></code></li>
<li><code class="language-python"><a title="bytewax.inputs.PartInput.list_parts" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput.list_parts">list_parts</a></code></li>
</ul>
</li>
</ul>
</dd>
<dt id="bytewax.connectors.files.FileInput"><code class="language-python flex name class">
<span>class <span class="ident">FileInput</span></span>
<span>(</span><span>path: pathlib.Path)</span>
</code></dt>
<dd>
<div class="desc"><p>Read a single file line-by-line from the filesystem.</p>
<p>This file must exist and be identical on all workers.</p>
<p>There is no parallelism; only one worker will actually read the
file.</p>
<h2 id="args">Args</h2>
<dl>
<dt><strong><code>path</code></strong></dt>
<dd>Path to file.</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">class FileInput(PartInput):
    &#34;&#34;&#34;Read a single file line-by-line from the filesystem.

    This file must exist and be identical on all workers.

    There is no parallelism; only one worker will actually read the
    file.

    Args:

        path: Path to file.

    &#34;&#34;&#34;

    def __init__(self, path: Path):
        self.path = path

    def list_parts(self):
        return [str(self.path)]

    def build_part(self, for_part, resume_state):
        # TODO: Warn and return None. Then we could support
        # continuation from a different file.
        assert for_part == str(self.path), &#34;Can&#39;t resume from different file&#34;
        resume_i = resume_state or -1

        return _stateful_read(self.path, resume_i)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li><a title="bytewax.inputs.PartInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput">PartInput</a></li>
</ul>
<h3>Inherited members</h3>
<ul class="hlist">
<li><code class="language-python"><b><a title="bytewax.inputs.PartInput" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput">PartInput</a></b></code>:
<ul class="hlist">
<li><code class="language-python"><a title="bytewax.inputs.PartInput.build_part" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput.build_part">build_part</a></code></li>
<li><code class="language-python"><a title="bytewax.inputs.PartInput.list_parts" href="/apidocs/bytewax.inputs#bytewax.inputs.PartInput.list_parts">list_parts</a></code></li>
</ul>
</li>
</ul>
</dd>
</dl>
</section>
<footer class="api__footer" id="footer">
<p class="api__footer-copyright">
Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.0</a>.
</p>
</footer>
</article>
<nav class="api__sidebar" id="sidebar">
<ul class="api__sidebar-nav" id="index">
<li class="api__sidebar-nav-item">
<h3 class="api__sidebar-nav-title">Super-module</h3>
<ul class="api__sidebar-nav-menu">
<li class="api__sidebar-nav-menu-item">
<a title="bytewax.connectors" href="/apidocs/bytewax.connectors/index">bytewax.connectors</a>
</li>
</ul>
</li>
<li class="api__sidebar-nav-item">
<h3 class="api__sidebar-nav-title"><a href="#header-classes">Classes</a></h3>
<ul class="api__sidebar-nav-classes">
<li class="api__sidebar-nav-classes-item">
<h4 class="api__sidebar-nav-classes-title"><a title="bytewax.connectors.files.DirInput" href="/apidocs/bytewax.connectors/files#bytewax.connectors.files.DirInput">DirInput</a></h4>
</li>
<li class="api__sidebar-nav-classes-item">
<h4 class="api__sidebar-nav-classes-title"><a title="bytewax.connectors.files.FileInput" href="/apidocs/bytewax.connectors/files#bytewax.connectors.files.FileInput">FileInput</a></h4>
</li>
</ul>
</li>
</ul>
</nav>
</main>
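
The resume logic in the `bytewax.connectors.files` listing above can be exercised on its own. The sketch below copies `_stateful_read` as `stateful_read` and adds `resume_index`, a hypothetical helper that is not in this PR. It assumes the resume state is either `None` (nothing read yet) or the index of the last completed line. Under that assumption, `resume_state or -1` maps a stored index of `0` back to `-1`, because `0` is falsy in Python, so the first line would be read a second time.

```python
import tempfile
from pathlib import Path


def stateful_read(path, resume_i):
    """Yield (line_index, stripped_line), skipping indexes <= resume_i."""
    with open(path) as f:
        for i, line in enumerate(f):
            # Resume to one after the last completed read.
            if i <= resume_i:
                continue
            yield i, line.strip()


def resume_index(resume_state):
    """Map a stored resume state to the last completed line index.

    Hypothetical helper: compares against None instead of relying on
    truthiness, so a stored index of 0 is kept as 0.
    """
    return -1 if resume_state is None else resume_state


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "lines.txt"
    path.write_text("a\nb\nc\n")

    # No prior state: every line is read.
    fresh = list(stateful_read(path, resume_index(None)))
    # State 0 means line 0 is done: reading restarts at line 1.
    after_first = list(stateful_read(path, resume_index(0)))
    # The `or -1` form treats state 0 like "no state" and rereads line 0.
    with_or = list(stateful_read(path, 0 or -1))

print(fresh)
print(after_first)
print(with_or)
```

Whether a state of `0` can actually reach `build_part` depends on how the runtime snapshots each part, which this page does not show.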
34 changes: 33 additions & 1 deletion apidocs/html/bytewax/connectors/index.html
@@ -1,9 +1,23 @@
<main class="api__content">
<article class="api__article" id="content">
<header class="api__article-header">
-<h1 class="api__article-title">Namespace <strong>bytewax.connectors</strong></h1>
+<h1 class="api__article-title">Module <strong>bytewax.connectors</strong></h1>
</header>
<section class="api__article-intro" id="section-intro">
<p>Connectors for IO with external systems.</p>
<p>See <code><a title="bytewax.inputs" href="/apidocs/bytewax.inputs">bytewax.inputs</a></code> and <code><a title="bytewax.outputs" href="/apidocs/bytewax.outputs">bytewax.outputs</a></code> for the lower level
primitives to implement your own connector.</p>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre class="language-python line-numbers"><code class="language-python">&#34;&#34;&#34;Connectors for IO with external systems.

See `bytewax.inputs` and `bytewax.outputs` for the lower level
primitives to implement your own connector.

&#34;&#34;&#34;</code></pre>
</details>
</section>
<section>
<h2 class="api__article-subtitle" id="header-submodules">Sub-modules</h2>
@@ -20,6 +34,18 @@ <h4><a title="bytewax.connectors.dynamodb" href="/apidocs/bytewax.connectors/dyn
<div class="desc"></div>
</p>
</div>
<div class="api__article-submodules-item">
<h4><a title="bytewax.connectors.files" href="/apidocs/bytewax.connectors/files">bytewax.connectors.files</a></h4>
<p>
<div class="desc"><p>Connectors for local text files.</p></div>
</p>
</div>
<div class="api__article-submodules-item">
<h4><a title="bytewax.connectors.kafka" href="/apidocs/bytewax.connectors/kafka">bytewax.connectors.kafka</a></h4>
<p>
<div class="desc"><p>Connectors for <a href="https://kafka.apache.org">Kafka</a>.</p></div>
</p>
</div>
</div>
</section>
<section>
@@ -52,6 +78,12 @@ <h3 class="api__sidebar-nav-title">Super-module</h3>
<li class="api__sidebar-nav-menu-item">
<a title="bytewax.connectors.dynamodb" href="/apidocs/bytewax.connectors/dynamodb/index">bytewax.connectors.dynamodb</a>
</li>
<li class="api__sidebar-nav-menu-item">
<a title="bytewax.connectors.files" href="/apidocs/bytewax.connectors/files">bytewax.connectors.files</a>
</li>
<li class="api__sidebar-nav-menu-item">
<a title="bytewax.connectors.kafka" href="/apidocs/bytewax.connectors/kafka">bytewax.connectors.kafka</a>
</li>
</ul>
</li>
</ul>