Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a _CSVSource and CSVInput subclass of FileInput #245

Merged
merged 11 commits into from
May 30, 2023
8 changes: 8 additions & 0 deletions examples/csv_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Example dataflow: read a CSV file with a header row and print every
# parsed row to standard output.
from bytewax.connectors.files import CSVInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

flow = Dataflow()
# `delimiter` is forwarded to the underlying CSV parser.
flow.input(
    "inp",
    CSVInput("examples/sample_data/ec2_metrics.csv", delimiter=","),
)
flow.output("out", StdOutput())
9 changes: 9 additions & 0 deletions examples/sample_data/metrics.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
index,timestamp,value,instance
0,2022-02-24 11:42:08,0.132,24ae8d
0,2022-02-24 11:42:08,0.066,c6585a
0,2022-02-24 11:42:08,42.652,ac20cd
0,2022-02-24 11:42:08,51.846,5f5533
0,2022-02-24 11:42:08,2.296,fe7f93
0,2022-02-24 11:42:08,1.732,53ea38
0,2022-02-24 11:42:08,91.958,825cc2
0,2022-02-24 11:42:08,0.068,77c1ca
92 changes: 89 additions & 3 deletions pysrc/bytewax/connectors/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
"""
import os
from pathlib import Path
from typing import Callable
from typing import Callable, Union
from zlib import adler32
import csv

from bytewax.inputs import PartitionedInput, StatefulSource
from bytewax.outputs import PartitionedOutput, StatefulSink
Expand All @@ -14,6 +15,7 @@
"DirOutput",
"FileInput",
"FileOutput",
"CSVInput",
]


Expand Down Expand Up @@ -89,8 +91,9 @@ class FileInput(PartitionedInput):

"""

def __init__(self, path: Path):
self._path = path
def __init__(self, path: Union[Path, str]):
    """Remember which file this input reads.

    Args:

        path: Location of the file. A value that is not already a
            `Path` is converted to one.
    """
    self._path = path if isinstance(path, Path) else Path(path)

def list_parts(self):
    """Return the set of partition keys: just this file's path as a string."""
    part_key = str(self._path)
    return {part_key}
Expand All @@ -101,6 +104,89 @@ def build_part(self, for_part, resume_state):
assert for_part == str(self._path), "Can't resume reading from different file"
return _FileSource(self._path, resume_state)

class CSVInput(FileInput):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this DictCSVInput? This input won't work correctly on CSV files without a header and that would parallel the name of the csv.DictReader in the stdlib that works with header-based CSVs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure it makes sense to provide a non-header version right now because that would be the same as lines that you could parse downstream with next(csv.reader([line])) if you don't care about the header.

I'd like to leave it as is for right now and we can always add a way to skip the header or provide your own header.

"""Read a single csv file line-by-line from the filesystem.

Will read the first row as the header.

For each successive line it will return a dictionary
with the header fields as keys, like the rows produced by `csv.DictReader`.

This csv file must exist and be identical on all workers.

There is no parallelism; only one worker will actually read the
file.

Args:

path: Path to file.
**fmtparams: Any custom formatting arguments you can pass to [`csv.reader`](https://docs.python.org/3/library/csv.html?highlight=csv#csv.reader).

sample input:

```
index,timestamp,value,instance
0,2022-02-24 11:42:08,0.132,24ae8d
0,2022-02-24 11:42:08,0.066,c6585a
0,2022-02-24 11:42:08,42.652,ac20cd
0,2022-02-24 11:42:08,51.846,5f5533
0,2022-02-24 11:42:08,2.296,fe7f93
0,2022-02-24 11:42:08,1.732,53ea38
0,2022-02-24 11:42:08,91.958,825cc2
0,2022-02-24 11:42:08,0.068,77c1ca
```

awmatheson marked this conversation as resolved.
Show resolved Hide resolved
sample output:

```
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.132', 'instance': '24ae8d'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.066', 'instance': 'c6585a'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '42.652', 'instance': 'ac20cd'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '51.846', 'instance': '5f5533'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '2.296', 'instance': 'fe7f93'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '1.732', 'instance': '53ea38'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '91.958', 'instance': '825cc2'}
{'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.068', 'instance': '77c1ca'}
```
"""
def __init__(self, path: Union[Path, str], **fmtparams):
    """Configure which CSV file to read and how to parse it.

    Args:

        path: Path to the CSV file. A plain string is accepted as
            well, matching `FileInput`.

        **fmtparams: Formatting arguments forwarded to `csv.reader`
            (for example `delimiter`).
    """
    super().__init__(path)
    # Kept so `build_part` can pass them on to the source it builds.
    self.fmtparams = fmtparams

def build_part(self, for_part, resume_state):
    """Build the CSV source for this input's single partition.

    Args:

        for_part: Partition key requested; must equal the string
            form of this input's path.

        resume_state: State handed to the source so it can resume.

    Returns:

        A `_CSVSource` configured with the stored formatting
        arguments.
    """
    expected_part = str(self._path)
    assert for_part == expected_part, "Can't resume reading from different file"
    return _CSVSource(self._path, resume_state, **self.fmtparams)

class _CSVSource(_FileSource):
    """Read one CSV file line by line, producing a dict per row.

    The first line of the file is parsed as the header. Every later
    line is parsed on its own with `csv.reader` and zipped with the
    header, so each `next()` call returns a dict mapping header fields
    to that row's values.

    Each physical line is parsed independently, so a quoted field that
    spans several lines is not reassembled.

    Called by CSVInput.

    Args:

        path: File to read.

        resume_state: Offset previously returned by `snapshot()`; a
            falsy value starts reading right after the header.

        **fmtparams: Formatting arguments forwarded to `csv.reader`.
    """

    def __init__(self, path, resume_state, **fmtparams):
        resume_offset = resume_state or 0
        self._f = open(path, "rt")
        self.fmtparams = fmtparams
        # The header is always read from the top of the file, even when
        # resuming, because the resume offset points past it.
        self.header = self._parse(self._f.readline())
        if resume_offset:
            self._f.seek(resume_offset)

    def _parse(self, line):
        """Split a single line of text into its CSV fields."""
        return next(csv.reader([line], **self.fmtparams))

    def next(self):
        """Return the next row as a dict keyed by the header fields.

        Blank lines are skipped, as `csv.DictReader` does, instead of
        being emitted as empty dicts.

        Raises:

            StopIteration: When the end of the file is reached.
        """
        while True:
            line = self._f.readline()
            # An empty string (not even a newline) means end of file;
            # check before doing any parsing work.
            if not line:
                raise StopIteration()
            row = self._parse(line)
            if row:
                return dict(zip(self.header, row))

    def snapshot(self):
        """Return the current file offset so reading can resume here."""
        return self._f.tell()

    def close(self):
        """Close the underlying file."""
        self._f.close()

class _FileSink(StatefulSink):
def __init__(self, path, resume_state, end):
Expand Down
24 changes: 23 additions & 1 deletion pytests/connectors/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from pytest import raises

from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput
from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput, CSVInput
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingInput, TestingOutput

Expand Down Expand Up @@ -71,6 +71,28 @@ def test_file_input():
"one6",
]

def test_csv_file_input():
    """CSVInput emits one dict per data row, keyed by the header fields."""
    flow = Dataflow()
    flow.input("inp", CSVInput(Path("examples/sample_data/metrics.csv")))

    captured = []
    flow.output("out", TestingOutput(captured))

    run_main(flow)

    # Only the value and instance columns differ between the sample rows.
    readings = [
        ("0.132", "24ae8d"),
        ("0.066", "c6585a"),
        ("42.652", "ac20cd"),
        ("51.846", "5f5533"),
        ("2.296", "fe7f93"),
        ("1.732", "53ea38"),
        ("91.958", "825cc2"),
        ("0.068", "77c1ca"),
    ]
    expected = [
        {
            "index": "0",
            "timestamp": "2022-02-24 11:42:08",
            "value": value,
            "instance": instance,
        }
        for value, instance in readings
    ]
    assert captured == expected

def test_file_input_resume_state():
file_path = Path("examples/sample_data/cluster/partition-1.txt")
Expand Down