Commit

Merge pull request #42 from markheger/master
0.9.0
markheger committed Feb 6, 2020
2 parents cf74681 + 532c291 commit d765fa0
Showing 4 changed files with 101 additions and 42 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
@@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.8'
version = '0.9'
# The full version, including alpha/beta/rc tags.
release = '0.8.0'
release = '0.9.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
2 changes: 1 addition & 1 deletion streamsx/standard/_version.py
@@ -1 +1 @@
__version__='0.8.0'
__version__='0.9.0'
115 changes: 87 additions & 28 deletions streamsx/standard/files.py
@@ -51,6 +51,21 @@ def __init__(self, directory, pattern=None, schema=CommonSchema.String, **option
self.move_to_directory = None
self.ignore_dot_files = None
self.ignore_existing_files_at_startup = None
if 'sleep_time' in options:
self.sleep_time = options.get('sleep_time')
if 'init_delay' in options:
self.init_delay = options.get('init_delay')
if 'sort_by' in options:
self.sort_by = options.get('sort_by')
if 'order' in options:
self.order = options.get('order')
if 'move_to_directory' in options:
self.move_to_directory = options.get('move_to_directory')
if 'ignore_dot_files' in options:
self.ignore_dot_files = options.get('ignore_dot_files')
if 'ignore_existing_files_at_startup' in options:
self.ignore_existing_files_at_startup = options.get('ignore_existing_files_at_startup')


@property
def sleep_time(self):
@@ -162,7 +177,6 @@ def populate(self, topology, name, **options):

return _op.stream
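The new keyword options above can be passed directly when constructing a DirectoryScan. A minimal sketch, assuming an illustrative directory path and option values (not taken from this commit):

    from streamsx.topology.topology import Topology
    import streamsx.standard.files as files

    topo = Topology()
    # Illustrative values: re-scan every 5 seconds, skip dot files and any
    # files already present when the job starts.
    scanned = topo.source(files.DirectoryScan(directory='/tmp/incoming',
                                              sleep_time=5.0,
                                              ignore_dot_files=True,
                                              ignore_existing_files_at_startup=True))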


class FileSink(streamsx.topology.composite.ForEach):
"""
Write a stream to a file
@@ -554,12 +568,14 @@ def populate(self, topology, stream, name, **options) -> streamsx.topology.topol
return streamsx.topology.topology.Sink(_op)


def csv_reader(topology, schema, file, header=False, encoding=None, separator=None, ignoreExtraFields=False, hot=False, name=None):
class CSVReader(streamsx.topology.composite.Source):
"""Read a comma separated value file as a stream.
The file defined by `file` is read and mapped to a stream
with a structured schema of `schema`.
.. note:: Reads a single file only
Example of reading a file from the application directory (the file is part of the application bundle)::
import streamsx.standard.files as files
@@ -570,7 +586,7 @@ def csv_reader(topology, schema, file, header=False, encoding=None, separator=No
topo.add_file_dependency(sample_file, 'etc') # add sample file to etc dir in bundle
fn = os.path.join('etc', 'data.csv') # file name relative to application dir
sch = 'tuple<rstring a, int32 b>'
r = files.csv_reader(topo, schema=sch, file=fn)
r = topo.source(files.CSVReader(schema=sch, file=fn))
Example of reading a file from a file system accessible to the running job, for example a persistent volume claim (Cloud Pak for Data)::
@@ -579,37 +595,74 @@ def csv_reader(topology, schema, file, header=False, encoding=None, separator=No
topo = Topology()
sample_file = '/opt/ibm/streams-ext/data.csv' # file location accessible from running Streams application
r = files.csv_reader(topo, schema='tuple<rstring a, int32 b>', file=sample_file)
r = topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>', file=sample_file))
Args:
schema(StreamSchema): Schema of the returned stream.
file(str|Expression): Name of the source file. A relative file name is interpreted relative to the application directory, for example when the file is added to the application bundle.
header(bool): Specifies whether the file contains a header line.
encoding: Specifies the character set encoding that is used in the input file.
separator(str): Separator between records (defaults to comma ``,``).
ignoreExtraFields: When `True` then if the file contains more
ignoreExtraFields(bool): When `True` then if the file contains more
fields than `schema` has attributes they will be ignored.
Otherwise if there are extra fields an error is raised.
hot(bool): Specifies whether the input file is hot, which means it is appended continuously.
name(str): Name of the stream, defaults to a generated name.
Return:
(Stream): Stream containing records from the file.
"""
fe = streamsx.spl.op.Expression.expression(Format.csv.name)
if isinstance(file, streamsx.spl.op.Expression) is False:
if isinstance(file, str):
if os.path.isabs(file) is False:
file = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/'+file+'"')
print("file="+str(file))
else:
raise TypeError(file)
_op = _FileSource(topology, schema, file=file, format=fe, hotFile=hot,encoding=encoding,separator=separator,ignoreExtraCSVValues=ignoreExtraFields)
return _op.outputs[0]


def csv_writer(stream, file, append=None, encoding=None, separator=None, flush=None, name=None):
def __init__(self, schema, file, header=False, encoding=None, separator=None, ignoreExtraFields=False, hot=False):
self.schema = schema
self.file = file
self.header = header
self.encoding = encoding
self.separator = separator
self.ignoreExtraFields = ignoreExtraFields
self.hot = hot

def populate(self, topology, name, **options):
fe = streamsx.spl.op.Expression.expression(Format.csv.name)
if self.file is None:
raise ValueError('file must not be None')
if isinstance(self.file, streamsx.spl.op.Expression) is False:
if isinstance(self.file, str):
if os.path.isabs(self.file) is False:
self.file = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/'+self.file+'"')
print("file="+str(self.file))
else:
raise TypeError(self.file)
_op = _FileSource(topology, schemas=self.schema, file=self.file, format=fe, hotFile=self.hot, encoding=self.encoding, separator=self.separator, hasHeaderLine=self.header, ignoreExtraCSVValues=self.ignoreExtraFields)
return _op.outputs[0]
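The constructor options are forwarded to the underlying _FileSource invocation in populate. A minimal sketch of the new composite form with the optional parameters, assuming an illustrative absolute path and a semicolon-delimited input file with a header line:

    import streamsx.standard.files as files
    from streamsx.topology.topology import Topology

    topo = Topology()
    # header=True consumes the leading header line; separator=';' parses
    # semicolon-delimited records.
    r = topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>',
                                    file='/opt/data/in.csv',
                                    header=True,
                                    separator=';'))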


class CSVFilesReader(streamsx.topology.composite.Map):
"""Reads files whose names arrive on the input stream and emits tuples with the file contents on the output stream.
.. note:: Each input tuple holds the file name to be read
.. seealso:: Use :py:class:`~streamsx.standard.files.CSVReader` for reading a single file given as a parameter
Args:
header(bool): Specifies whether the files contain a header line.
encoding: Specifies the character set encoding that is used in the input files.
separator(str): Separator between records (defaults to comma ``,``).
ignoreExtraFields(bool): When `True` then if the file contains more
fields than `schema` has attributes they will be ignored.
Otherwise if there are extra fields an error is raised.
"""
def __init__(self, header=False, encoding=None, separator=None, ignoreExtraFields=False):
self.header = header
self.encoding = encoding
self.separator = separator
self.ignoreExtraFields = ignoreExtraFields

def populate(self, topology, stream, schema, name, **options):
fe = streamsx.spl.op.Expression.expression(Format.csv.name)
_op = _FileSource(topology, schemas=schema, stream=stream, format=fe, encoding=self.encoding, separator=self.separator, hasHeaderLine=self.header, ignoreExtraCSVValues=self.ignoreExtraFields)
return _op.outputs[0]
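In typical use the input stream comes from a DirectoryScan, as in the updated test_dir_scan below; each file name delivered on the stream is opened and parsed into tuples:

    import streamsx.standard.files as files
    from streamsx.topology.topology import Topology
    from streamsx.topology.schema import StreamSchema

    topo = Topology()
    # Illustrative directory; every tuple emitted by DirectoryScan is a file
    # name that CSVFilesReader opens and parses.
    scanned = topo.source(files.DirectoryScan(directory='/opt/data/incoming'))
    r = scanned.map(files.CSVFilesReader(), schema=StreamSchema('tuple<rstring a, int32 b>'))
    r.print()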


class CSVWriter(streamsx.topology.composite.ForEach):
"""Write a stream as a comma separated value file.
The file defined by `file` is used as the output file.
@@ -623,23 +676,29 @@ def csv_writer(stream, file, append=None, encoding=None, separator=None, flush=N
s = topo.source(range(13))
sch = 'tuple<rstring a, int32 b>'
s = s.map(lambda v: ('A'+str(v), v+7), schema=sch)
files.csv_writer(s, file=os.path.join(tempfile.mkdtemp(), 'data.csv'))
s.for_each(files.CSVWriter(file=os.path.join(tempfile.mkdtemp(), 'data.csv')))
Args:
file(str|Expression): Name of the output file. File name in relative path is relative to data directory.
append(bool): Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before tuples are written.
encoding: Specifies the character set encoding that is used in the output file.
separator(str): Separator between records (defaults to comma ``,``).
flush(int): Specifies the number of tuples after which to flush the output file. By default no flushing on tuple numbers is performed.
name(str): Name of the stream, defaults to a generated name.
Return:
(streamsx.topology.topology.Sink): Sink operator
"""
fe = streamsx.spl.op.Expression.expression(Format.csv.name)
_op = _FileSink(stream, file, format=fe, append=append, encoding=encoding, separator=separator, flush=flush, name=name)
def __init__(self, file, append=None, encoding=None, separator=None, flush=None):
self.file = file
self.append = append
self.encoding = encoding
self.separator = separator
self.flush = flush

def populate(self, topology, stream, name, **options) -> streamsx.topology.topology.Sink:
fe = streamsx.spl.op.Expression.expression(Format.csv.name)
_op = _FileSink(stream, self.file, format=fe, append=self.append, encoding=self.encoding, separator=self.separator, flush=self.flush, name=name)
return streamsx.topology.topology.Sink(_op)
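A minimal sketch combining the optional parameters, following the docstring example above (the temporary output path is illustrative); per the parameter descriptions, append=True avoids truncating an existing file and flush=1 flushes after every tuple:

    import os
    import tempfile
    import streamsx.standard.files as files
    from streamsx.topology.topology import Topology

    topo = Topology()
    s = topo.source(range(13))
    s = s.map(lambda v: ('A' + str(v), v + 7), schema='tuple<rstring a, int32 b>')
    # append=True appends to an existing file; flush=1 flushes after each tuple.
    s.for_each(files.CSVWriter(file=os.path.join(tempfile.mkdtemp(), 'data.csv'),
                               append=True,
                               flush=1))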


class _DirectoryScan(streamsx.spl.op.Source):
@@ -670,9 +729,9 @@ def __init__(self, topology, schema, directory, pattern=None, sleepTime=None, in

class _FileSource(streamsx.spl.op.Invoke):

def __init__(self, topology, schemas, file=None, format=None, defaultTuple=None, parsing=None, hasDelayField=None, compression=None, eolMarker=None, blockSize=None, initDelay=None, hotFile=None, deleteFile=None, moveFileToDirectory=None, separator=None, encoding=None, hasHeaderLine=None, ignoreOpenErrors=None, readPunctuations=None, ignoreExtraCSVValues=None, name=None):
def __init__(self, topology, schemas, stream=None, file=None, format=None, defaultTuple=None, parsing=None, hasDelayField=None, compression=None, eolMarker=None, blockSize=None, initDelay=None, hotFile=None, deleteFile=None, moveFileToDirectory=None, separator=None, encoding=None, hasHeaderLine=None, ignoreOpenErrors=None, readPunctuations=None, ignoreExtraCSVValues=None, name=None):
kind="spl.adapter::FileSource"
inputs=None
inputs=stream
params = dict()
if file is not None:
params['file'] = file
22 changes: 11 additions & 11 deletions streamsx/standard/tests/test_files.py
@@ -28,17 +28,17 @@ def test_read_write(self):
s = topo.source(range(13))
sch = 'tuple<rstring a, int32 b>'
s = s.map(lambda v: ('A'+str(v), v+7), schema=sch)

fn = os.path.join(self.dir, 'data.csv')
files.csv_writer(s, fn)
s.for_each(files.CSVWriter(fn))

tester = Tester(topo)
tester.tuple_count(s, 13)
tester.test(self.test_ctxtype, self.test_config)

self.assertTrue(os.path.isfile(fn))

topo = Topology()
r = files.csv_reader(topo, schema=sch, file=fn)
r = topo.source(files.CSVReader(schema=sch, file=fn))
expected = [ {'a':'A'+str(v), 'b':v+7} for v in range(13)]

tester = Tester(topo)
@@ -72,7 +72,7 @@ def test_composite(self):
self.assertTrue(os.path.isfile(fn))

topo = Topology()
r = files.csv_reader(topo, schema=sch, file=fn)
r = topo.source(files.CSVReader(schema=sch, file=fn))
expected = [ {'a':'A'+str(v), 'b':v+7} for v in range(13)]

tester = Tester(topo)
@@ -108,7 +108,7 @@ def test_composite_kwargs(self):
self.assertTrue(os.path.isfile(fn))

topo = Topology()
r = files.csv_reader(topo, schema=sch, file=fn)
r = topo.source(files.CSVReader(schema=sch, file=fn))
expected = [ {'a':'A'+str(v), 'b':v+7} for v in range(13)]

tester = Tester(topo)
@@ -123,7 +123,7 @@ def test_read_file_from_application_dir(self):
fn = os.path.join('etc', 'data.csv') # file name relative to application dir
sch = 'tuple<rstring a, int32 b>'
#fn = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/'+fn+'"')
r = files.csv_reader(topo, schema=sch, file=fn)
r = topo.source(files.CSVReader(schema=sch, file=fn))
r.print()

tester = Tester(topo)
@@ -158,13 +158,13 @@ class TestParams(TestCase):
def test_filename_ok(self):
topo = Topology()
fn = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/a/b"')
files.csv_reader(topo, schema='tuple<rstring a, int32 b>', file=fn)
files.csv_reader(topo, schema=CommonSchema.String, file="/tmp/a")
topo.source(files.CSVReader(schema='tuple<rstring a, int32 b>', file=fn))
topo.source(files.CSVReader(schema=CommonSchema.String, file="/tmp/a"))

def test_filename_bad(self):
topo = Topology()
fn = 1
self.assertRaises(TypeError, files.csv_reader, topo, 'tuple<rstring a>', fn) # expects str or Expression for file
self.assertRaises(TypeError, topo.source, files.CSVReader, 'tuple<rstring a>', fn) # expects str or Expression for file


class TestDirScan(TestCase):
@@ -183,13 +183,13 @@ def test_dir_scan(self):
fn = os.path.join('etc', 'data.csv') # file name relative to application dir
dir = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/etc"')
scanned = topo.source(files.DirectoryScan(directory=dir))
scanned.print()
r = scanned.map(files.CSVFilesReader(), schema=StreamSchema('tuple<rstring a, int32 b>'))
r.print()

#result = streamsx.topology.context.submit("TOOLKIT", topo.graph) # creates tk* directory
#print('(TOOLKIT):' + str(result))
#assert(result.return_code == 0)
result = streamsx.topology.context.submit("BUNDLE", topo.graph) # creates sab file
#print('(BUNDLE):' + str(result))
assert(result.return_code == 0)
os.remove(result.bundlePath)
os.remove(result.jobConfigPath)
