Skip to content

Commit

Permalink
Use JSONAttribute instead of MapAttribute for the Event model. Dispat…
Browse files Browse the repository at this point in the history
…ch event_post_save. Update sample stream event source data in tests.
  • Loading branch information
geeknam committed Apr 12, 2017
1 parent 51e919a commit e925d9b
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 86 deletions.
83 changes: 38 additions & 45 deletions esser/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
import importlib
from collections import defaultdict
import json
from esser.signals import event_received, event_post_save
from esser.registry import registry
from esser.repositories.base import Event


def get_aggregate(aggregate_name, aggregate_id):
"""Given an aggregate name and id return Aggregate instance
Args:
aggregate_name (str): Aggregate Name
aggregate_id (str): Aggregate ID
Returns:
esser.entities.Entity: aggregate / entity
"""
path = registry.get_path(aggregate_name)
module, class_name = path.rsplit('.', 1)
app_module = importlib.import_module(module)
aggregate_class = getattr(app_module, class_name)
aggregate = aggregate_class(
aggregate_id=aggregate_id
)
return aggregate
class LambdaHandler(object):

@staticmethod
def get_aggregate(aggregate_name, aggregate_id):
"""Given an aggregate name and id return Aggregate instance
def image_to_event(image):
aggregate_id, version = image['aggregate_key']['S'].split(':')
return Event(
aggregate_name=image['aggregate_name']['S'],
aggregate_id=aggregate_id,
version=version,
event_type=image['event_type']['S'],
created_at=image['created_at']['S'],
event_data=image['event_data']['M'],
)
Args:
aggregate_name (str): Aggregate Name
aggregate_id (str): Aggregate ID
Returns:
esser.entities.Entity: aggregate / entity
"""
path = registry.get_path(aggregate_name)
module, class_name = path.rsplit('.', 1)
app_module = importlib.import_module(module)
aggregate_class = getattr(app_module, class_name)
aggregate = aggregate_class(
aggregate_id=aggregate_id
)
return aggregate

class LambdaHandler(object):
@staticmethod
def image_to_event(image):
aggregate_id, version = image['aggregate_key']['S'].split(':')
return Event(
aggregate_name=image['aggregate_name']['S'],
aggregate_id=aggregate_id,
version=version,
event_type=image['event_type']['S'],
created_at=image['created_at']['S'],
event_data=json.loads(image['event_data']['S']),
)

def handle_event(self, event, context):
event_name = event['EventName']
aggregate_id = event.get('AggregateId', None)
aggregate = get_aggregate(event['AggregateName'], aggregate_id)
aggregate = self.get_aggregate(event['AggregateName'], aggregate_id)
event_received.send(
sender=self.__class__,
aggregate_name=event['AggregateName'],
Expand All @@ -59,25 +59,18 @@ def handle_event(self, event, context):
return

def handle_stream(self, event, context):
aggregates = defaultdict(dict)
for record in event['Records']:
keys = record['dynamodb']['Keys']
# new_image = record['dynamodb']['NewImage']
new_image = record['dynamodb']['NewImage']
aggregate_name = keys['aggregate_name']['S']
aggregate_key = keys['aggregate_key']['S']
aggregate_id = aggregate_key.split(':')[0]
aggregate = get_aggregate(aggregate_name, aggregate_id)
# event_post_save.send(
# sender=aggregate.__class__,
# aggregate=aggregate,
# aggregate_name=aggregate.aggregate_name,
# aggregate_id=aggregate.aggregate_id,
# event_name=self.event_name,
# version=event_version,
# payload=attrs,
# )
aggregates[aggregate_name][aggregate_id] = aggregate.current_state
return aggregates
aggregate = self.get_aggregate(aggregate_name, aggregate_id)
event_obj = self.image_to_event(new_image)
event_post_save.send(
sender=aggregate.__class__,
event=event_obj
)


default_handler = LambdaHandler()
Expand Down
2 changes: 1 addition & 1 deletion esser/repositories/dynamodb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def to_event(self, obj):
version=obj.version,
event_type=obj.event_type,
created_at=obj.created_at,
event_data=obj.event_data.as_dict()
event_data=obj.event_data
)

def get_events(self, version):
Expand Down
4 changes: 2 additions & 2 deletions esser/repositories/dynamodb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pynamodb.models import Model
from pynamodb.attributes import (
UnicodeAttribute, UTCDateTimeAttribute,
MapAttribute
JSONAttribute, MapAttribute
)
from esser.constants import AGGREGATE_KEY_DELIMITER

Expand All @@ -20,7 +20,7 @@ class Meta:
aggregate_key = UnicodeAttribute(range_key=True)
event_type = UnicodeAttribute()
created_at = UTCDateTimeAttribute()
event_data = MapAttribute()
event_data = JSONAttribute()

@classmethod
def _conditional_operator_check(cls, conditional_operator):
Expand Down
5 changes: 1 addition & 4 deletions esser/signals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,5 @@
)

event_post_save = dispatch.Signal(
providing_args=[
'aggregate_name', 'aggregate_id', 'payload',
'event_name', 'version',
]
providing_args=['event']
)
8 changes: 3 additions & 5 deletions examples/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
def on_event_received(event, context):
try:
event = handle_event(event, context)
return event.event_data.as_dict()
return event.event_data
except Exception as exc:
log.exception('Event: %s', event)
log.exception('Exception: %s', exc)
Expand All @@ -21,11 +21,9 @@ def on_event_saved(event, context):


def route(event, context):
log.info(event)
if 'EventName' in event:
event = on_event_received(event, context)
log.info(event)
return event
else:
aggregates = on_event_saved(event, context)
log.info(aggregates)
return aggregates
handle_stream(event, context)
11 changes: 8 additions & 3 deletions examples/items/receivers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from esser.signals.decorators import receiver
from esser.signals import event_pre_save, event_received
from esser.signals import event_pre_save, event_received, event_post_save
from esser.handlers import LambdaHandler
from items.commands import UpdatePrice


@receiver(event_pre_save, sender=UpdatePrice)
def check_price_update(sender, **kwargs):
pass
print('Event pre save handled: %s' % kwargs)


@receiver(event_received, sender=LambdaHandler)
def do_something(sender, **kwargs):
pass
print('Command received handled: %s' % kwargs)


@receiver(event_post_save)
def handle_event_saved(sender, **kwargs):
print('Post saved handled: %s' % kwargs['event'].as_dict())
18 changes: 7 additions & 11 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@

from setuptools import find_packages
from setuptools import setup

packages = [
'esser',
'esser.cli',
'esser.contrib',
'esser.handlers',
'esser.infra',
'esser.repositories'
]

setup(
name='esser',
packages=packages,
packages=find_packages(
exclude=[
'examples.*', 'examples', 'tests', 'requirements',
'docs'
]
),
license='Apache 2.0',
version='0.1.1',
description='Python Event Sourcing framework',
Expand Down
File renamed without changes.
28 changes: 13 additions & 15 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def stream_factory(self, aggregate_name, aggregate_key):
"S": aggregate_key
}
},
'NewImage': {
'created_at': {'S': '2017-04-12T06:56:06.191104+0000'},
'aggregate_name': {'S': 'Item'},
'aggregate_key': {'S': '83e7b629-58a0-4194-80e4-dab5dbbd63c1:1'},
'event_type': {'S': 'ItemCreated'},
'event_data': {
'S': '{"price": 15, "name": "Coffee"}'
}
},
"SequenceNumber": "111",
"SizeBytes": 26,
"StreamViewType": "KEYS_ONLY"
Expand All @@ -37,26 +46,15 @@ def stream_factory(self, aggregate_name, aggregate_key):
]
}

def test_handle_stream(self):
@patch('examples.items.receivers.handle_event_saved')
def test_handle_stream(self, mock_handle):
with patch('uuid.uuid4') as mock_uuid:
mock_uuid.return_value = 'mockuuid'
self.item.created.save(
attrs={
'name': 'Yummy Choc',
'price': 10.50
'name': 'Yummy Choc', 'price': 10.50
}
)
event = self.item.price_updated.save(attrs={'price': 12.50})
stream = self.stream_factory('Item', event.aggregate_key)
aggregates = handle_stream(stream, {})
self.assertEquals(
aggregates,
{
'Item': {
'mockuuid': {
'name': 'Yummy Choc', 'price': 12.50,
'latest_version': 2
}
}
}
)
handle_stream(stream, {})

0 comments on commit e925d9b

Please sign in to comment.