Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a _CSVSource and CSVInput subclass of FileInput #245

Merged
merged 11 commits into from
May 30, 2023
12 changes: 12 additions & 0 deletions examples/csv_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from bytewax.connectors.files import FileInput
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput


flow = Dataflow()
flow.input("inp", FileInput("examples/sample_data/ec2_metrics.csv"))
flow.output("out", StdOutput())

from bytewax.testing import run_main

run_main(flow)
9 changes: 9 additions & 0 deletions examples/sample_data/metrics.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
index,timestamp,value,instance
0,2022-02-24 11:42:08,0.132,24ae8d
0,2022-02-24 11:42:08,0.066,c6585a
0,2022-02-24 11:42:08,42.652,ac20cd
0,2022-02-24 11:42:08,51.846,5f5533
0,2022-02-24 11:42:08,2.296,fe7f93
0,2022-02-24 11:42:08,1.732,53ea38
0,2022-02-24 11:42:08,91.958,825cc2
0,2022-02-24 11:42:08,0.068,77c1ca
39 changes: 38 additions & 1 deletion pysrc/bytewax/connectors/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Callable
from zlib import adler32
import csv

from bytewax.inputs import PartitionedInput, StatefulSource
from bytewax.outputs import PartitionedOutput, StatefulSink
Expand All @@ -14,6 +15,7 @@
"DirOutput",
"FileInput",
"FileOutput",
"CSVInput",
]


Expand Down Expand Up @@ -78,6 +80,10 @@ def build_part(self, for_part, resume_state):
class FileInput(PartitionedInput):
"""Read a single file line-by-line from the filesystem.

FileInput will attempt to identify the type of the file.
awmatheson marked this conversation as resolved.
Show resolved Hide resolved
If the file is of type csv, it will return a dictionary
with the headers as the keys.

This file must exist and be identical on all workers.

There is no parallelism; only one worker will actually read the
Expand All @@ -91,6 +97,12 @@ class FileInput(PartitionedInput):

def __init__(self, path: Path):
awmatheson marked this conversation as resolved.
Show resolved Hide resolved
self._path = path
if str(type(path)) == "<class 'pathlib.PosixPath'>":
awmatheson marked this conversation as resolved.
Show resolved Hide resolved
self.file_type = path.suffix
else:
self._path = Path(path)
self.file_type = self._path.suffix


def list_parts(self):
return {str(self._path)}
Expand All @@ -99,8 +111,33 @@ def build_part(self, for_part, resume_state):
# TODO: Warn and return None. Then we could support
# continuation from a different file.
assert for_part == str(self._path), "Can't resume reading from different file"
return _FileSource(self._path, resume_state)
if self.file_type == '.csv':
return _CSVSource(self._path, resume_state)
else:
return _FileSource(self._path, resume_state)

class _CSVSource(StatefulSource):
def __init__(self, path, resume_state):
resume_offset = resume_state or 0
self._f = open(path, "rt")
self.header = list(csv.reader([self._f.readline()]))[0]
header_position = self._f.tell()
if resume_offset <= header_position:
awmatheson marked this conversation as resolved.
Show resolved Hide resolved
resume_offset = header_position
self._f.seek(resume_offset)

def next(self):
line = self._f.readline()
csv_line = dict(zip(self.header, list(csv.reader([line]))[0]))
if len(line) <= 0:
raise StopIteration()
return csv_line

def snapshot(self):
return self._f.tell()

def close(self):
self._f.close()

class _FileSink(StatefulSink):
def __init__(self, path, resume_state, end):
Expand Down
22 changes: 22 additions & 0 deletions pytests/connectors/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ def test_file_input():
"one6",
]

def test_csv_file_input():
file_path = Path("examples/sample_data/metrics.csv")

flow = Dataflow()

flow.input("inp", FileInput(file_path))

out = []
flow.output("out", TestingOutput(out))

run_main(flow)

assert out == [
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.132', 'instance': '24ae8d'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.066', 'instance': 'c6585a'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '42.652', 'instance': 'ac20cd'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '51.846', 'instance': '5f5533'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '2.296', 'instance': 'fe7f93'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '1.732', 'instance': '53ea38'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '91.958', 'instance': '825cc2'},
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.068', 'instance': '77c1ca'}
]

def test_file_input_resume_state():
file_path = Path("examples/sample_data/cluster/partition-1.txt")
Expand Down