-
Notifications
You must be signed in to change notification settings - Fork 533
/
EventStore.cs
94 lines (77 loc) · 3.04 KB
/
EventStore.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
using System;
using System.Collections.Generic;
using System.Linq;
namespace SimpleCQRS
{
/// <summary>
/// Contract for a store that persists events per aggregate and returns them again on request.
/// </summary>
public interface IEventStore
{
/// <summary>
/// Records <paramref name="events"/> for the aggregate identified by <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateId">Identifier of the aggregate the events belong to.</param>
/// <param name="events">Events to record, in order.</param>
/// <param name="expectedVersion">
/// Version the caller expects the aggregate to be at. The EventStore implementation in this file
/// compares it with the version of the last stored event and skips that comparison when it is -1.
/// </param>
void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
/// <summary>
/// Returns the events recorded for the aggregate identified by <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateId">Identifier of the aggregate whose events are requested.</param>
/// <returns>The recorded events as a list.</returns>
List<Event> GetEventsForAggregate(Guid aggregateId);
}
/// <summary>
/// In-memory <see cref="IEventStore"/>: keeps the events of each aggregate in a list held in a
/// dictionary keyed by aggregate id, and hands every saved event to the injected
/// <see cref="IEventPublisher"/>.
/// </summary>
public class EventStore : IEventStore
{
    // Version of an aggregate that has no stored events. Callers also pass this value as
    // expectedVersion to skip the concurrency check.
    private const int NoVersion = -1;

    private readonly IEventPublisher _publisher;

    // Stored events per aggregate id, in the order they were saved.
    private readonly Dictionary<Guid, List<EventDescriptor>> _current = new Dictionary<Guid, List<EventDescriptor>>();

    // Pairs a stored event with the aggregate id and version it was saved under.
    private struct EventDescriptor
    {
        public readonly Event EventData;
        public readonly Guid Id;
        public readonly int Version;

        public EventDescriptor(Guid id, Event eventData, int version)
        {
            EventData = eventData;
            Version = version;
            Id = id;
        }
    }

    /// <summary>Creates a store that publishes saved events through <paramref name="publisher"/>.</summary>
    /// <param name="publisher">Publisher that receives each event as it is saved.</param>
    public EventStore(IEventPublisher publisher)
    {
        _publisher = publisher;
    }

    /// <summary>
    /// Appends <paramref name="events"/> to the aggregate's event list, stamping each event with the
    /// next version number, and publishes each one.
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate the events belong to.</param>
    /// <param name="events">Events to append, in order.</param>
    /// <param name="expectedVersion">
    /// Version the caller expects the aggregate to be at, or -1 to skip the check. For an aggregate
    /// that is not yet known to the store the value is not checked and numbering starts after it.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    /// <exception cref="ConcurrencyException">
    /// The aggregate is already known, <paramref name="expectedVersion"/> is not -1, and it differs
    /// from the version of the last stored event.
    /// </exception>
    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
    {
        // Reject null before touching _current so a failed call does not register the aggregate.
        if (events == null)
        {
            throw new ArgumentNullException("events");
        }

        // Look up the aggregate's event list, creating an empty list on first use.
        List<EventDescriptor> eventDescriptors;
        bool isKnownAggregate = _current.TryGetValue(aggregateId, out eventDescriptors);
        if (!isKnownAggregate)
        {
            eventDescriptors = new List<EventDescriptor>();
            _current.Add(aggregateId, eventDescriptors);
        }

        // A known aggregate can still have an empty list (a previous save with no events),
        // so never index the last element without checking Count first.
        int currentVersion = eventDescriptors.Count > 0
            ? eventDescriptors[eventDescriptors.Count - 1].Version
            : NoVersion;

        if (isKnownAggregate && expectedVersion != NoVersion && currentVersion != expectedVersion)
        {
            throw new ConcurrencyException();
        }

        // With the check skipped (-1), continue after the latest stored version rather than
        // restarting at 0, which would hand out version numbers that are already in use.
        int version = expectedVersion == NoVersion ? currentVersion : expectedVersion;

        foreach (var @event in events)
        {
            version++;
            @event.Version = version;

            // Store first, then publish, so the event is recorded before subscribers see it.
            eventDescriptors.Add(new EventDescriptor(aggregateId, @event, version));
            _publisher.Publish(@event);
        }
    }

    /// <summary>
    /// Returns all events stored for the given aggregate, in the order they were saved.
    /// Used to build up an aggregate from its history (Domain.LoadsFromHistory).
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate whose events are requested.</param>
    /// <returns>A new list containing the stored events.</returns>
    /// <exception cref="AggregateNotFoundException">No events were ever saved under this id.</exception>
    public List<Event> GetEventsForAggregate(Guid aggregateId)
    {
        List<EventDescriptor> eventDescriptors;
        if (!_current.TryGetValue(aggregateId, out eventDescriptors))
        {
            throw new AggregateNotFoundException();
        }

        return eventDescriptors.Select(desc => desc.EventData).ToList();
    }
}
/// <summary>
/// Thrown by <c>EventStore.GetEventsForAggregate</c> when no events are stored for the requested aggregate id.
/// </summary>
public class AggregateNotFoundException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public AggregateNotFoundException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Description of the failure.</param>
    public AggregateNotFoundException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The underlying cause, if any.</param>
    public AggregateNotFoundException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Thrown by <c>EventStore.SaveEvents</c> when the caller's expected version does not match
/// the version of the last event stored for the aggregate.
/// </summary>
public class ConcurrencyException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public ConcurrencyException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Description of the conflict.</param>
    public ConcurrencyException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the conflict.</param>
    /// <param name="innerException">The underlying cause, if any.</param>
    public ConcurrencyException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
}