Re-arranging and added a bunch of pipes

commit fd92ca2e556cfaf7eecfe30f2c161710349521b6 1 parent eca767a
@a-musing-moose authored
4 README.md
@@ -1,4 +0,0 @@
-pious
-=====
-
-A python package for dealing with basic input/output processes
5 README.rst
@@ -0,0 +1,5 @@
+=====
+Pious
+=====
+
+A python package for dealing with basic input/output processes
BIN  docs/Pious.pdf
Binary file not shown
29 docs/Pious.rst
@@ -0,0 +1,29 @@
+=====
+PIOUS
+=====
+
+.. contents::
+
+.. section-numbering::
+
+.. page::
+
+Introduction
+============
+
+A Python package for dealing with basic input/output processes. Pious came from
+the observation that processing data almost always boils down to a series of
+discrete transforms. Splitting these transforms out into reusable modules is
+not only a useful way to build data processing pipelines but also provides a
+convenient way to design and describe the process to others.
+
+Pious aims to provide the tools needed to build complex data transformation
+pipelines in a simple and easily understood manner. It also tries to provide
+the tools necessary to inspect data as it passes through the pipeline, to make
+debugging processes as easy as possible.
+
+.. page::
+
+.. include:: pious_sources.rst
+.. include:: pious_pipes.rst
+.. include:: pious_consumers.rst
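
The introduction describes a pipeline as a series of discrete transforms sitting between a source and a consumer. A minimal sketch of that idea with plain Python generators follows. It does not use the Pious classes, and every function name in it is a hypothetical stand-in chosen for illustration.

```python
# Hypothetical stand-ins for a source, two pipes and a consumer.
# None of these names come from the Pious code base.

def source(rows):
    """Yield raw records one at a time, copying each so the input is untouched."""
    for row in rows:
        yield dict(row)


def uppercase_name(records):
    """A discrete transform: upper-case the 'name' field of each record."""
    for record in records:
        record["name"] = record["name"].upper()
        yield record


def drop_inactive(records):
    """A second transform: skip records whose 'active' flag is false."""
    for record in records:
        if record["active"]:
            yield record


def collect(records):
    """A consumer: drain the pipeline into a list."""
    return list(records)


raw = [
    {"name": "ada", "active": True},
    {"name": "bob", "active": False},
]

# Each stage only sees the iterator produced by the previous stage.
result = collect(drop_inactive(uppercase_name(source(raw))))
print(result)
```

Because each stage is an iterator over the previous one, records flow through one at a time rather than being materialised between stages.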
4 docs/build.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+rst2pdf Pious.rst -e preprocess
+rm Pious.rst.build_temp
+rm *~
22 docs/pious_consumers.rst
@@ -0,0 +1,22 @@
+===============
+Pious Consumers
+===============
+
+Consumers are the termination points of pipelines. They consume the final
+output of the pipeline. For example, they may write to a file in various
+formats or insert into (or replace rows in) one or more database tables.
+
+Existing Consumers
+==================
+
+Consumer
+ The base class for all Pious consumers. This is the class from which your
+ own custom consumers should derive.
+
+DbTable
+ Inserts/Replaces values into a table. Also has options such as truncating
+ the table prior to the insert.
+
+CsvFile
+ Writes the data to a CSV file with optional headers and configurable
+ separators and escape characters.
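
This commit leaves `pious/transform/consumers.py` empty, so the shape of a consumer is not yet fixed. As a rough sketch of what a CSV-writing consumer could look like, the example below uses the standard `csv` module. The class name `CsvConsumerSketch`, its constructor arguments and the `consume` method are assumptions, not the project's API.

```python
import csv
import io


class CsvConsumerSketch(object):
    """Hypothetical consumer: drains an iterable of dicts into CSV text."""

    def __init__(self, stream, fieldnames, write_header=True, delimiter=","):
        # lineterminator is pinned so the output is the same on every platform
        self.writer = csv.DictWriter(
            stream,
            fieldnames=fieldnames,
            delimiter=delimiter,
            lineterminator="\n",
        )
        self.write_header = write_header

    def consume(self, iterator):
        """Write every record from the iterator and return how many were written."""
        if self.write_header:
            self.writer.writeheader()
        count = 0
        for record in iterator:
            self.writer.writerow(record)
            count += 1
        return count


buffer = io.StringIO()
consumer = CsvConsumerSketch(buffer, ["id", "name"], delimiter=";")
written = consumer.consume(iter([
    {"id": 1, "name": "ada"},
    {"id": 2, "name": "bob"},
]))
csv_text = buffer.getvalue()
print(csv_text)
```

Taking a stream rather than a file name keeps the sketch testable without touching the file system; a real consumer might accept a path instead.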
31 docs/pious_pipes.rst
@@ -0,0 +1,31 @@
+===========
+Pious Pipes
+===========
+
+Pipes are similar to data sources in that they act as iterators. The
+difference is that they take a data source (or another pipe) as an input.
+Pipes are therefore the building blocks of your transformation pipeline.
+
+Pious provides a good collection of pipes with which to build your pipelines:
+filters and field transformers, along with more complex pipes such as Fork,
+which takes a pipeline and splits it into two or more parallel pipes.
+
+Existing Pipes
+==============
+
+Pipe
+ The base pipe class that all others extend. This should be the basis of
+ your own pipe implementations
+
+Ensure
+ Ensures that the dict passing through has certain keys, setting a default
+ value for any that are not present
+
+Filter
+ Skips items that match certain criteria
+
+Fork
+ Splits the pipe into 2 parallel pipes - i.e. the output of the bound
+ iterator is fed into the input of 2 pipelines.
+
+
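
`Fork` is described here but is not implemented in `pious/transform/pipes.py` in this commit. One possible way to feed the output of a single iterator into two or more parallel pipelines is `itertools.tee`, sketched below. The function name `fork` and the decision to deep-copy each record per branch are assumptions. The copy is there because several pipes in this commit modify the dict they receive in place.

```python
import copy
import itertools


def fork(iterator, branches=2):
    """Split one iterator into several independent iterators.

    itertools.tee hands the same object to every branch, so each branch
    receives its own deep copy to keep in-place edits from leaking across.
    """
    tees = itertools.tee(iterator, branches)
    return tuple(map(copy.deepcopy, t) for t in tees)


records = [{"id": 1}, {"id": 2}]
left, right = fork(iter(records))

# The left branch tags each record without changing the id.
left_out = [dict(r, side="left") for r in left]

# The right branch mutates its records in place.
right_out = []
for r in right:
    r["id"] *= 10
    right_out.append(r)

print(left_out)
print(right_out)
```

Note that `tee` buffers items one branch has seen and another has not, so draining one branch completely before the other, as above, holds the whole data set in memory.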
32 docs/pious_sources.rst
@@ -0,0 +1,32 @@
+==================
+Pious Data Sources
+==================
+
+At the base of all Pious transform pipelines are data sources, which, as the
+name implies, are sources of the raw data you wish to transform.
+
+Pious provides a number of data source primitives, along with the means to
+build upon those primitives to fit your own requirements.
+
+Data Source Primitives
+======================
+
+Source
+ The base class for all data sources. This is the class that your own
+ custom data sources should extend.
+
+CSVFile
+ A character-separated values file. You can specify the separator, escape character etc.
+
+XMLFile
+ The building blocks of a SAX-based XML file parser
+
+FlatXMLFile
+ This extends the XMLFile primitive to provide a simple flat XML importer.
+ Flat XML here means XML that basically imitates a CSV: it doesn't use
+ attributes, just flat un-nested elements.
+
+DbIterator
+ Iterates over the results of an SQL query
+
+
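
`pious/transform/sources.py` is still empty in this commit, so the following is only a guess at how a `CSVFile`-style source might behave: an iterable that yields one dict per row, with a configurable separator and escape character. The class name `CsvSourceSketch` and its parameters are hypothetical. Only the standard `csv` module is used.

```python
import csv
import io


class CsvSourceSketch(object):
    """Hypothetical data source: yields one dict per CSV row."""

    def __init__(self, stream, delimiter=",", escapechar=None):
        self.stream = stream
        self.delimiter = delimiter
        self.escapechar = escapechar

    def __iter__(self):
        # The first row of the stream is used as the field names
        reader = csv.DictReader(
            self.stream,
            delimiter=self.delimiter,
            escapechar=self.escapechar,
        )
        for row in reader:
            yield dict(row)


data = io.StringIO("id|name\n1|ada\n2|bob\n")
rows = list(CsvSourceSketch(data, delimiter="|"))
print(rows)
```

Anything that provides `__iter__` like this could be handed to `Pipe.bind`, which is all the pipes in this commit require of a source.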
0  pious/__init__.py
No changes.
0  pious/models.py
No changes.
0  pious/process/__init__.py
No changes.
0  pious/transform/__init__.py
No changes.
0  pious/transform/consumers.py
No changes.
3  pious/transform/errors.py
@@ -0,0 +1,3 @@
+
+class InvalidIterator(Exception):
+    pass
17 pious/transform/log.py
@@ -0,0 +1,17 @@
+class Logger():
+
+    template = "Watch item encountered:\n\tBefore: %s\n\tAfter: %s"
+
+    def log_transform(self, data_in, data_out):
+        self._emit(self._get_message(data_in, data_out))
+
+    def _get_message(self, data_in, data_out):
+        return self.template % (data_in, data_out)
+
+    def _emit(self, msg):
+        pass
+
+class ConsoleLogger(Logger):
+
+    def _emit(self, msg):
+        print(msg)
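
`Logger` follows a template-method pattern: `log_transform` builds the message and `_emit` decides where it goes. The sketch below repeats the base class so that it runs on its own, then adds a hypothetical `ListLogger` subclass (not part of this commit) that collects messages in memory, which can be handy in tests.

```python
class Logger(object):
    """Copy of the base logger from this commit, repeated so the sketch runs alone."""

    template = "Watch item encountered:\n\tBefore: %s\n\tAfter: %s"

    def log_transform(self, data_in, data_out):
        self._emit(self._get_message(data_in, data_out))

    def _get_message(self, data_in, data_out):
        return self.template % (data_in, data_out)

    def _emit(self, msg):
        pass


class ListLogger(Logger):
    """Hypothetical subclass: keeps messages in a list instead of printing them."""

    def __init__(self):
        self.messages = []

    def _emit(self, msg):
        self.messages.append(msg)


logger = ListLogger()
logger.log_transform({"a": "b"}, {"a": "c"})
print(logger.messages[0])
```

Only `_emit` needs overriding, so the message format stays in one place.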
136 pious/transform/pipes.py
@@ -0,0 +1,136 @@
+from pious.transform.errors import InvalidIterator
+
+class Pipe(object):
+    """
+    The base object that should be extended by your own
+    pipes
+    """
+
+    def __init__(self):
+        self.watches = []
+        self.logger = None
+
+    def bind(self, iterator):
+        """
+        Binds an iterator (anything that provides __iter__)
+        to this pipe
+        """
+        if not hasattr(iterator, '__iter__'):
+            raise InvalidIterator()
+        self.iterator = iterator.__iter__()
+
+    def set_logger(self, logger):
+        """Sets the logger used to report watched items"""
+        self.logger = logger
+
+    def add_watch(self, watch):
+        self.watches.append(watch)
+
+    def _apply(self, data):
+        """
+        This is the method called to perform the
+        translation and should therefore be overridden
+        in derived classes
+        """
+        return data
+
+    def _is_watched(self, data):
+        for watch in self.watches:
+            if watch(data):
+                return True
+        return False
+
+    def _log_watched(self, data_in, data_out):
+        # Without a logger there is nowhere to report watched items
+        if self.logger is not None:
+            self.logger.log_transform(data_in, data_out)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        data_in = self.iterator.next()
+        data_out = self._apply(data_in)
+        if self._is_watched(data_in) or self._is_watched(data_out):
+            self._log_watched(data_in, data_out)
+        return data_out
+
+
+class Ensure(Pipe):
+    """
+    Ensure that the dict passing through has certain keys and that if they
+    are not present then we set a default value
+    """
+    def __init__(self, map):
+        super(Ensure, self).__init__()
+        self.map = map
+
+    def _apply(self, data):
+        return dict(self.map.items() + data.items())
+
+
+class Filter(Pipe):
+    """
+    Skips items that the passed in matcher
+    tells it to
+
+    matcher should be a function that returns
+    True if the items should be filtered out
+    """
+    def __init__(self, matcher):
+        self.matcher = matcher
+        super(Filter, self).__init__()
+
+    def next(self):
+        data_out = super(Filter, self).next()
+        while self.matcher(data_out):
+            data_out = super(Filter, self).next()
+        return data_out
+
+class Rename(Pipe):
+    """
+    Renames keys based on the passed in key_map
+
+    The key_map should be in the form:
+    {'old key name': 'new key name',}
+    """
+    def __init__(self, key_map):
+        self.key_map = key_map
+        super(Rename, self).__init__()
+
+    def _apply(self, data):
+        for key in self.key_map:
+            if key in data:
+                data[self.key_map[key]] = data[key]
+                del data[key]
+        return data
+
+class Winnow(Pipe):
+    """
+    Removes unwanted keys from the data passing through
+    """
+    def __init__(self, keys):
+        self.keys = keys
+        super(Winnow, self).__init__()
+
+    def _apply(self, data):
+        for key in self.keys:
+            if key in data:
+                del data[key]
+        return data
+
+class AutoIncrement(Pipe):
+    """
+    Adds a unique (for this pipe) auto-incrementing
+    number to the specified key
+    """
+    def __init__(self, key, start_value=0, interval=1):
+        super(AutoIncrement, self).__init__()
+        self.key = key
+        self.counter_value = start_value
+        self.interval = interval
+
+    def _apply(self, data):
+        self.counter_value += self.interval
+        data[self.key] = self.counter_value
+        return data
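
The pipes in this file use the Python 2 iterator protocol (`next` rather than `__next__`). The sketch below restates the same ideas for Python 3 to show how pipes chain and how a watch fires. The `*Sketch` class names, the `logged` list and `bind` returning `self` are assumptions made for illustration, not the project's code. Because pipes such as `Rename` modify the dict they are given, the sketch takes a copy before calling `_apply` so that the recorded "before" and "after" values can differ.

```python
class PipeSketch(object):
    """Python 3 restatement of the Pipe idea (illustrative, not the project's code)."""

    def __init__(self):
        self.watches = []
        self.logged = []      # stands in for a logger in this sketch
        self.iterator = None

    def bind(self, iterator):
        self.iterator = iter(iterator)  # raises TypeError for non-iterables
        return self                     # returning self is an assumption, for chaining

    def add_watch(self, watch):
        self.watches.append(watch)

    def _apply(self, data):
        return data

    def __iter__(self):
        return self

    def __next__(self):
        data_in = next(self.iterator)
        snapshot = dict(data_in)  # copy first: _apply may mutate in place
        data_out = self._apply(data_in)
        if any(w(snapshot) or w(data_out) for w in self.watches):
            self.logged.append((snapshot, dict(data_out)))
        return data_out


class RenameSketch(PipeSketch):
    def __init__(self, key_map):
        super().__init__()
        self.key_map = key_map

    def _apply(self, data):
        for old, new in self.key_map.items():
            if old in data:
                data[new] = data.pop(old)
        return data


class AutoIncrementSketch(PipeSketch):
    def __init__(self, key, start_value=0, interval=1):
        super().__init__()
        self.key = key
        self.counter_value = start_value
        self.interval = interval

    def _apply(self, data):
        self.counter_value += self.interval
        data[self.key] = self.counter_value
        return data


source = [{"old": "x"}, {"old": "y"}]
rename = RenameSketch({"old": "new"}).bind(source)
rename.add_watch(lambda d: d.get("new") == "y")
numbered = AutoIncrementSketch("id").bind(rename)  # a pipe bound to another pipe
output = list(numbered)
print(output)
print(rename.logged)
```

As in the original classes, the first value produced by the counter is `start_value + interval`, not `start_value`.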
0  pious/transform/sources.py
No changes.
30 setup.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+"""
+Installation script:
+
+To release a new version to PyPI:
+- Run: python setup.py sdist upload
+"""
+
+from setuptools import setup, find_packages
+
+setup(name='pious',
+      version='0.0.1',
+      url='https://github.com/a-musing-moose/pious',
+      author="Jonathan Moss",
+      author_email="jonathan.moss@tangentsnowball.com.au",
+      description="A python package for dealing with basic input/output processes",
+      long_description=open('README.rst').read(),
+      keywords="Data, Processing, Pipelines",
+      license='BSD',
+      platforms=['linux'],
+      packages=find_packages(exclude=["*.tests"]),
+      include_package_data=True,
+      install_requires=[],
+      # See http://pypi.python.org/pypi?%3Aaction=list_classifiers
+      classifiers=['Environment :: Console',
+                   'Intended Audience :: Developers',
+                   'License :: OSI Approved :: BSD License',
+                   'Operating System :: Unix',
+                   'Programming Language :: Python']
+      )
0  tests/__init__.py
No changes.
17 tests/log_tests.py
@@ -0,0 +1,17 @@
+from unittest import TestCase
+
+from pious.transform.log import ConsoleLogger
+
+class ConsoleLoggerTest(TestCase):
+
+    def test_has_log_transform(self):
+        l = ConsoleLogger()
+        try:
+            getattr(l, 'log_transform')
+        except AttributeError:
+            self.fail("Console logger is missing a log_transform method")
+
+        i = {'a': 'b'}
+        o = {'a': 'c'}
+        l.log_transform(i, o)
+
163 tests/pipe_tests.py
@@ -0,0 +1,163 @@
+from unittest import TestCase
+
+from pious.transform.pipes import Pipe, Ensure, Rename, Winnow, AutoIncrement
+
+class PipeTest(TestCase):
+
+    def test_bind_accepts_iterator(self):
+        a = [1, 2, 3]
+        p = Pipe()
+        try:
+            p.bind(a)
+        except Exception:
+            self.fail("pipe.bind() raised an unexpected exception!")
+
+    def test_bind_pukes_if_not_passed_an_iterator(self):
+        p = Pipe()
+        self.assertRaises(Exception, p.bind, 1)
+
+class EnsureTest(TestCase):
+
+    def test_sets_undefined_key(self):
+        a = [
+            {'a': 1, 'b': 1}
+        ]
+        p = Ensure({'c': 2})
+        p.bind(a)
+        for i in p:
+            self.assertIn('c', i)
+
+    def test_does_not_overwrite_existing_key(self):
+        a = [
+            {'a': 1, 'b': 1}
+        ]
+        p = Ensure({'a': 2})
+        p.bind(a)
+        for i in p:
+            self.assertIn('a', i)
+            self.assertEquals(i['a'], 1)
+
+class RenameTest(TestCase):
+
+    def test_renames_single_key(self):
+        test_data = [
+            {'old_key': 'a_value'},
+            {'old_key': 'a_value'}
+        ]
+        key_map = {
+            'old_key': 'new_key'
+        }
+        p = Rename(key_map)
+        p.bind(test_data)
+        for i in p:
+            self.assertIn('new_key', i)
+            self.assertEquals(i['new_key'], 'a_value')
+            self.assertNotIn('old_key', i)
+
+    def test_renames_multiple_keys(self):
+        test_data = [
+            {
+                'old_key1': 'a_value1',
+                'old_key2': 'a_value2'
+            },
+            {
+                'old_key1': 'a_value1',
+                'old_key2': 'a_value2'
+            },
+        ]
+        key_map = {
+            'old_key1': 'new_key1',
+            'old_key2': 'new_key2'
+        }
+        p = Rename(key_map)
+        p.bind(test_data)
+        for i in p:
+            self.assertIn('new_key1', i)
+            self.assertEquals(i['new_key1'], 'a_value1')
+            self.assertNotIn('old_key1', i)
+            self.assertIn('new_key2', i)
+            self.assertEquals(i['new_key2'], 'a_value2')
+            self.assertNotIn('old_key2', i)
+
+
+class WinnowTest(TestCase):
+
+    def test_single_key_removed(self):
+        test_data = [
+            {'a_key': 'a_value'},
+        ]
+        keys = [
+            'a_key'
+        ]
+        p = Winnow(keys)
+        p.bind(test_data)
+        for i in p:
+            self.assertNotIn('a_key', i)
+
+    def test_remaining_key_unaffected(self):
+        test_data = [
+            {'a_key': 'a_value', 'a_n_other_key': 'a_n_other_value'},
+        ]
+        keys = [
+            'a_key'
+        ]
+        p = Winnow(keys)
+        p.bind(test_data)
+        for i in p:
+            self.assertNotIn('a_key', i)
+            self.assertIn('a_n_other_key', i)
+            self.assertEquals(i['a_n_other_key'], 'a_n_other_value')
+
+
+class AutoIncrementTest(TestCase):
+
+    def test_key_added(self):
+        test_data = [
+            {'a': 'b'},
+            {'a': 'b'}
+        ]
+        p = AutoIncrement('counter')
+        p.bind(test_data)
+        for i in p:
+            self.assertIn('counter', i)
+
+    def test_key_increments(self):
+        test_data = [
+            {'a': 'b'},
+            {'a': 'b'}
+        ]
+        p = AutoIncrement('counter')
+        p.bind(test_data)
+        c = 0
+        for i in p:
+            c += 1
+            self.assertIn('counter', i)
+            self.assertEquals(i['counter'], c)
+
+    def test_key_increment_interval(self):
+        test_data = [
+            {'a': 'b'},
+            {'a': 'b'}
+        ]
+        p = AutoIncrement('counter', interval=2)
+        p.bind(test_data)
+        c = 0
+        for i in p:
+            c += 2
+            self.assertIn('counter', i)
+            self.assertEquals(i['counter'], c)
+
+    def test_key_increment_start(self):
+        test_data = [
+            {'a': 'b'},
+            {'a': 'b'}
+        ]
+        p = AutoIncrement('counter', start_value=2)
+        p.bind(test_data)
+        c = 2
+        for i in p:
+            c += 1
+            self.assertIn('counter', i)
+            self.assertEquals(i['counter'], c)
+
+