[docs]defmovefile(fname,dest):
+ """ Move local file to HDFS. """
+ ifos.path.exists(fname):
+ putfile(fname,dest)
+ try:
+ os.remove(fname)
+ exceptOSError,err:
+ sys.stderr.write("(WARN) Failed to remove local copy of HDFS file"
+ " (%s): %s"%(fname,err))
+
+
[docs]defgetfile(fname):""" Download file from HDFS. Return value: file name (without directory) """cmd=["hadoop","fs","-get",fname]
- name=os.path.basename(fname)
+ name=basename(fname)try:proc=subprocess.Popen(cmd,stdin=subprocess.PIPE,
@@ -124,6 +137,32 @@
Source code for pyDKB.common.hdfs
returnname
+
[docs]defFile(fname):
+ """ Get and open temporary local copy of HDFS file
+
+ Return value: open file object (TemporaryFile).
+ """
+ cmd=["hadoop","fs","-cat",fname]
+ tmp_file=tempfile.TemporaryFile()
+ try:
+ proc=subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdout=tmp_file)
+ check_stderr(proc)
+ tmp_file.seek(0)
+ except(subprocess.CalledProcessError,OSError),err:
+ ifisinstance(err,subprocess.CalledProcessError):
+ err.cmd=' '.join(cmd)
+ tmp_file.close()
+ raiseHDFSException("Failed to get file from HDFS: %s\n"
+ "Error message: %s\n"%(fname,err))
+ iftmp_file.closed:
+ returnNone
+
+ returntmp_file
+
+
[docs]deflistdir(dirname,mode='a'):""" List files and/or subdirectories of HDFS directory.
@@ -172,7 +211,7 @@
Source code for pyDKB.common.hdfs
# We need to return only the name of the file or subdirfilename=line[7]
- filename=os.path.basename(filename)
+ filename=basename(filename)ifline[0][0]=='d':subdirs.append(filename)elifline[0][0]=='-':
@@ -186,6 +225,29 @@
Source code for pyDKB.common.hdfs
result=subdirsreturnresult
+
+
+
[docs]defbasename(path):
+ """ Return file name without path. """
+ ifpathisNone:
+ path=''
+ returnpath.basename(path).strip()
"""
-Utils to work with JSON (dict) objects.
+Utils to work with JSON objects.
+
+In context of Python, JSON [#]_ objects may be considered as structures
+consisting of six types of elements:
+
+- dictionaries,
+- lists,
+- strings,
+- numbers,
+- True/False,
+- Null.
+
+DKB project uses JSON for storing various information and transferring it
+between stages. This module contains functions which simplify some aspects
+of retrieving data from JSON objects.
+
+.. [#] https://www.json.org/
+
"""
[docs]defvalueByKey(json_data,key):""" Return value by a chain (list) of nested keys.
- Parameters:
- DICT json_data -- to search in
- STRING key -- dot-separated list of nested keys
+ It is common for JSON objects to contain many layers of dictionaries
+ nested in other dictionaries -- this function extracts the data from
+ such constructions according to given string or list with keys.
+ String should be in the format intended for nestedKeys() - nested
+ keys separated by dots.
+
+ :param json_data: to search in
+ :type json_data: dict
+ :param key: nested keys
+ :type key: str, list
+
+ :return: value (None if failed)
+ :rtype: depends on value, NoneType """nested_keys=nestedKeys(key)val=json_data
@@ -62,12 +89,19 @@
Source code for pyDKB.common.json_utils
[docs]defnestedKeys(key):
- """ Transform STRING with nested keys into LIST.
+ """ Transform string with nested keys into list.
+
+ String should contain keys separated by dot. If a key contains
+ dot itself, the key must be put between matching quotation marks.
+ Quotation marks inside the keys (not preceding or following a dot)
+ are treated as ordinary symbols. If a list is given instead of str,
+ it is returned without changes.
+
+ :param key: nested keys
+ :type key: str, list
- Parameters:
- STRING key -- dot-separated list of nested keys.
- If a key contains dot itself, the key must be put between
- quotation marks.
+ :return: nested keys
+ :rtype: list """iftype(key)==list:returnkey
diff --git a/Docs/build/html/_modules/pyDKB/dataflow/cds.html b/Docs/build/html/_modules/pyDKB/dataflow/cds.html
new file mode 100644
index 000000000..115a81c2d
--- /dev/null
+++ b/Docs/build/html/_modules/pyDKB/dataflow/cds.html
@@ -0,0 +1,205 @@
+
+
+
+
+
+
+
+ pyDKB.dataflow.cds — Data Knowledge Base documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
[docs]defget_source_info(self):
+ """ Return current source info. """
+ raiseNotImplementedError
+
+
[docs]defget_message(self):
+ """ Get new message from current source.
+
+ Return values:
+ Message object
+ False (failed to parse message)
+ None (all input sources are empty)
+ """
+ s=self.get_stream()
+ ifnots:
+ msg=None
+ else:
+ msg=s.get_message()
+ returnmsg
+
+
[docs]defnext(self):
+ """ Return new Message, read from input stream. """
+ msg=self.get_message()
+ ifmsgisNone:
+ raiseStopIteration
+ returnmsg
+
+
[docs]defclose(self):
+ """ Close opened data stream and data source. """
+ forsin(self.get_stream(),self.get_source()):
+ ifsandnotgetattr(s,'closed',True):
+ s.close()
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html
new file mode 100644
index 000000000..d17cff674
--- /dev/null
+++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/consumer/StreamConsumer.html
@@ -0,0 +1,140 @@
+
+
+
+
+
+
+
+ pyDKB.dataflow.communication.consumer.StreamConsumer — Data Knowledge Base documentation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Source code for pyDKB.dataflow.communication.consumer.StreamConsumer
+"""
+pyDKB.dataflow.communication.consumer.StreamConsumer
+
+Data consumer implementation for a single stream.
+
+TODO: think about multiple streams (like a number of named
+ pipes, etc). Prehaps, even merge this class with FileConsumer.
+"""
+
+importsys
+importos
+
+importConsumer
+from.importDataflowException
+from.importlogLevel
+
+
+
[docs]classStreamConsumer(Consumer.Consumer):
+ """ Data consumer implementation for Stream data source. """
+
+ fd=None
+
+ # Override
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Docs/build/html/_modules/pyDKB/dataflow/messages.html b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
similarity index 74%
rename from Docs/build/html/_modules/pyDKB/dataflow/messages.html
rename to Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
index d1d02e31e..6f6aa449d 100644
--- a/Docs/build/html/_modules/pyDKB/dataflow/messages.html
+++ b/Docs/build/html/_modules/pyDKB/dataflow/communication/messages.html
@@ -6,17 +6,17 @@
- pyDKB.dataflow.messages — Data Knowledge Base documentation
-
-
-
-
-
-
-
-
+ pyDKB.dataflow.communication.messages — Data Knowledge Base documentation
+
+
+
+
+
+
+
+
-
+
@@ -31,13 +31,15 @@
-
Source code for pyDKB.dataflow.messages
+
Source code for pyDKB.dataflow.communication.messages
"""
+pyDKB.dataflow.communication.messages
+
Definition of abstract message class and specific message classes"""from.importmessageType
-frompyDKB.dataflow.typesimportcodeType
+from.importcodeTypeimportjsonimportsys
@@ -45,7 +47,7 @@
[docs]classDecodeUnknownType(NotImplementedError):""" Exception to be thrown when message type is not decodable. """def__init__(self,code,cls):message="%s can`t be decoded from %s" \
@@ -53,7 +55,7 @@
[docs]classEncodeUnknownType(NotImplementedError):""" Exception to be thrown when message type is not encodable. """def__init__(self,code,cls):message="%s can`t be encoded into %s" \
@@ -61,7 +63,7 @@
[docs]defMessage(msg_type):""" Return class XXXMessage, where XXX is the passed type. """ifnotmessageType.hasMember(msg_type):raiseValueError("Message type must be a member of messageType")
@@ -75,7 +77,7 @@
[docs]defencode(self,code):""" Encode original message from TYPE-specific format to CODE. Raises ValueError """raiseEncodeUnknownType(code,self.__class__)