diff --git a/README.md b/README.md deleted file mode 100644 index 91f6395..0000000 --- a/README.md +++ /dev/null @@ -1,4 +0,0 @@ -pious -===== - -A python package for dealing with basic input/output processes \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..a635667 --- /dev/null +++ b/README.rst @@ -0,0 +1,5 @@ +===== +Pious +===== + +A python package for dealing with basic input/output processes diff --git a/docs/Pious.pdf b/docs/Pious.pdf new file mode 100644 index 0000000..de3b628 Binary files /dev/null and b/docs/Pious.pdf differ diff --git a/docs/Pious.rst b/docs/Pious.rst new file mode 100644 index 0000000..4ab1c33 --- /dev/null +++ b/docs/Pious.rst @@ -0,0 +1,29 @@ +===== +PIOUS +===== + +.. contents:: + +.. section-numbering:: + +.. page:: + +Introduction +============ + +A python package for dealing with basic input/output processes. Pious came from +the observation that the processing data almost always boils down to a series +of discreet transforms. Splitting these transforms out into reusable modules is +not only a useful way to build data processing pipelines but also provide a +convenient way to design and describe the process to others. + +Pious aims to provide the tools needed to build complex data transformation +pipeline in a simple and easily understood manner. It also try to provide the +tools necessary to inspect data as it passes through the pipeline to make +debugging processes as easy as possible. + +.. page:: + +.. include:: pious_sources.rst +.. include:: pious_pipes.rst +.. include:: pious_consumers.rst diff --git a/docs/build.sh b/docs/build.sh new file mode 100755 index 0000000..fde3948 --- /dev/null +++ b/docs/build.sh @@ -0,0 +1,4 @@ +#!/bin/sh +rst2pdf Pious.rst -e preprocess +rm Pious.rst.build_temp +rm *~ diff --git a/docs/pious_consumers.rst b/docs/pious_consumers.rst new file mode 100644 index 0000000..27c814c --- /dev/null +++ b/docs/pious_consumers.rst @@ -0,0 +1,22 @@ +============== +Pious Consumer +============== + +Consumers are the termination points of pipelines. They consume the final +output of the pipeline. For example they may write to a file in various formats +or insert/replace into a database table or tables. + +Existing Consumers +================== + +Consumer + The base class for all Pious consumers. This is the class from which your + own custom consumers should derive. + +DbTable + Inserts/Replaces values into a table. Also has options for truncating + the table prior to the insert etc. + +CsvFile + Writes the data to a CSV file with optional headers and configurable + configurable separators and escape characters diff --git a/docs/pious_pipes.rst b/docs/pious_pipes.rst new file mode 100644 index 0000000..9b0de7a --- /dev/null +++ b/docs/pious_pipes.rst @@ -0,0 +1,31 @@ +=========== +Pious Pipes +=========== + +Pipes are similar to data sources in that they act as iterators. The different +is that they take a datasource (or another pipe) as an input. Pipes therefore +are the building blocks of your transformation pipeline. + +Pious provides a goodly collection of pipe with which to build your pipelines +providing filters, field transformers along with more complex pipes like +fork which takes a pipeline and splits it into 2 or multiple parallel pipes. + +Existing Pipes +============== + +Pipe + The base pipe class that all others extend. This should be the basis of + your own pipe implementations + +Ensure + Ensure that the dict passing through has certain keys and that if they are + not present then we set a default value + +Filter + Skips items that match certain criteria + +Fork + Splits the pipe into 2 parallel pipes - i.e. the output of the bound + iterator is fed into the input of 2 pipelines. + + diff --git a/docs/pious_sources.rst b/docs/pious_sources.rst new file mode 100644 index 0000000..0996a31 --- /dev/null +++ b/docs/pious_sources.rst @@ -0,0 +1,32 @@ +================== +Pious Data Sources +================== + +At the base of all Pious transform pipelines are data sources, which as the +name would imply are sources of the raw data you wish to transform. + +Pious provides a number of data source primatives along with the provisions to +build upon those primatives to fit your own requires. + +Data Source Primitives +====================== + +Source + The base class for all data sources. This is the class from which you + should extend you own custom data sources. + +CSVFile + A Character separated values file. You can specify the separator, escape character etc. + +XMLFile + The build blocks of a SAX based XML file parser + +FlatXMLFile + This extends the XMLFile primitive to provide a simple flat xml importer. + By flat XML I mean one that bascially imitates a CSV, it doesn't use + attributes, just flat un-nested elements. + +DbIterator + Iterators the results of an SQL query + + diff --git a/pious/__init__.py b/pious/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pious/models.py b/pious/models.py new file mode 100644 index 0000000..e69de29 diff --git a/pious/process/__init__.py b/pious/process/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pious/transform/__init__.py b/pious/transform/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pious/transform/consumers.py b/pious/transform/consumers.py new file mode 100644 index 0000000..e69de29 diff --git a/pious/transform/errors.py b/pious/transform/errors.py new file mode 100644 index 0000000..47b22ee --- /dev/null +++ b/pious/transform/errors.py @@ -0,0 +1,3 @@ + +class InvalidIterator(Exception): + pass diff --git a/pious/transform/log.py b/pious/transform/log.py new file mode 100644 index 0000000..80a34df --- /dev/null +++ b/pious/transform/log.py @@ -0,0 +1,17 @@ +class Logger(): + + template = "Watch item encountered:\n\tBefore: %s\n\tAfter: %s" + + def log_transform(self, data_in, data_out): + self._emit(self._get_message(data_in, data_out)) + + def _get_message(self, data_in, data_out): + return self.template % (data_in, data_out) + + def _emit(self, msg): + pass + +class ConsoleLogger(Logger): + + def _emit(self, msg): + print(msg) \ No newline at end of file diff --git a/pious/transform/pipes.py b/pious/transform/pipes.py new file mode 100644 index 0000000..dfd5d0e --- /dev/null +++ b/pious/transform/pipes.py @@ -0,0 +1,136 @@ +from pious.transform.errors import InvalidIterator + +class Pipe(object): + """ + The base object that should be extended by your own + pipes + """ + + def __init__(self): + self.watches = [] + self.logger = None + + def bind(self, iterator): + """ + Binds an iterator (anything that provides __iter__) + to this pipe + """ + if not getattr(iterator, '__iter__'): + raise InvalidIterator() + self.iterator = iterator.__iter__() + + def set_logger(self, logger): + """ + + """ + self._logger = logger + + def add_watch(self, watch): + self.watches.append(watch) + + def _apply(self, data): + """ + This is the method called to perform the + translation and should therefore be overridden + in derived classes + """ + return data + + def _is_watched(self, data): + for watch in self.watches: + if watch(data): + return True + return False + + def _log_watched(self, data_in, data_out): + self.logger.log_transform(data_in, data_out) + + def __iter__(self): + return self + + def next(self): + data_in = self.iterator.next() + data_out = self._apply(data_in) + if self._is_watched(data_in) or self._is_watched(data_out): + self._log_watched(data_in, data_out) + return data_out + + +class Ensure(Pipe): + """ + Ensure that the dict passing through has certain keys and that if they + are not present then we set a default value + """ + def __init__(self, map): + super(Ensure, self).__init__() + self.map = map + + def _apply(self, data): + return dict(self.map.items() + data.items()) + + +class Filter(Pipe): + """ + Skips items that the passed in matcher + tells it to + + matcher should be a function that returns + True if the items should be filtered out + """ + def __init__(self, matcher): + self.matcher = matcher + super(Filter, self).__init__() + + def next(self): + data_out = super(Filter, self).next() + while self.matcher(data_out): + data_out = super(Filter, self).next() + return data_out + +class Rename(Pipe): + """ + Renames keys based on the passed in key_map + + The key_map should be in the form: + {'old key name': 'new key name',} + """ + def __init__(self, key_map): + self.key_map = key_map + super(Rename, self).__init__() + + def _apply(self, data): + for key in self.key_map: + if key in data: + data[self.key_map[key]] = data[key] + del data[key] + return data + +class Winnow(Pipe): + """ + Removes unwanted keys from the data passing through + """ + def __init__(self, keys): + self.keys = keys + super(Winnow, self).__init__() + + def _apply(self, data): + for key in self.keys: + if key in data: + del data[key] + return data + +class AutoIncrement(Pipe): + """ + Added a unique (for this pipe) auto-incrementing + number to to the specified key + """ + def __init__(self, key, start_value = 0, interval = 1): + super(AutoIncrement, self).__init__() + self.key = key + self.counter_value = start_value + self.interval = interval + + def _apply(self, data): + self.counter_value += self.interval + data[self.key] = self.counter_value + return data \ No newline at end of file diff --git a/pious/transform/sources.py b/pious/transform/sources.py new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e4ac980 --- /dev/null +++ b/setup.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +""" +Installation script: + +To release a new version to PyPi: +- Run: python setup.py sdist upload +""" + +from setuptools import setup, find_packages + +setup(name='pious', + version='0.0.1', + url='https://github.com/a-musing-moose/pious', + author="Jonathan Moss", + author_email="jonathan.moss@tangentsnowball.com.au", + description="A python package for dealing with basic input/output processes", + long_description=open('README.rst').read(), + keywords="Data, Processing, Pipelines", + license='BSD', + platforms=['linux'], + packages=find_packages(exclude=["*.tests"]), + include_package_data = True, + install_requires=[], + # See http://pypi.python.org/pypi?%3Aaction=list_classifiers + classifiers=['Environment :: Console', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: BSD License', + 'Operating System :: Unix', + 'Programming Language :: Python'] + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/log_tests.py b/tests/log_tests.py new file mode 100644 index 0000000..3c8587e --- /dev/null +++ b/tests/log_tests.py @@ -0,0 +1,17 @@ +from unittest import TestCase + +from pious.transform.log import ConsoleLogger + +class ConsoleLoggerTest(TestCase): + + def test_has_log_transform(self): + l = ConsoleLogger() + try: + getattr(l, 'log_transform'); + except AttributeError: + self.fail("Console logger is missing a log_transform method") + + i = { 'a': 'b' } + o = { 'a': 'c'} + l.log_transform(i, o) + diff --git a/tests/pipe_tests.py b/tests/pipe_tests.py new file mode 100644 index 0000000..f28558e --- /dev/null +++ b/tests/pipe_tests.py @@ -0,0 +1,163 @@ +from unittest import TestCase + +from pious.transform.pipes import Pipe, Ensure, Rename, Winnow, AutoIncrement + +class PipeTest(TestCase): + + def test_bind_accepts_iterator(self): + a = [1,2,3] + p = Pipe() + try: + p.bind(a) + except Exception: + self.fail("pipe.bind() raised an unexpectedly exception!") + + def test_bind_pukes_if_not_passed_an_iterator(self): + p = Pipe() + self.assertRaises(Exception, p, {}) + +class EnsureTest(TestCase): + + def test_sets_undefined_key(self): + a = [ + {'a': 1, 'b': 1} + ] + p = Ensure({'c': 2}) + p.bind(a) + for i in p: + self.assertIn('c', i) + + def test_does_not_overwrite_existing_key(self): + a = [ + {'a': 1, 'b': 1} + ] + p = Ensure({'a': 2}) + p.bind(a) + for i in p: + self.assertIn('a', i) + self.assertEquals(i['a'], 1) + +class RenameTest(TestCase): + + def test_renames_single_key(self): + test_data = [ + {'old_key': 'a_value'}, + {'old_key': 'a_value'} + ] + key_map = { + 'old_key':'new_key' + } + p = Rename(key_map) + p.bind(test_data) + for i in p: + self.assertIn('new_key', i) + self.assertEquals(i['new_key'], 'a_value') + self.assertNotIn('old_key', i) + + def test_renames_multiple_keys(self): + test_data = [ + { + 'old_key1': 'a_value1', + 'old_key2': 'a_value2' + }, + { + 'old_key1': 'a_value1', + 'old_key2': 'a_value2' + }, + ] + key_map = { + 'old_key1':'new_key1', + 'old_key2':'new_key2' + } + p = Rename(key_map) + p.bind(test_data) + for i in p: + self.assertIn('new_key1', i) + self.assertEquals(i['new_key1'], 'a_value1') + self.assertNotIn('old_key1', i) + self.assertIn('new_key2', i) + self.assertEquals(i['new_key2'], 'a_value2') + self.assertNotIn('old_key2', i) + + +class WinnowTest(TestCase): + + def test_single_key_removed(self): + test_data = [ + {'a_key': 'a_value'}, + ] + keys = [ + 'a_key' + ] + p = Winnow(keys) + p.bind(test_data) + for i in p: + self.assertNotIn('a_key', i) + + def test_remaining_key_unaffected(self): + test_data = [ + {'a_key': 'a_value', 'a_n_other_key': 'a_n_other_value'}, + ] + keys = [ + 'a_key' + ] + p = Winnow(keys) + p.bind(test_data) + for i in p: + self.assertNotIn('a_key', i) + self.assertIn('a_n_other_key', i) + self.assertEquals(i['a_n_other_key'], 'a_n_other_value') + + +class AutoIncrementTest(TestCase): + + def test_key_added(self): + test_data = [ + {'a': 'b',}, + {'a': 'b',} + ] + p = AutoIncrement('counter') + p.bind(test_data) + for i in p: + self.assertIn('counter', i) + + def test_key_increments(self): + test_data = [ + {'a': 'b',}, + {'a': 'b',} + ] + p = AutoIncrement('counter') + p.bind(test_data) + c = 0 + for i in p: + c += 1 + self.assertIn('counter', i) + self.assertEquals(i['counter'], c) + + def test_key_increment_interval(self): + test_data = [ + {'a': 'b',}, + {'a': 'b',} + ] + p = AutoIncrement('counter', interval=2) + p.bind(test_data) + c = 0 + for i in p: + c += 2 + self.assertIn('counter', i) + self.assertEquals(i['counter'], c) + + def test_key_increment_start(self): + test_data = [ + {'a': 'b',}, + {'a': 'b',} + ] + p = AutoIncrement('counter', start_value=2) + p.bind(test_data) + c = 2 + for i in p: + c += 1 + self.assertIn('counter', i) + self.assertEquals(i['counter'], c) + +