Skip to content

Commit

Permalink
Merge pull request #40 from markheger/master
Browse files Browse the repository at this point in the history
0.8.0
  • Loading branch information
markheger committed Feb 6, 2020
2 parents d8465f6 + d9b8781 commit cf74681
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
# built documents.
#
# The short X.Y version.
version = '0.7'
version = '0.8'
# The full version, including alpha/beta/rc tags.
release = '0.7.0'
release = '0.8.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
19 changes: 18 additions & 1 deletion streamsx/standard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import enum

__all__ = ['CloseMode', 'WriteFailureAction', 'Format', 'Compression']
__all__ = ['CloseMode', 'WriteFailureAction', 'Format', 'Compression', 'SortByType', 'SortOrder']

@enum.unique
class CloseMode(enum.Enum):
Expand Down Expand Up @@ -59,3 +59,20 @@ class Compression(enum.Enum):
"""`gzip <https://en.wikipedia.org/wiki/Gzip>`_ data compression."""
bzip2 = 2
"""`bzip2 <https://en.wikipedia.org/wiki/Bzip2>`_ data compression."""

@enum.unique
class SortByType(enum.Enum):
    """File property used as the sort key."""

    date = 0
    """Sort by the file date."""

    name = 1
    """Sort by the file name."""

@enum.unique
class SortOrder(enum.Enum):
    """Direction in which sorted items are arranged."""

    ascending = 0
    """Ascending order."""

    descending = 1
    """Descending order."""

3 changes: 2 additions & 1 deletion streamsx/standard/_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
# Enums built with the functional API: member names come from the
# space-separated string and are numbered from 1 in the order listed.
Compression = enum.Enum('Compression', 'zlib gzip bzip2')
CloseMode = enum.Enum('CloseMode', 'punct count size time dynamic never')
WriteFailureAction = enum.Enum('WriteFailureAction', 'ignore log terminate')
# The doubled "= =" that used to follow these two names was a syntax error
# that prevented the module from being imported at all.
SortByType = enum.Enum('SortByType', 'date name')
SortOrder = enum.Enum('SortOrder', 'ascending descending')
2 changes: 1 addition & 1 deletion streamsx/standard/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__='0.7.0'
__version__='0.8.0'
156 changes: 151 additions & 5 deletions streamsx/standard/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,161 @@
import os
import enum
import streamsx.spl.op
from streamsx.topology.schema import StreamSchema
from streamsx.standard import CloseMode, Format, Compression, WriteFailureAction
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.standard import CloseMode, Format, Compression, WriteFailureAction, SortOrder, SortByType
import streamsx.topology.composite

import streamsx.standard._version
__version__ = streamsx.standard._version.__version__


class DirectoryScan(streamsx.topology.composite.Source):
    """
    Watches a directory, and generates file names on the output, one for each file that is found in the directory.

    Example, scanning for files in application directory::

        import streamsx.spl.op
        import streamsx.standard.files as files
        from streamsx.topology.topology import Topology

        topo = Topology()
        scan_dir = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/etc"')
        s = topo.source(files.DirectoryScan(directory=scan_dir))

    Attributes
    ----------
    directory : str|Expression
        Specifies the name of the directory to be scanned
    pattern : str
        Instructs the operator to ignore file names that do not match the regular expression pattern
    schema : StreamSchema
        Output schema, defaults to CommonSchema.String
    options : kwargs
        The additional optional parameters as variable keyword arguments.
        Recognized names are ``sleep_time``, ``init_delay``, ``sort_by``,
        ``order``, ``move_to_directory``, ``ignore_dot_files`` and
        ``ignore_existing_files_at_startup``; other names are ignored.
    """

    # Optional settings that can be supplied through ``**options``; each one
    # is also exposed as a property of the same name.
    _OPTION_NAMES = (
        'sleep_time',
        'init_delay',
        'sort_by',
        'order',
        'move_to_directory',
        'ignore_dot_files',
        'ignore_existing_files_at_startup',
    )

    def __init__(self, directory, pattern=None, schema=CommonSchema.String, **options):
        self.directory = directory
        self.schema = schema
        self.pattern = pattern
        # Honor the documented keyword options instead of dropping them;
        # anything not supplied stays None, i.e. "not specified".
        for option in self._OPTION_NAMES:
            setattr(self, option, options.get(option))

    @property
    def sleep_time(self):
        """
        float: Specifies the minimal time between scans of the directory, in seconds. If this parameter is not specified, the default is 5.0 seconds.
        """
        return self._sleep_time

    @sleep_time.setter
    def sleep_time(self, value):
        self._sleep_time = value

    @property
    def init_delay(self):
        """
        float: Specifies the number of seconds that the DirectoryScan operator delays before it starts to produce tuples.
        """
        return self._init_delay

    @init_delay.setter
    def init_delay(self, value):
        self._init_delay = value

    @property
    def sort_by(self):
        """
        enum: Determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sort_by parameter is not specified, the default sort order is set to date.
        """
        return self._sort_by

    @sort_by.setter
    def sort_by(self, value):
        self._sort_by = value

    @property
    def order(self):
        """
        enum: Controls how the sort_by parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is set to ascending.
        """
        return self._order

    @order.setter
    def order(self, value):
        self._order = value

    @property
    def move_to_directory(self):
        """
        str: Specifies the name of the directory to which files are moved before the output tuple is generated.
        """
        return self._move_to_directory

    @move_to_directory.setter
    def move_to_directory(self, value):
        self._move_to_directory = value

    @property
    def ignore_dot_files(self):
        """
        bool: Specifies whether the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.
        """
        return self._ignore_dot_files

    @ignore_dot_files.setter
    def ignore_dot_files(self, value):
        self._ignore_dot_files = value

    @property
    def ignore_existing_files_at_startup(self):
        """
        bool: Specifies whether the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted.
        """
        return self._ignore_existing_files_at_startup

    @ignore_existing_files_at_startup.setter
    def ignore_existing_files_at_startup(self, value):
        self._ignore_existing_files_at_startup = value

    @staticmethod
    def _true_as_expression(value):
        """Turn Python ``True`` into the SPL literal ``true``; pass anything else through unchanged."""
        if value is True:
            return streamsx.spl.op.Expression.expression('true')
        return value

    @staticmethod
    def _enum_as_expression(value):
        """Wrap an enum member (by its name) or a plain string in an SPL expression; ``None`` stays ``None``."""
        if value is None:
            return None
        if isinstance(value, enum.Enum):
            # The operator parameter needs the bare member name, e.g. ``date``.
            value = value.name
        return streamsx.spl.op.Expression.expression(value)

    def populate(self, topology, name, **options):
        """
        Create the underlying ``_DirectoryScan`` operator and return its output stream.

        The configured attributes are converted into operator parameters using
        local variables only, so the values set on this instance are left
        untouched.

        Args:
            topology: Topology the operator is added to.
            name: Name passed on to the operator invocation.
            options: Accepted for interface compatibility; not used here.

        Returns:
            The ``stream`` attribute of the created operator.
        """
        # Local import so the submodule is guaranteed to be loaded before use.
        from streamsx.spl.types import float64

        sleep_time = self.sleep_time
        if sleep_time is not None:
            sleep_time = float64(sleep_time)

        init_delay = self.init_delay
        if init_delay is not None:
            init_delay = float64(init_delay)

        _op = _DirectoryScan(
            topology=topology,
            schema=self.schema,
            directory=self.directory,
            pattern=self.pattern,
            sleepTime=sleep_time,
            initDelay=init_delay,
            sortBy=self._enum_as_expression(self.sort_by),
            order=self._enum_as_expression(self.order),
            moveToDirectory=self.move_to_directory,
            ignoreDotFiles=self._true_as_expression(self.ignore_dot_files),
            ignoreExistingFilesAtStartup=self._true_as_expression(self.ignore_existing_files_at_startup),
            name=name)

        return _op.stream


class FileSink(streamsx.topology.composite.ForEach):
"""
Write a stream to a file
Expand Down Expand Up @@ -47,11 +194,10 @@ class FileSink(streamsx.topology.composite.ForEach):
----------
file : str
Name of the output file.
options : dict
options : kwargs
The additional optional parameters as variable keyword arguments.
"""


def __init__(self, file, **options):
self.file = file
self.append = None
Expand Down Expand Up @@ -497,7 +643,7 @@ def csv_writer(stream, file, append=None, encoding=None, separator=None, flush=N


class _DirectoryScan(streamsx.spl.op.Source):
def __init__(self, topology, schema,directory, pattern=None, sleepTime=None, initDelay=None, sortBy=None, order=None, moveToDirectory=None, ignoreDotFiles=None, ignoreExistingFilesAtStartup=None, name=None):
def __init__(self, topology, schema, directory, pattern=None, sleepTime=None, initDelay=None, sortBy=None, order=None, moveToDirectory=None, ignoreDotFiles=None, ignoreExistingFilesAtStartup=None, name=None):
kind="spl.adapter::DirectoryScan"
inputs=None
schemas=schema
Expand Down
31 changes: 29 additions & 2 deletions streamsx/standard/tests/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import streamsx.standard.files as files
import streamsx.standard.utility as U
import streamsx.standard.relational as R
from streamsx.standard import CloseMode, Format, WriteFailureAction
from streamsx.standard import CloseMode, Format, Compression, WriteFailureAction, SortOrder, SortByType

from streamsx.topology.topology import Topology
from streamsx.topology.tester import Tester
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_read_file_from_application_dir(self):

def test_filename_from_stream(self):
topo = Topology()
s = U.sequence(topo, iterations=5)
s = topo.source(U.Sequence(iterations=5))
F = U.SEQUENCE_SCHEMA.extend(StreamSchema('tuple<rstring filename>'))
fo = R.Functor.map(s, F)
fo.filename = fo.output(fo.outputs[0], '"myfile_{id}.txt"')
Expand Down Expand Up @@ -167,4 +167,31 @@ def test_filename_bad(self):
self.assertRaises(TypeError, files.csv_reader, topo, 'tuple<rstring a>', fn) # expects str or Expression for file


class TestDirScan(TestCase):
    """Build-only check for the ``DirectoryScan`` source."""

    def setUp(self):
        # Scratch directory, removed again in tearDown.
        self.dir = tempfile.mkdtemp()
        Tester.setup_standalone(self)

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_dir_scan(self):
        """Submitting a topology that scans the bundled ``etc`` directory as BUNDLE returns code 0."""
        topo = Topology()
        script_dir = os.path.dirname(os.path.realpath(__file__))
        sample_file = os.path.join(script_dir, 'data.csv')
        topo.add_file_dependency(sample_file, 'etc') # add sample file to etc dir in bundle
        scan_dir = streamsx.spl.op.Expression.expression('getApplicationDir()+"'+'/etc"')
        scanned = topo.source(files.DirectoryScan(directory=scan_dir))
        scanned.print()

        result = streamsx.topology.context.submit("BUNDLE", topo.graph) # creates sab file
        # unittest assertion: unlike a bare ``assert`` it is not stripped under -O
        # and it reports the actual return code on failure.
        self.assertEqual(result.return_code, 0)
        # Remove the build artifacts written by the submit call.
        os.remove(result.bundlePath)
        os.remove(result.jobConfigPath)


14 changes: 7 additions & 7 deletions streamsx/standard/tests/test_relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def setUp(self):

def test_transform_two_outputs(self):
topo = Topology()
s = U.sequence(topo, iterations=10)
s = topo.source(U.Sequence(iterations=10))
fo = R.Functor.map(s, [StreamSchema('tuple<uint64 seq>'),StreamSchema('tuple<timestamp ts>')])
seq = fo.outputs[0]
ts = fo.outputs[1]
Expand All @@ -59,7 +59,7 @@ def test_transform_two_outputs(self):

def test_transform_filter(self):
topo = Topology()
s = U.sequence(topo, iterations=5)
s = topo.source(U.Sequence(iterations=5))
fo = R.Functor.map(s, StreamSchema('tuple<uint64 seq>'), filter='seq>=2ul')
r = fo.outputs[0]
r.print()
Expand All @@ -70,7 +70,7 @@ def test_transform_filter(self):

def test_transform_schema(self):
topo = Topology()
s = U.sequence(topo, iterations=10)
s = topo.source(U.Sequence(iterations=10))
A = U.SEQUENCE_SCHEMA.extend(StreamSchema('tuple<rstring a>'))
fo = R.Functor.map(s, A)
fo.a = fo.output(fo.outputs[0], '"string value"')
Expand All @@ -83,7 +83,7 @@ def test_transform_schema(self):

def test_transform_schema_two_outputs(self):
topo = Topology()
s = U.sequence(topo, iterations=2)
s = topo.source(U.Sequence(iterations=2))
fo = R.Functor.map(s, [StreamSchema('tuple<uint64 seq, rstring a>'),StreamSchema('tuple<timestamp ts, int32 b>')])
fo.a = fo.output(fo.outputs[0], '"string value"')
fo.b = fo.output(fo.outputs[1], 99)
Expand All @@ -105,7 +105,7 @@ def setUp(self):

def test_single_output(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
s = topo.source(U.Sequence(iterations=4))
matches = R.Filter.matching(s, filter='seq<2ul')

tester = Tester(topo)
Expand All @@ -115,7 +115,7 @@ def test_single_output(self):

def test_non_matching_output(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
s = topo.source(U.Sequence(iterations=4))
matches, non_matches = R.Filter.matching(s, filter='seq<2ul', non_matching=True)

tester = Tester(topo)
Expand All @@ -125,7 +125,7 @@ def test_non_matching_output(self):

def test_filter_none(self):
topo = Topology()
s = U.sequence(topo, iterations=4)
s = topo.source(U.Sequence(iterations=4))
matches = R.Filter.matching(s, filter=None)

tester = Tester(topo)
Expand Down
6 changes: 6 additions & 0 deletions streamsx/standard/tests/test_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ def test_deduplicate(self):
tester.contents(s, [1,2,4,5,2])
tester.test(self.test_ctxtype, self.test_config)

def test_deduplicate_param_check(self):
    """Mapping with a ``Deduplicate`` built with both ``count`` and ``period`` raises ``ValueError``."""
    topo = Topology()
    numbers = topo.source([1,2,1,4,5,2])
    structured = numbers.map(lambda v : {'a':v}, schema='tuple<int32 a>')
    apply_map = structured.map
    # Built outside the assertion so that only the map call itself is checked.
    dedup = U.Deduplicate(count=1, period=1)
    with self.assertRaises(ValueError):
        apply_map(dedup)

def test_pair(self):
topo = Topology()
s = topo.source(U.Sequence(iterations=932))
Expand Down
7 changes: 4 additions & 3 deletions streamsx/standard/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class Sequence(streamsx.topology.composite.Source):
from streamsx.topology.topology import Topology
import streamsx.standard.utility as U
seq = topo(U.Sequence(period=0.5), name='20Hz')
topo = Topology()
seq = topo.source(U.Sequence(period=0.5), name='2Hz')
"""
def __init__(self, period:float=None, iterations:int=None, delay:float=None):
Expand Down Expand Up @@ -139,11 +140,11 @@ class Throttle(streamsx.topology.composite.Map):
rate(float): Throttled rate of the returned stream in tuples/second.
precise(bool): Try to make the rate precise at the cost of increased overhead.
Example throttling a stream ``readings`` to around 10,000 tuples per second.
Example throttling a stream ``readings`` to around 10,000 tuples per second::
import streamsx.standard.utility as U
readings = readings.map(U.Throttle(rate=10000.0))
"""
def __init__(self, rate:float, precise:bool=False):
self.rate = rate
Expand Down

0 comments on commit cf74681

Please sign in to comment.