This repository has been archived by the owner on Jul 27, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
repository.py
125 lines (107 loc) · 4.95 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from .unifiedtranscoder import UnifiedStoredEvent
from django.conf import settings
from django.db.utils import IntegrityError
from django.utils import timezone
from djangoevents.exceptions import AlreadyExists
from eventsourcing.domain.services.eventstore import AbstractStoredEventRepository
from eventsourcing.domain.services.eventstore import EntityVersionDoesNotExist
from eventsourcing.domain.services.transcoding import EntityVersion
from eventsourcing.utils.time import timestamp_from_uuid
import datetime
class DjangoStoredEventRepository(AbstractStoredEventRepository):
    """Stored event repository that persists events through the Django ``Event`` model."""

    def __init__(self, *args, **kwargs):
        """Bind the ``Event`` model and forward all arguments to the base class."""
        # Import the model at runtime so that this module can be imported at top level.
        from .models import Event
        self.EventModel = Event
        super(DjangoStoredEventRepository, self).__init__(*args, **kwargs)

    def append(self, new_stored_event, new_version_number=None, max_retries=3, artificial_failure_rate=0):
        """
        Saves given stored event in this repository.

        All arguments are forwarded unchanged to ``write_version_and_event``.
        ``new_stored_event`` must be a ``UnifiedStoredEvent`` instance.
        """
        # Check the new event is a stored event instance.
        assert isinstance(new_stored_event, UnifiedStoredEvent)

        # Not calling validate_expected_version!

        # Write the event.
        self.write_version_and_event(
            new_stored_event=new_stored_event,
            new_version_number=new_version_number,
            max_retries=max_retries,
            artificial_failure_rate=artificial_failure_rate,
        )

    def get_entity_version(self, stored_entity_id, version_number):
        """
        Return the ``EntityVersion`` for one version of a stored entity.

        Looks for an event row whose ``stored_entity_id`` matches and whose
        ``aggregate_version`` equals ``version_number``.

        Raises ``EntityVersionDoesNotExist`` when no such row is found.
        """
        # Fetch the row in a single query instead of exists() followed by
        # indexing, which costs two round trips and leaves a window in which
        # the result can change between the check and the fetch.
        event = (
            self.EventModel.objects
            .filter(stored_entity_id=stored_entity_id)
            .filter(aggregate_version=version_number)
            .first()
        )
        if event is None:
            raise EntityVersionDoesNotExist()

        return EntityVersion(
            entity_version_id=self.make_entity_version_id(stored_entity_id, version_number),
            event_id=event.event_id,
        )

    def write_version_and_event(self, new_stored_event, new_version_number=None, max_retries=3,
                                artificial_failure_rate=0):
        """
        Create one ``Event`` row from the fields of ``new_stored_event``.

        ``max_retries`` and ``artificial_failure_rate`` are accepted but not
        used by this implementation.

        Raises ``AlreadyExists`` when the insert fails with ``IntegrityError``
        and ``new_version_number`` is falsy (``None`` or ``0``); in every other
        case the original ``IntegrityError`` propagates.
        """
        try:
            self.EventModel.objects.create(
                event_id=new_stored_event.event_id,
                event_type=new_stored_event.event_type,
                event_version=new_stored_event.event_version,
                event_data=new_stored_event.event_data,
                aggregate_id=new_stored_event.aggregate_id,
                aggregate_type=new_stored_event.aggregate_type,
                aggregate_version=new_stored_event.aggregate_version,
                create_date=make_aware_if_needed(new_stored_event.create_date),
                metadata=new_stored_event.metadata,
                module_name=new_stored_event.module_name,
                class_name=new_stored_event.class_name,
                stored_entity_id=new_stored_event.stored_entity_id,
            )
        except IntegrityError:
            # A falsy version number is treated as an attempt to create the
            # aggregate, so the integrity failure is reported as a duplicate.
            create_attempt = not new_version_number
            if create_attempt:
                msg = "Aggregate with id %r already exists" % new_stored_event.aggregate_id
                raise AlreadyExists(msg)
            # Bare raise keeps the original exception and traceback intact.
            raise

    def get_entity_events(self, stored_entity_id, after=None, until=None, limit=None, query_ascending=True,
                          results_ascending=True):
        """
        Return the stored events of one entity as ``UnifiedStoredEvent`` objects.

        ``after`` and ``until`` are UUIDs; their timestamps bound the
        ``create_date`` of the returned events. When querying ascending the
        range is (after, until]; when querying descending it is [after, until).
        ``limit`` caps the number of rows taken in query order (by ``id``).
        The list is reversed at the end when ``results_ascending`` differs
        from ``query_ascending``.
        """
        events = self.EventModel.objects.filter(stored_entity_id=stored_entity_id)
        events = events.order_by('id' if query_ascending else '-id')

        if after is not None:
            after_ts = self._bound_from_uuid(after)
            if query_ascending:
                events = events.filter(create_date__gt=after_ts)
            else:
                events = events.filter(create_date__gte=after_ts)

        if until is not None:
            until_ts = self._bound_from_uuid(until)
            if query_ascending:
                events = events.filter(create_date__lte=until_ts)
            else:
                events = events.filter(create_date__lt=until_ts)

        if limit is not None:
            events = events[:limit]

        events = list(events)
        if results_ascending != query_ascending:
            events.reverse()

        return [from_model_instance(e) for e in events]

    @staticmethod
    def _bound_from_uuid(uuid):
        """Convert a UUID's timestamp into a datetime usable as a ``create_date`` filter bound."""
        # Use the same helper that is applied to create_date on write, so the
        # bound is not handed to the ORM as a naive datetime when
        # settings.USE_TZ is enabled.
        return make_aware_if_needed(datetime.datetime.fromtimestamp(timestamp_from_uuid(uuid)))
def from_model_instance(event):
    """
    Build a ``UnifiedStoredEvent`` from an event model instance.

    Each attribute listed below is read from ``event`` and passed to the
    ``UnifiedStoredEvent`` constructor as a keyword argument of the same name.
    """
    field_names = (
        'event_id',
        'event_type',
        'event_version',
        'event_data',
        'aggregate_id',
        'aggregate_type',
        'aggregate_version',
        'create_date',
        'metadata',
        'module_name',
        'class_name',
        'stored_entity_id',
    )
    constructor_kwargs = {name: getattr(event, name) for name in field_names}
    return UnifiedStoredEvent(**constructor_kwargs)
def make_aware_if_needed(dt):
    """
    Return ``dt`` made timezone-aware when the project uses time zones.

    When ``settings.USE_TZ`` is enabled and ``dt`` is naive, it is converted
    with ``timezone.make_aware``. A datetime that is already aware is returned
    unchanged, because ``timezone.make_aware`` rejects aware input. When
    ``settings.USE_TZ`` is disabled, ``dt`` is returned as is.
    """
    if settings.USE_TZ and timezone.is_naive(dt):
        return timezone.make_aware(dt)
    return dt