From 8e0d25cd3043128a5e7c9ce5e5dc5e587503cfde Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 8 Aug 2011 10:35:25 -0500 Subject: [PATCH] Basic implementation. --- setup.py | 2 +- tests/__init__.py | 0 tests/tests.py | 6 +++ timak/timelines.py | 116 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 tests/__init__.py create mode 100644 tests/tests.py create mode 100644 timak/timelines.py diff --git a/setup.py b/setup.py index 135c649..3790b46 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup(name='timak', version=".".join(map(str, __import__("timak").__version__)), - description='Timelines in by Riak', + description='Timelines (activity streams) backed by Riak', author='Brett Hoerner', author_email='brett@bretthoerner.com', url='http://github.com/bretthoerner/timak', diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/tests.py b/tests/tests.py new file mode 100644 index 0000000..d8a0bcf --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,6 @@ +import unittest2 + + +class TimakTest(unittest2.TestCase): + def test_add(self): + self.assertEqual(1, 1) diff --git a/timak/timelines.py b/timak/timelines.py new file mode 100644 index 0000000..97672ff --- /dev/null +++ b/timak/timelines.py @@ -0,0 +1,116 @@ +import datetime + + +class Timeline(object): + def __init__(self, connection=None, bucket="timelines", order='desc', + max_items=1000): + self.connection = connection + self.bucket = bucket + self.order = order + self.max_items = max_items + + def get_connection(self): + return self.connection + + def get_bucket(self): + return self.connection.bucket(self.bucket) + + def _datetime_to_js(dt): + return int(dt.strftime("%s") + dt.strftime("%f")[:3]) + + def _merge_two(self, obj1, obj2): + """ + Merges two data dictionaries, respecting the one with the most recent + modified time per item. + """ + for uniq_ident in obj2.keys(): + if (uniq_ident not in obj1) \ + or (obj1[uniq_ident]['modified'] \ + < obj2[uniq_ident]['modified']): + obj1[uniq_ident] = obj2[uniq_ident] + + return self._dict_to_list(obj1) + + def _list_to_dict(self, l): + if not l: return {} + d = {} + for o in l: + d[o['id']] = o + return d + + def _dict_to_list(self, d): + if not d: return [] + l = d.values() + reverse = self.order == 'desc' + l.sort(key=lambda x: x['timestamp'], reverse=reverse) + return l + + def _get_obj_and_data(self, key, write_merged=True): + """ + Returns RiakObject with proper vclock set and dictionary of merged entries. + + NOTE: The data on the object itself should not be used, the object is + returned only so it can be used later for updates. + """ + bucket = self.get_bucket() + + obj = bucket.get(key) + data = [self._list_to_dict(o.get_data()) for o + in obj.get_siblings() + if o.get_data() is not None] + + # if we have no data or only 1 sibling we can safetly return + # it without merging + if len(data) == 0: + return obj, {} + elif len(data) == 1: + return obj, data[0] + + resolved_data = reduce(self._merge_two, data) + # NOTE: is this really the only way to fix a conflict in the + # python riak library? + obj._vclock = obj.get_sibling(1).vclock() + + if write_merged: + obj.set_data(self._dict_to_list(resolved_data)[:self.max_items]) + obj.store() + + return obj, resolved_data + + def get(self, key): + """ + Returns timeline as list. + """ + # TODO: Optimize this so we don't have to coerce + # list->dict->list for the common case. + return self._dict_to_list(self._get_obj_and_data(key)[1]) + + def op(self, key, uniq_ident, obj_datetime, data=None, action='add'): + now = self._datetime_to_js(datetime.datetime.utcnow()) + obj, data = self._get_obj_and_data(key, write_merged=False) + + new_item = {'id': uniq_ident, + 'timestamp': self._datetime_to_js(obj_datetime), + 'modified': now} + if data: + new_item['data'] = data + if action == 'delete': + new_item['deleted'] = True + + existing = data.get(uniq_ident, None) + if existing: + if existing['modified'] < now: + data[uniq_ident] = new_item + else: + data[uniq_ident] = new_item + + timeline = self._dict_to_list(data)[:self.max_items] + obj.set_data(timeline) + obj.store() + return timeline + + def add(self, key, uniq_ident, obj_datetime, data=None): + return self.op(key, uniq_ident, obj_datetime, data=data) + + def delete(self, key, uniq_ident, obj_datetime, data=None): + return self.op(key, uniq_ident, obj_datetime, data=data, action='delete')