In [960]:
import boto3
import pprint
from boto3.dynamodb.conditions import Key
import datetime
import dateutil

In [961]:
# Assumes external process:
#   java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb --inMemory

ddb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')
dc = boto3.client('dynamodb', endpoint_url='http://localhost:8000')

In [962]:
try:
    _dbn
except:
    _dbn = 0
_dbn += 1

class Db(object):
    def __init__(self, ddb):
        self._ddb = ddb
        entityTableName = 'entities_%s' % _dbn
        mapTableName    = 'map_%s' % _dbn

        self._entities = ddb.create_table(
            TableName=entityTableName,
            KeySchema=[
                {
                    'AttributeName': 'name',
                    'KeyType': 'HASH'
                },
                ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'name',
                    'AttributeType': 'S'
                },
                ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
                }
            )
        self._entities.meta.client.get_waiter('table_exists').wait(TableName=entityTableName)

        self._map = ddb.create_table(
            TableName=mapTableName,
            KeySchema=[
                {
                    'AttributeName': 'entity',
                    'KeyType': 'HASH'
                },
                {
                    'AttributeName': 'key',
                    'KeyType': 'RANGE'
                }
                ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'entity',
                    'AttributeType': 'S'
                },
                {
                    'AttributeName': 'key',
                    'AttributeType': 'S'
                },
                ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
                }
            )
        self._map.meta.client.get_waiter('table_exists').wait(TableName=mapTableName)
     
    def getEntity(self, name):
        return self._entities.get_item(Key={'name': name})
    def putEntity(self, item):
        self._entities.put_item(Item=item)
        
    def getMapEntries(self, entity):
        return self._map.query(KeyConditionExpression=Key('entity').eq(entity.meta.path()))
    def putMapEntry(self, item):
        self._map.put_item(Item=item)
        
    def describe(self):
        print 'entities:', self._entities.item_count
        print 'map:',      self._map.item_count
        
rawdb = Db(ddb)
print rawdb

<__main__.Db object at 0x10ae87c90>


In [963]:
class TypeRegistry(object):
    def __init__(self):
        self.clsToId = {}
        self.idToCls = {}
    def add(self, cls, typeId):
        self.clsToId[cls] = typeId
        self.idToCls[typeId] = cls
    def typeId(self, cls):
        return self.clsToId[cls]
    def cls(self, typeId):
        return self.idToCls[typeId]
    def name(self, typeId):
        return self.cls(typeId).__name__
    
_tr = TypeRegistry()

In [964]:
_uid = 10000
def getUUID():
    global _uid
    _uid = _uid+1
    s = str(_uid)
    return '%02d.%s' % (len(s), s)

In [965]:
class Timestamp(object):
    def __init__(self, t=None, v=None):
        self.validTime       = v or datetime.datetime.utcnow()
        self.transactionTime = t or datetime.datetime.utcnow()
    def str(self):
        return self.transactionTime.isoformat()
    def writeForm(self):
        return [ self.transactionTime.isoformat(), self.validTime.isoformat() ]
    def __repr__(self):
        return '<TS:t=%s,v=%s>' % (self.transactionTime.isoformat(), self.validTime.isoformat())
    
    @classmethod
    def fromReadForm(cls, v):
        return Timestamp(dateutil.parser.parse(v[0]), 
                         dateutil.parser.parse(v[1]))

In [966]:
class ObjectDb(object):
    def __init__(self, db):
        self.db = db
        self.cache = {}
        
    def reify(self, d, path):
        item = d['Item']
        cls = _tr.cls(item['type'])
        obj = cls(None, db=self)
        ts = item['timestamp']
        if ts:
            ts = Timestamp.fromReadForm(ts)
        obj._fromStoredForm(path, 
                            _payload   = item['payload'], 
                            _encoding  = item['encoding'], 
                            _timestamp = ts)
        return obj
    def get(self, name):
        if name not in self.cache:
            self.cache[name] = self.reify(self.db.getEntity(name), name )
        return self.cache[name]
    def put(self, item):
        self.db.putEntity(item)
        
    def _allEventRecords(self, entity):
        response = self.db.getMapEntries(entity)
        return response['Items']
    def _allEventNames(self, entity):
        return [ i['event'] for i in self._allEventRecords(entity) ]
    def _allEventObjects(self, entity):
        names = self._allEventNames(entity)
        objs = [ self.get(name) for name in names ]
        return objs
        
    def putMapItem(self, item):
        self.db.putMapEntry(item)
        
    def describe(self):
        self.db.describe()
    
    def __enter__(self, *a):
        _dbs.append(self)
    def __exit__(self, *a):
        db = _dbs.pop()
        assert db == self
        
db = -1                             
_odb = ObjectDb(rawdb)

_dbs = [None]

def currentDb():
    return _dbs[-1]

In [967]:
class EncDec(object):
    decoders = {}
    encoders = []
    
    def __init__(self, name, test, encode, decode):
        self.name = name
        self.test = test
        self.encode = encode
        self.decode = decode
        assert name not in self.decoders
        self.encoders.append(self)
        self.decoders[name] = decode
        
    @classmethod
    def encode(cls, value):
        for e in cls.encoders:
            if e.test(value):
                return e.name, e.encode(value)
        return None, value
    
    @classmethod
    def decode(cls, name, value, meta):
        if name is None:
            return value
        return cls.decoders[name](value, meta)
    
def encode(v):
    return EncDec.encode(v)

def decode(t, v, meta):
    return EncDec.decode(t, v, meta)
    
def addEncoding(name, test, encode, decode):
    EncDec(name, test, encode, decode)

def _persist(o):
    o.write()
    return o

addEncoding('O',
            lambda v: isinstance(v, DBO),
            lambda v: _persist(v).meta.path(),
            lambda v, meta: meta.db.get(v)
           )
                   
addEncoding('OL',
            lambda v: isinstance(v, list) and v,
            lambda v: [ _persist(o).meta.path() for o in v ],
            lambda v, meta: [ meta.db.get(p) for p in v ]
           )         

In [968]:
class DBOMeta(object):
    def __init__(self, obj, name=None, db=None, kwargs=None):
        if db is None:
            db = currentDb()
        self.typeId     = _tr.typeId(obj.__class__)
        self._name      = name
        self.db         = db
        self._payload   = None
        self._encoding  = None
        self._timestamp = None
        self._data      = kwargs
        self.isNew      = True
    def _fromStoredForm(self, path, _payload, _encoding, _timestamp):
        prefix = self._prefix()
        assert path.startswith(prefix)
        self._name = path[len(prefix):]
        op = {}
        while _encoding:
            v = _encoding.pop()
            k = _encoding.pop()
            op[k] = v
        self._payload   = _payload
        self._encoding  = op
        self._timestamp = _timestamp
        self.isNew      = False
    def __repr__(self):
        s = '<Meta %s:p=%s|en=%s|ts=%s|db=%s:%s>' % (self.path(),
                                                     self._payload, 
                                                     self._encoding, 
                                                     self._timestamp, 
                                                     self.db, 
                                                     self._data)
        return s
    def _prefix(self):
        return '/Global/%s/' % _tr.name(self.typeId)
    def name(self):
        if self._name is None:
            self._name = getUUID()
        return self._name
    def path(self):
        return '%s%s' % (self._prefix(), self.name())
    def prepToWrite(self):
        if self._encoding is None:
            enc = {}
            payload = {}
            for k, v in self._data.items():
                t, s = encode(v)
                payload[k] = s
                if t:
                    enc[k] = t
            self._payload = payload
            self._encoding = enc
    def getField(self, name):
        if self._payload and name in self._payload:
            p = self._payload[name]
            t = self._encoding.get(name)
            v = decode(t, p, self)
            return v
        elif self._data and name in self._data:
            return self._data[name]
        return None
    def write(self, timestamp=None):
        if not self.isNew:
            return self
        self.prepToWrite()
        path = self.path()
        print 'Writing (meta)', path, timestamp
        op = []
        for k, v in self._encoding.items():
            op.append(k)
            op.append(v)
        item = {'name':      path,
                'type':      self.typeId,
                'payload':   self._payload,
                'encoding':  op,
                'timestamp': timestamp.writeForm() if timestamp else None,
                }
        db = self.db
        db.put(item)
        self.isNew = False
        return self
      
class EventMeta(DBOMeta):
    def write(self, timestamp=None):
        if timestamp is None:
            timestamp = Timestamp()
        self.prepToWrite()
        db = self.db
        for v in self.containers():
            m = _MapElement(v, self, timestamp, db)
            m.write()
        self._timestamp = timestamp
        return super(EventMeta, self).write(timestamp=timestamp)
    def containers(self):
        # XXX - wrong. need to understand containers correctly
        ret = []
        for k, v in self._encoding.items():
            if v == 'O':
                ret.append(self._data[k])
        return ret

In [969]:
class NoVal(object):
    pass

_noVal = NoVal()

def node(*a, **k):
    if k:
        def g(*aa, **kk):
            for kw in k:
                assert kw in ('stored',)
            f = aa[0]
            info = k.copy()
            info['name'] = f.func_name
            def fn2(*aaa, **kkk):
                obj = aaa[0]
                key = getattr(obj, f.func_name)
                v = Context.current().get(key)
                if v is not _noVal:
                    return v
                ret = f(*aaa, **kkk)
            fn2.nodeInfo = {'name': info}
            return fn2
        return g
    
    f = a[0]
    def fn(*aa, **kk):
        obj = aa[0]
        key = getattr(obj, f.func_name)
        v = Context.current().get(key)
        if v is not _noVal:
            return v
        ret = f(*aa, **kk)
    fn.nodeInfo = {'name': f.func_name}
    return fn

In [970]:
class DBOMetaClass(type):
    def __new__(cls, name, parents, attrs):
        #print 'doing', name, cls
        
        nodeFns = []
        for attrname, attrvalue in attrs.iteritems():
            if getattr(attrvalue, 'nodeInfo', 0):
                nodeFns.append(attrvalue.nodeInfo)
                
        ret = super(DBOMetaClass, cls).__new__(cls, name, parents, attrs)
        #print 'done', name, cls, nodeFns
        return ret


In [971]:
class DBO(object):
    __metaclass__ = DBOMetaClass
    _metaclass = DBOMeta
    def __init__(self, name=None, db=None, **kwargs):
        self.meta = self._metaclass(self, name=name, db=db, kwargs=kwargs)
    def _fromStoredForm(self, path, _payload, _encoding, _timestamp):
        self.meta._fromStoredForm(path, _payload, _encoding, _timestamp)
    def write(self):
        self.meta.write()
        db = self.meta.db
        db.cache[self.meta.path()] = self
        return self
    def getField(self, name):
        return self.meta.getField(name)

class _MapElement(DBO):
    def __init__(self, entity, event, timestamp, db):
        self.entity = entity
        self.event = event
        self.timestamp = timestamp
        self.db = db
    def write(self):
        item = {'entity':    self.entity.meta.path(),
                'event':     self.event.path(),
                'key':       self.timestamp.str() + '|' + self.event.path(),
                }
        db = self.db
        db.putMapItem(item)
        return self

In [972]:
class Entity(DBO):
    def _allEventRecords(self):
        db = self.meta.db
        response = db._allEventRecords(self)
        return response
    
    def _allEventNames(self):
        db = self.meta.db
        return db._allEventNames(self)
    
    def _allEventObjects(self):
        db = self.meta.db
        return db._allEventObjects(self)
    
    def clock(self):
        db = self.meta.db
        return Clock.get('Main', db=db)
    
    def _visibleEventObjects(self):
        evs = self._allEventObjects()
        cutoffs = self.clock().cutoffs()
        if cutoffs:
            evs = [ e for e in evs if e.meta._timestamp.transactionTime <= cutoffs.transactionTime ]
        return evs
    
    def _activeEventObjects(self):
        evs = self._visibleEventObjects()
        cutoffs = self.clock().cutoffs()
        deletes = set()
        cancels = set()
        active = []
        for e in reversed(evs):
            if e in deletes:
                continue
            if isinstance(e, DeleteEvent):
                deletes.update(e._amends())
                continue
            if isinstance(e, CancelEvent):
                cancels.update(e._amends())
                continue
            else:
                deletes.update(e._amends())
            if e in cancels:
                continue
            active.append(e)

        if cutoffs:
            active = [ e for e in active if e.meta._timestamp.validTime <= cutoffs.validTime ]
        return sorted(active, key=lambda e: e.meta._timestamp.validTime)
    
    def printActivity(self, evs=None):
        if evs is None:
            evs = self._allEventObjects()
        for e in evs:
            print e.str()
            
    @classmethod
    def get(cls, name, db):
        typeId = _tr.typeId(cls)
        prefix = '/Global/%s/' % _tr.name(typeId)
        path = '%s%s' % (prefix, name)
        return db.get(path)

    
class Event(DBO):
    _metaclass = EventMeta

    def amends(self):
        return self.getField('amends')

    def _amends(self):
        a = self.amends()
        if isinstance(a, list):
            return a
        if a:
            return [a]
        return []
    
    def str(self):
        return '<%s, isNew=%s, ts=%s>' % (self.meta.path(), self.meta.isNew, self.meta._timestamp)
    
class DeleteEvent(Event):
    pass

class CancelEvent(Event):
    pass

In [973]:
class Clock(Entity):
    @node
    def cutoffs(self):
        return None

In [974]:
_tr.add(Clock, 1)
_tr.add(DeleteEvent, 2)
_tr.add(CancelEvent, 3)

In [975]:
class Context(object):
    _contexts = []
    
    def __init__(self, tweaks):
        self.tweaks = tweaks
    def __enter__(self, *a):
        self._contexts.append(self)
    def __exit__(self, *a):
        c = self._contexts.pop()
        assert c == self
    def get(self, cmb):
        return self.tweaks.get(cmb, _noVal)
    
    @classmethod
    def current(cls):
        if not cls._contexts:
            cls._contexts.append(Context({}))
        return cls._contexts[-1]


In [976]:
class RefData(Entity):

    def state(self):
        evs = self._activeEventObjects()
        cutoffs = self.clock().cutoffs()
        
        if evs:
            return evs[-1]
        
    def getField(self, name):
        s = self.state()
        if s:
            return s.getField(name)

class RefDataUpdateEvent(Event):
    
    def str(self):
        return '%s: %-15s: %-15s: %-30s' % (self.meta._timestamp.validTime,
                                            self.getField('fullName'), 
                                            self.getField('address'), 
                                            self.getField('company'),
                                            ) 

_tr.add(RefData, 11)
_tr.add(RefDataUpdateEvent, 12)

In [977]:
class CustomerRefData(RefData):
    
    def fullName(self):
        return self.getField('fullName')
            
        print  
_tr.add(CustomerRefData, 20)

In [978]:
_db = _odb
with _db:
    cl = Clock('Main').write()

    cr = CustomerRefData('Customer123').write()

    ev1 = RefDataUpdateEvent(fullName='Eliza Smith',
                             address='10 Main St',
                             company='Frobozz Magic Avocado Company',
                             entity=cr).write()

    ts1 = Timestamp()

    ev2 = RefDataUpdateEvent(fullName='Eliza Smith',
                             address='10 Main St',
                             company='Frobozz Magic Friut Company',
                             entity=cr).write()
    
    ts2 = Timestamp()

    ev3 = RefDataUpdateEvent(fullName='Eliza Smith',
                             address='10 Main St',
                             company='Frobozz Magic Fruit Company',
                             entity=cr,
                             comment='Grr. Typo.',
                             amends=ev2).write()

    ts3 = Timestamp()
    
    ev4 = RefDataUpdateEvent(fullName='Eliza James',
                             address='10 Main St',
                             company='Frobozz Magic Fruit Company',
                             entity=cr).write()


cr.printActivity()

Writing /Global/Clock/Main None
Writing /Global/CustomerRefData/Customer123 None
Writing /Global/RefDataUpdateEvent/05.10001 <TS:t=2017-03-03T12:47:06.587819,v=2017-03-03T12:47:06.587817>
Writing /Global/RefDataUpdateEvent/05.10002 <TS:t=2017-03-03T12:47:06.597222,v=2017-03-03T12:47:06.597221>
Writing /Global/RefDataUpdateEvent/05.10003 <TS:t=2017-03-03T12:47:06.605891,v=2017-03-03T12:47:06.605890>
Writing /Global/RefDataUpdateEvent/05.10004 <TS:t=2017-03-03T12:47:06.622589,v=2017-03-03T12:47:06.622588>
2017-03-03 12:47:06.587817: Eliza Smith    : 10 Main St     : Frobozz Magic Avocado Company 
2017-03-03 12:47:06.605911: Eliza Smith    : 10 Main St     : Frobozz Magic Friut Company   
2017-03-03 12:47:06.605890: Eliza Smith    : 10 Main St     : Frobozz Magic Fruit Company   
2017-03-03 12:47:06.605911: Eliza Smith    : 10 Main St     : Frobozz Magic Friut Company   
2017-03-03 12:47:06.622588: Eliza James    : 10 Main St     : Frobozz Magic Fruit Company   


In [979]:
print 'current:', cr.fullName()
with Context({cl.cutoffs: ts1}) as ctx:
    print 'at ts1 :', cr.fullName()
    print
    
    cr.printActivity(cr._visibleEventObjects())

cr.printActivity(cr._visibleEventObjects())
cr.printActivity(cr._activeEventObjects())

current: Eliza James
at ts1 : Eliza Smith

2017-03-03 12:47:06.587817: Eliza Smith    : 10 Main St     : Frobozz Magic Avocado Company 
2017-03-03 12:47:06.587817: Eliza Smith    : 10 Main St     : Frobozz Magic Avocado Company 
2017-03-03 12:47:06.605911: Eliza Smith    : 10 Main St     : Frobozz Magic Friut Company   
2017-03-03 12:47:06.605890: Eliza Smith    : 10 Main St     : Frobozz Magic Fruit Company   
2017-03-03 12:47:06.605911: Eliza Smith    : 10 Main St     : Frobozz Magic Friut Company   
2017-03-03 12:47:06.622588: Eliza James    : 10 Main St     : Frobozz Magic Fruit Company   
2017-03-03 12:47:06.587817: Eliza Smith    : 10 Main St     : Frobozz Magic Avocado Company 
2017-03-03 12:47:06.605890: Eliza Smith    : 10 Main St     : Frobozz Magic Fruit Company   
2017-03-03 12:47:06.605911: Eliza Smith    : 10 Main St     : Frobozz Magic Friut Company   
2017-03-03 12:47:06.622588: Eliza James    : 10 Main St     : Frobozz Magic Fruit Company   


In [980]:
_db2 = ObjectDb(rawdb)
c = _db2.get(cr.meta.path())
print c.fullName()
print c.meta.isNew

Eliza James
False


In [981]:
class Workbook(Entity):
    
    def workItems(self):
        evs = self._activeEventObjects()
        items = set()
        print 'self:', self.meta.path()
        for e in evs:
            ibb = e._itemsByBook()
            print 'e:', e.meta.path(), ibb
        print
        
class WorkItem(Entity):
    pass

class WorkItemOpenEvent(Event):

    @node(stored=True)
    def item(self):
        ret = WorkItem(openEvent=self, db=self.meta.db)
        print 'Make %s.item() -> %s' % ( self, ret )
        return ret
    
    def book1(self):
        return self.getField('book1')
    
    def book2(self):
        return self.getField('book2')
    
    def _items(self):
        return [ [ self.item(), 1, self.book1(), self.book2() ] ]
    
    def _itemsByBook(self):
        ret = {}
        def add(i, q, b):
            if b not in ret:
                ret[b] = {}
            if i not in ret[b]:
                ret[b][i] = 0
            ret[b][i] += q
        for i, q, b1, b2 in self._items():
            add(i, q, b1)
            add(i, -q, b2)


In [982]:
_tr.add(Workbook, 30)
_tr.add(WorkItem, 31)
_tr.add(WorkItemOpenEvent, 32)

In [983]:
with _db:
    wb1 = Workbook('Customer123')
    hd  = Workbook('Helpdesk')
    wb3 = Workbook('Customer.joe')

    ev1 = WorkItemOpenEvent(message='Help, I forgot my password',
                            book1=wb1,
                            book2=hd).write()
    ev2 = WorkItemOpenEvent(message='Help! My computer is on fire!',
                            book1=wb3,
                            book2=hd).write()
    ev3 = WorkItemOpenEvent(message='My mouse is broken',
                            book1=wb1,
                            book2=hd).write()

Writing /Global/Workbook/Customer123 None
Writing /Global/Workbook/Helpdesk None
Writing /Global/WorkItemOpenEvent/05.10005 <TS:t=2017-03-03T12:47:06.758493,v=2017-03-03T12:47:06.758489>
Writing /Global/Workbook/Customer.joe None
Writing /Global/WorkItemOpenEvent/05.10006 <TS:t=2017-03-03T12:47:06.781497,v=2017-03-03T12:47:06.781495>
Writing /Global/WorkItemOpenEvent/05.10007 <TS:t=2017-03-03T12:47:06.802248,v=2017-03-03T12:47:06.802245>


In [984]:
hd.printActivity()

print hd.workItems()
print hd.workItems()

</Global/WorkItemOpenEvent/05.10005, isNew=False, ts=<TS:t=2017-03-03T12:47:06.758493,v=2017-03-03T12:47:06.758489>>
</Global/WorkItemOpenEvent/05.10006, isNew=False, ts=<TS:t=2017-03-03T12:47:06.781497,v=2017-03-03T12:47:06.781495>>
</Global/WorkItemOpenEvent/05.10007, isNew=False, ts=<TS:t=2017-03-03T12:47:06.802248,v=2017-03-03T12:47:06.802245>>
self: /Global/Workbook/Helpdesk
Make <__main__.WorkItemOpenEvent object at 0x10c0b2c10>.item() -> <__main__.WorkItem object at 0x10ad3c510>
e: /Global/WorkItemOpenEvent/05.10005 None
Make <__main__.WorkItemOpenEvent object at 0x109179a90>.item() -> <__main__.WorkItem object at 0x10ad3c510>
e: /Global/WorkItemOpenEvent/05.10006 None
Make <__main__.WorkItemOpenEvent object at 0x10ad11810>.item() -> <__main__.WorkItem object at 0x10ad3c510>
e: /Global/WorkItemOpenEvent/05.10007 None

None
self: /Global/Workbook/Helpdesk
Make <__main__.WorkItemOpenEvent object at 0x10c0b2c10>.item() -> <__main__.WorkItem object at 0x10a962cd0>
e: /Global/WorkIte