Skip to content

Commit

Permalink
Merge pull request #245 from bytewax/add_csv
Browse files Browse the repository at this point in the history
Add a _CSVSource and CSVInput subclass of FileInput
  • Loading branch information
awmatheson committed May 30, 2023
2 parents 72ef095 + a41d608 commit 9e1ebe3
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 4 deletions.
8 changes: 8 additions & 0 deletions examples/csv_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from bytewax.connectors.files import CSVInput
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput


# Example dataflow: read a CSV file through `CSVInput` and write every
# item it emits to standard output.
csv_input = CSVInput("examples/sample_data/ec2_metrics.csv", delimiter=",")

flow = Dataflow()
flow.input("inp", csv_input)
flow.output("out", StdOutput())
9 changes: 9 additions & 0 deletions examples/sample_data/metrics.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
index,timestamp,value,instance
0,2022-02-24 11:42:08,0.132,24ae8d
0,2022-02-24 11:42:08,0.066,c6585a
0,2022-02-24 11:42:08,42.652,ac20cd
0,2022-02-24 11:42:08,51.846,5f5533
0,2022-02-24 11:42:08,2.296,fe7f93
0,2022-02-24 11:42:08,1.732,53ea38
0,2022-02-24 11:42:08,91.958,825cc2
0,2022-02-24 11:42:08,0.068,77c1ca
92 changes: 89 additions & 3 deletions pysrc/bytewax/connectors/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
"""
import os
from pathlib import Path
from typing import Callable
from typing import Callable, Union
from zlib import adler32
import csv

from bytewax.inputs import PartitionedInput, StatefulSource
from bytewax.outputs import PartitionedOutput, StatefulSink
Expand All @@ -14,6 +15,7 @@
"DirOutput",
"FileInput",
"FileOutput",
"CSVInput",
]


Expand Down Expand Up @@ -89,8 +91,9 @@ class FileInput(PartitionedInput):
"""

def __init__(self, path: Path):
self._path = path
def __init__(self, path: Union[Path, str]):
    """Remember which file to read.

    Args:
        path: Location of the file. A `Path` is stored as-is; any
            other value is converted with `Path(path)`.
    """
    if isinstance(path, Path):
        self._path = path
    else:
        self._path = Path(path)

def list_parts(self):
    """Return the set of partition keys: only this file's path, as a string."""
    only_part = str(self._path)
    return {only_part}
Expand All @@ -101,6 +104,89 @@ def build_part(self, for_part, resume_state):
assert for_part == str(self._path), "Can't resume reading from different file"
return _FileSource(self._path, resume_state)

class CSVInput(FileInput):
    """Read a single CSV file row by row from the filesystem.

    The first row is read as the header. Each successive row is
    emitted as a dictionary with the header fields as keys, similar
    to the rows produced by `csv.DictReader`.

    This CSV file must exist and be identical on all workers.

    There is no parallelism; only one worker will actually read the
    file.

    Args:

        path: Path to file, given as a `Path` or a string.

        **fmtparams: Any custom formatting arguments you can pass to
            [`csv.reader`](https://docs.python.org/3/library/csv.html?highlight=csv#csv.reader).

    sample input:

    ```
    index,timestamp,value,instance
    0,2022-02-24 11:42:08,0.132,24ae8d
    0,2022-02-24 11:42:08,0.066,c6585a
    0,2022-02-24 11:42:08,42.652,ac20cd
    0,2022-02-24 11:42:08,51.846,5f5533
    0,2022-02-24 11:42:08,2.296,fe7f93
    0,2022-02-24 11:42:08,1.732,53ea38
    0,2022-02-24 11:42:08,91.958,825cc2
    0,2022-02-24 11:42:08,0.068,77c1ca
    ```

    sample output:

    ```
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.132', 'instance': '24ae8d'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.066', 'instance': 'c6585a'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '42.652', 'instance': 'ac20cd'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '51.846', 'instance': '5f5533'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '2.296', 'instance': 'fe7f93'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '1.732', 'instance': '53ea38'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '91.958', 'instance': '825cc2'}
    {'index': '0', 'timestamp': '2022-02-24 11:42:08', 'value': '0.068', 'instance': '77c1ca'}
    ```

    """

    def __init__(self, path: Union[Path, str], **fmtparams):
        # The parent constructor accepts both `Path` and `str`, so the
        # annotation here matches it.
        super().__init__(path)
        # Kept so `build_part` can forward them to the CSV source.
        self.fmtparams = fmtparams

    def build_part(self, for_part, resume_state):
        """Build the CSV source for this input's only partition.

        Args:
            for_part: Partition key; must equal this file's path as a
                string.
            resume_state: Passed through to `_CSVSource` unchanged.

        Returns:
            A `_CSVSource` reading this file with the stored
            formatting arguments.
        """
        assert for_part == str(self._path), "Can't resume reading from different file"
        return _CSVSource(self._path, resume_state, **self.fmtparams)

class _CSVSource(_FileSource):
    """Source that reads a CSV file one record at a time.

    The first record of the file is taken as the header. Each call to
    `next()` parses one further record with `csv.reader` and returns
    it as a dict that pairs header fields with that record's values.
    Pairing uses `zip`, so surplus values or surplus header fields
    are dropped.

    Called by `CSVInput`.
    """

    def __init__(self, path, resume_state, **fmtparams):
        """Open `path`, read its header and seek to the resume offset.

        Args:
            path: File to read.
            resume_state: Offset previously returned by `snapshot()`,
                or a falsy value to start right after the header.
            **fmtparams: Formatting arguments forwarded to
                `csv.reader`.
        """
        resume_offset = resume_state or 0
        # The `csv` module asks for files opened with `newline=""` so
        # that newlines embedded in quoted fields are kept intact.
        self._f = open(path, "rt", newline="")
        self.fmtparams = fmtparams
        try:
            # An empty file has no header record; treat it as having
            # no columns.
            self.header = self._read_record() or []
            if resume_offset:
                self._f.seek(resume_offset)
        except BaseException:
            # Don't leak the file handle if the header can't be
            # parsed (e.g. invalid `fmtparams`) or the seek fails.
            self._f.close()
            raise

    def _read_record(self):
        """Parse and return the next CSV record, or `None` at end of file."""
        # Lines are pulled via `readline()` instead of iterating the
        # file object, because iterating a text file disables `tell()`,
        # which `snapshot()` relies on. Handing the reader a line
        # iterator (rather than a single line) lets it consume as many
        # physical lines as one record needs, so quoted fields that
        # contain newlines are parsed as a single record.
        lines = iter(self._f.readline, "")
        return next(csv.reader(lines, **self.fmtparams), None)

    def next(self):
        """Return the next record as a header-keyed dict.

        Raises:
            StopIteration: When the end of the file is reached.
        """
        record = self._read_record()
        # `None` means end of file; an empty list is a blank line and
        # is still emitted (as an empty dict).
        if record is None:
            raise StopIteration()
        return dict(zip(self.header, record))

    def snapshot(self):
        """Return the current file offset, usable as `resume_state`."""
        return self._f.tell()

    def close(self):
        """Close the underlying file."""
        self._f.close()

class _FileSink(StatefulSink):
def __init__(self, path, resume_state, end):
Expand Down
24 changes: 23 additions & 1 deletion pytests/connectors/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from pytest import raises

from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput
from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput, CSVInput
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingInput, TestingOutput

Expand Down Expand Up @@ -71,6 +71,28 @@ def test_file_input():
"one6",
]

def test_csv_file_input():
    """`CSVInput` emits one dict per data row, keyed by the header row."""
    value_instance_pairs = [
        ("0.132", "24ae8d"),
        ("0.066", "c6585a"),
        ("42.652", "ac20cd"),
        ("51.846", "5f5533"),
        ("2.296", "fe7f93"),
        ("1.732", "53ea38"),
        ("91.958", "825cc2"),
        ("0.068", "77c1ca"),
    ]
    # Every row of the sample file shares the same index and timestamp.
    expected = [
        {
            "index": "0",
            "timestamp": "2022-02-24 11:42:08",
            "value": value,
            "instance": instance,
        }
        for value, instance in value_instance_pairs
    ]

    captured = []
    flow = Dataflow()
    flow.input("inp", CSVInput(Path("examples/sample_data/metrics.csv")))
    flow.output("out", TestingOutput(captured))

    run_main(flow)

    assert captured == expected

def test_file_input_resume_state():
file_path = Path("examples/sample_data/cluster/partition-1.txt")
Expand Down

0 comments on commit 9e1ebe3

Please sign in to comment.