Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/pyDKB' into pyDKB-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Evildoor committed Apr 24, 2019
2 parents c3e4810 + 83f9193 commit c81a096
Show file tree
Hide file tree
Showing 186 changed files with 1,047 additions and 84 deletions.
3 changes: 3 additions & 0 deletions Utils/Dataflow/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pyDKB.egg-info/
build/
dist/
15 changes: 15 additions & 0 deletions Utils/Dataflow/pyDKB-setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python

# Packaging script for the pyDKB library.
#
# The package version is not hard-coded here: it is read from the
# 'VERSION' file located in the package directory next to this script.

import setuptools
import os

package = 'pyDKB'
basedir = os.path.abspath(os.path.dirname(__file__))

# Resolve the path relative to this script, not to the current working
# directory, so that the file is found wherever the script is run from.
with open(os.path.join(basedir, package, 'VERSION')) as version_file:
    version = version_file.read().strip()

setuptools.setup(
    name=package,
    version=version,
    packages=setuptools.find_packages(),
    # Non-Python files that must be shipped together with the code.
    package_data={
        'pyDKB.dataflow': ['dkbID.conf'],
        'pyDKB': ['VERSION'],
    },
)
1 change: 1 addition & 0 deletions Utils/Dataflow/pyDKB/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.3-SNAPSHOT
7 changes: 6 additions & 1 deletion Utils/Dataflow/pyDKB/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import dataflow
import common

__version__ = "0.3-SNAPSHOT"
import os


basedir = os.path.dirname(__file__)
with open(os.path.join(basedir, 'VERSION')) as version_file:
__version__ = version_file.read().strip()

__all__ = ["dataflow"]
27 changes: 23 additions & 4 deletions Utils/Dataflow/pyDKB/common/custom_readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ def custom_readline(f, newline):
The last line can be incomplete, if the input data flow is interrupted
in the middle of data writing.
To check if iteration is not over without reading next value, one may
`send(True)` to the generator: it will return `True` if there is another
message to yield or raise `StopIteration` if nothing left.
Keyword arguments:
f -- file/stream to read
newline -- custom delimiter
Expand All @@ -20,14 +24,29 @@ def custom_readline(f, newline):
flags = fcntl.fcntl(f.fileno(), fcntl.F_GETFL)
fcntl.fcntl(f.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
buf = ""
# Flag variable to say send() from next()
send_not_next = None
while True:
if poller.poll(500):
chunk = f.read()
if not chunk:
yield buf
if buf:
while send_not_next:
# If we are here, the source is not empty for sure:
# we have another message to yield
send_not_next = yield True
yield buf
break
buf += chunk
if send_not_next:
# We keep on reading the stream, so it is not closed yet
# and (in theory) may provide another message sooner or later
send_not_next = yield True
while newline in buf:
pos = buf.index(newline)
yield buf[:pos]
buf = buf[pos + len(newline):]
pos = buf.index(newline) + len(newline)
while send_not_next:
# If we are here, the source is not empty for sure:
# we have another message to yield
send_not_next = yield True
send_not_next = yield buf[:pos]
buf = buf[pos:]
12 changes: 12 additions & 0 deletions Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ def init_stream(self):
.setType(self.message_type) \
.build()

def stream_is_readable(self):
    """ Tell whether the input data stream has anything left to read.

    The answer is delegated to the `is_readable()` method of the stream
    object stored in `self._stream`.

    :returns: None -- stream is not initialized,
              False -- stream is empty,
              True -- stream is initialized and not empty
    :rtype: bool, NoneType
    """
    stream = self._stream
    if stream:
        return stream.is_readable()
    # No stream object yet: readability is unknown.
    return None

def get_stream(self):
""" Get input stream linked to the current source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
class FileConsumer(Consumer.Consumer):
""" Data consumer implementation for HDFS data source. """

# Input file names (iterable object)
input_filenames = None

# Current file
current_file = None

Expand All @@ -34,38 +31,47 @@ def reconfigure(self, config={}):
if not config:
config = self.config

if not (config.get('input_files', None)
or config.get('input_dir', None)):
raise Consumer.ConsumerException("No input files specified.")

if not self.config.get('input_dir'):
self.config['input_dir'] = os.path.curdir
self.input_files = None

super(FileConsumer, self).reconfigure(config)

def source_is_empty(self):
""" Check if current source is empty.
def source_is_readable(self):
""" Check if current source is readable.
Return value:
True (empty)
False (not empty)
None (no source)
:returns: None -- no source,
False -- source is empty / fully read,
True -- source is defined and is not empty
:rtype: bool, NoneType
"""
f = self.current_file
if not f:
return None
fd = f['fd']
if not f.get('size'):
result = None
fd = self.current_file['fd'] if self.current_file else None
if self._stream and self._stream.get_fd() == fd:
result = self.stream_is_readable()
if fd and result is None:
# check file directly only when there's no stream bound to it
stat = os.fstat(fd.fileno())
f['size'] = stat.st_size
return fd.tell() == f['size']
result = fd.tell() != stat.st_size
return result

def get_source_info(self):
    """ Return the description of the source being currently processed.

    :returns: value of `self.current_file` as is (no copy is made)
    """
    source_info = self.current_file
    return source_info

def init_sources(self):
    """ Make sure the iterator over input sources exists.

    If `self.input_files` already holds a truthy value, it is left
    untouched; otherwise it is set to the result of
    `self._input_files()`.
    """
    if self.input_files:
        # Already initialized -- nothing to do.
        return
    self.input_files = self._input_files()

def get_source(self):
""" Get nearest non-empty source (current or next). """
if self.source_is_empty() is not False:
result = self.next_source()
else:
result = None
if self.source_is_readable() or self.next_source():
result = self.current_file['fd']
return result

Expand All @@ -77,7 +83,7 @@ def next_source(self):
None (no files left)
"""
if not self.input_files:
self.input_files = self._input_files()
self.init_sources()
try:
self.current_file = self.input_files.next()
result = self.get_source()
Expand Down Expand Up @@ -108,6 +114,8 @@ def _filenames_from_dir(self, dirname):
ext = Message(self.message_type).extension()
try:
dir_content = os.listdir(dirname)
# Make files order predictable
dir_content.sort()
for f in dir_content:
if os.path.isfile(os.path.join(dirname, f)) \
and f.lower().endswith(ext):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def _filenames_from_dir(self, dirname):
"""
try:
files = hdfs.listdir(dirname, "f")
# Make files order predictable
files.sort()
except HDFSException, err:
raise Consumer.ConsumerException(err)
return files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def reconfigure(self, config={}):
""" (Re)configure Stream consumer. """
self.fd = sys.stdin
super(StreamConsumer, self).reconfigure(config)
if self.config['eom'] == '':
raise Consumer.ConsumerException("Empty EOM is not allowed "
"for stream input.")

def get_source_info(self):
""" Return current source info. """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from HDFSConsumer import HDFSConsumer
from StreamConsumer import StreamConsumer

from Consumer import ConsumerException

__all__ = ['ConsumerBuilder']


Expand All @@ -28,10 +30,12 @@ def __init__(self, config={}):

if config.get('hdfs'):
self.setSource('h')
elif config.get('source'):
self.setSource(config.get('source'))
elif config.get('mode') in ('s', 'm'):
self.setSource('s')
else:
self.setSource(config.get('source'))
self.setSource('f')

def setSource(self, source):
""" Set data source for the consumer. """
Expand All @@ -46,6 +50,7 @@ def setSource(self, source):
% (sources.keys(), source))

self.consumerClass = sources[source]
self.config['source'] = source
return self

def setType(self, Type):
Expand Down
2 changes: 1 addition & 1 deletion Utils/Dataflow/pyDKB/dataflow/communication/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def content(self):

@classmethod
def extension(cls):
""" Return file extension corresponding this message type. """
""" Return file extension corresponding to this message type. """
return cls._ext


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from HDFSProducer import HDFSProducer
from StreamProducer import StreamProducer

from Producer import ProducerException

__all__ = ['ProducerBuilder']


Expand All @@ -30,10 +32,12 @@ def __init__(self, config={}):

if config.get('hdfs'):
self.setDest('h')
elif config.get('dest'):
self.setDest(config.get('dest'))
elif config.get('mode') in ('s', 'm'):
self.setDest('s')
else:
self.setDest(config.get('dest'))
self.setDest('f')

def setDest(self, dest):
""" Set data destination for the producer. """
Expand All @@ -48,6 +52,7 @@ def setDest(self, dest):
% (dests.keys(), dest))

self.producerClass = dests[dest]
self.config['dest'] = dest
return self

def setType(self, Type):
Expand Down
91 changes: 83 additions & 8 deletions Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from . import Message
from pyDKB.common import custom_readline

import os
import sys


class InputStream(Stream):
""" Implementation of the input stream. """
Expand All @@ -24,21 +27,82 @@ def _reset_iterator(self):
fd = self.get_fd()
if self.EOM == '\n':
self.__iterator = iter(fd.readline, "")
self.is_readable = self._fd_is_readable
elif self.EOM == '':
self.__iterator = iter([fd.read()])
self.__iterator = iter(fd.read, "")
self.is_readable = self._fd_is_readable
else:
self.__iterator = custom_readline(fd, self.EOM)
self.is_readable = self._gi_is_readable

def reset(self, fd, close=True):
def reset(self, fd, close=True, force=False):
""" Reset current stream with new file descriptor.
Overrides parent method to reset __iterator property.
"""
super(InputStream, self).reset(fd, close)
# Not _reset_iterator(), as we are not sure someone
# will ask for new messages -- then why read the whole file
# in advance (if EOM appears to be '')?
self.__iterator = None
old_fd = super(InputStream, self).reset(fd, close)
# We do not want to reset iterator if `reset()` was called
# with the same `fd` as before.
if force or (old_fd and fd != old_fd):
self._reset_iterator()
return old_fd

def is_readable(self):
    """ Check if current input stream is readable.

    This class-level implementation only covers the "not initialized"
    state: it forwards to `_unknown_is_readable()`. `_reset_iterator()`
    overrides it on the instance with a check that matches the kind of
    iterator in use.

    :returns: None -- not initialized,
              False -- empty,
              True -- not empty
    :rtype: bool, NoneType
    """
    readable = self._unknown_is_readable()
    return readable

def _unknown_is_readable(self):
""" Placeholder: readability test for not initialized stream.
This function is needed in case that we need to reset `is_readable`
and the whole Stream object back to the "undefined" state.
:returns: None
:rtype: NoneType
"""
return None

def _fd_is_readable(self):
""" Check if bound file descriptor is readable.
:returns: None -- not initialized,
False -- empty,
True -- not empty
:rtype: bool, NoneType
"""
fd = self.get_fd()
if not fd:
result = None
elif getattr(fd, 'closed', True):
result = False
elif fd.fileno() == sys.stdin.fileno():
result = True
else:
stat = os.fstat(fd.fileno())
result = fd.tell() != stat.st_size
return result

def _gi_is_readable(self):
""" Check if the generator iterator can return value on `next()` call.
:returns: False -- empty,
True -- not empty
:rtype: bool, NoneType
"""
try:
return self.__iterator.send(True)
except StopIteration:
return False
except TypeError:
# If method 'next()' was never called yet,
# sending anything but None raises TypeError
return self._fd_is_readable()

def parse_message(self, message):
""" Verify and parse input message.
Expand Down Expand Up @@ -80,4 +144,15 @@ def next(self):
if not self.__iterator:
self._reset_iterator()
msg = self.__iterator.next()
return self.parse_message(msg)
if not msg.endswith(self.EOM):
log_msg = msg[:10] + '<...>' * (len(msg) > 20)
log_msg += msg[-min(len(msg) - 10, 10):]
log_msg = log_msg.replace('\n', r'\n')
self.log("Unexpected end of stream, skipping rest of input:\n"
"'%s'" % log_msg, logLevel.WARN)
return False
else:
if self.EOM != '':
msg = msg[:-len(self.EOM)]
result = self.parse_message(msg)
return result
Loading

0 comments on commit c81a096

Please sign in to comment.