Skip to content

Commit

Permalink
Merge pull request #34 from markheger/master
Browse files Browse the repository at this point in the history
0.6
  • Loading branch information
markheger committed Feb 3, 2020
2 parents 1d493a1 + 0fb391b commit ac1e81c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.5'
version = '0.6'
# The full version, including alpha/beta/rc tags.
release = '0.5.0'
release = '0.6.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion streamsx/standard/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__='0.5.0'
__version__='0.6.0'
42 changes: 41 additions & 1 deletion streamsx/standard/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ class FileSink(streamsx.topology.composite.ForEach):
----------
file : str
Name of the output file.
options : dict
The additional optional parameters as variable keyword arguments.
"""


def __init__(self, file):
def __init__(self, file, **options):
self.file = file
self.append = None
self.bytes_per_file = None
Expand All @@ -59,6 +61,44 @@ def __init__(self, file):
self.tuples_per_file = None
self.write_failure_action = None
self.write_punctuations = None
if 'append' in options:
self.append = options.get('append')
if 'bytes_per_file' in options:
self.bytes_per_file = options.get('bytes_per_file')
if 'close_mode' in options:
self.close_mode = options.get('close_mode')
if 'compression' in options:
self.compression = options.get('compression')
if 'encoding' in options:
self.encoding = options.get('encoding')
if 'eol_marker' in options:
self.eol_marker = options.get('eol_marker')
if 'flush' in options:
self.flush = options.get('flush')
if 'flush_on_punctuation' in options:
self.flush_on_punctuation = options.get('flush_on_punctuation')
if 'format' in options:
self.format = options.get('format')
if 'has_delay_field' in options:
self.has_delay_field = options.get('has_delay_field')
if 'move_file_to_directory' in options:
self.move_file_to_directory = options.get('move_file_to_directory')
if 'quote_strings' in options:
self.quote_strings = options.get('quote_strings')
if 'separator' in options:
self.separator = options.get('separator')
if 'suppress' in options:
self.suppress = options.get('suppress')
if 'time_per_file' in options:
self.time_per_file = options.get('time_per_file')
if 'truncate_on_reset' in options:
self.truncate_on_reset = options.get('truncate_on_reset')
if 'tuples_per_file' in options:
self.tuples_per_file = options.get('tuples_per_file')
if 'write_failure_action' in options:
self.write_failure_action = options.get('write_failure_action')
if 'write_punctuations' in options:
self.write_punctuations = options.get('write_punctuations')

@property
def append(self):
Expand Down
35 changes: 35 additions & 0 deletions streamsx/standard/tests/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,41 @@ def test_composite(self):
tester.contents(r, expected)
tester.test(self.test_ctxtype, self.test_config)

def test_composite_kwargs(self):
topo = Topology()
s = topo.source(range(13))
sch = 'tuple<rstring a, int32 b>'
s = s.map(lambda v: ('A'+str(v), v+7), schema=sch)

fn = os.path.join(self.dir, 'data.csv')

config = {
'append': True,
'flush': 1,
'close_mode': CloseMode.punct.name,
'flush_on_punctuation': True,
'format': Format.csv.name,
'has_delay_field': False,
'quote_strings': False,
'write_failure_action': WriteFailureAction.log.name,
'write_punctuations': False,
}
fsink = files.FileSink(fn, **config)
s.for_each(fsink)

tester = Tester(topo)
tester.tuple_count(s, 13)
tester.test(self.test_ctxtype, self.test_config)

self.assertTrue(os.path.isfile(fn))

topo = Topology()
r = files.csv_reader(topo, schema=sch, file=fn)
expected = [ {'a':'A'+str(v), 'b':v+7} for v in range(13)]

tester = Tester(topo)
tester.contents(r, expected)
tester.test(self.test_ctxtype, self.test_config)

def test_read_file_from_application_dir(self):
topo = Topology()
Expand Down

0 comments on commit ac1e81c

Please sign in to comment.