Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
1 contributor

Users who have contributed to this file

100 lines (83 sloc) 3.08 KB
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using CqrsWithEs.Domain.Base;
using MediatR;
namespace CqrsWithEs.DataAccess
{
/*
* In-memory event store, cloned from Greg Young's SimpleCQRS sample:
* https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/Events.cs
*
*/
/// <summary>
/// Contract for an event store: appends events to a per-aggregate stream and
/// reads a stream back by aggregate id.
/// </summary>
public interface IEventStore
{
/// <summary>
/// Mediator associated with the store. The EventStore implementation in this
/// file publishes every saved event to it.
/// </summary>
IMediator Bus { get; set; }
/// <summary>
/// Appends <paramref name="events"/> to the stream of the aggregate identified by
/// <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateId">Id of the aggregate the events belong to.</param>
/// <param name="events">Events to append, in order.</param>
/// <param name="expectedVersion">
/// Version the caller expects the stream to be at. In the EventStore implementation
/// in this file, -1 skips the version check and any other mismatch with the latest
/// stored version raises ConcurrencyException.
/// </param>
void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion);
/// <summary>
/// Returns the events stored for the aggregate identified by <paramref name="aggregateId"/>.
/// </summary>
/// <param name="aggregateId">Id of the aggregate whose events are requested.</param>
/// <returns>
/// The stored events. The EventStore implementation in this file returns them in the
/// order they were saved and throws AggregateNotFoundException for an unknown id.
/// </returns>
List<Event> GetEventsForAggregate(Guid aggregateId);
}
/// <summary>
/// In-memory <see cref="IEventStore"/>: keeps one list of stored events per aggregate id
/// and publishes every saved event to <see cref="Bus"/>.
/// </summary>
public class EventStore : IEventStore
{
    /// <summary>
    /// Mediator every saved event is published to. It is dereferenced without a null
    /// check, so it must be assigned before events are saved.
    /// </summary>
    public IMediator Bus { get; set; }

    /// <summary>An event together with the aggregate id and version it was stored under.</summary>
    private struct EventDescriptor
    {
        public readonly Event EventData;
        public readonly Guid Id;
        public readonly int Version;

        public EventDescriptor(Guid id, Event eventData, int version)
        {
            EventData = eventData;
            Version = version;
            Id = id;
        }
    }

    /// <summary>Creates an empty store; <see cref="Bus"/> is left unassigned.</summary>
    public EventStore()
    {
    }

    // Event streams keyed by aggregate id.
    // NOTE(review): the dictionary is concurrent, but each per-aggregate List is mutated
    // without synchronization - confirm callers serialize writes per aggregate.
    private readonly IDictionary<Guid, List<EventDescriptor>> current = new ConcurrentDictionary<Guid, List<EventDescriptor>>();

    /// <summary>
    /// Appends <paramref name="events"/> to the stream of <paramref name="aggregateId"/>,
    /// stamping each event with the next version number and publishing it to <see cref="Bus"/>.
    /// </summary>
    /// <param name="aggregateId">Id of the aggregate the events belong to.</param>
    /// <param name="events">Events to append, in order. Must not be null.</param>
    /// <param name="expectedVersion">
    /// Version the caller expects the stream to be at; the first appended event receives
    /// <c>expectedVersion + 1</c>. Pass -1 to skip the version check.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    /// <exception cref="ConcurrencyException">
    /// The stream already holds events, <paramref name="expectedVersion"/> is not -1, and it
    /// differs from the version of the latest stored event.
    /// </exception>
    public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
    {
        // Validate before touching the store so a bad call leaves no empty stream behind.
        if (events == null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        List<EventDescriptor> eventDescriptors;

        if (!current.TryGetValue(aggregateId, out eventDescriptors))
        {
            // First write for this aggregate: start a new stream, no version check.
            eventDescriptors = new List<EventDescriptor>();
            current.Add(aggregateId, eventDescriptors);
        }
        else if (eventDescriptors.Count > 0
                 && expectedVersion != -1
                 && eventDescriptors[eventDescriptors.Count - 1].Version != expectedVersion)
        {
            // The Count > 0 guard matters: a previous call with no events registers an
            // empty stream, and indexing [Count - 1] on it would throw
            // ArgumentOutOfRangeException instead of letting the append proceed.
            throw new ConcurrencyException();
        }

        // NOTE(review): with expectedVersion == -1 on a non-empty stream, numbering restarts
        // at 0 and can duplicate versions already stored - confirm that is intended.
        var version = expectedVersion;

        foreach (var @event in events)
        {
            version++;
            @event.Version = version;

            eventDescriptors.Add(new EventDescriptor(aggregateId, @event, version));

            // NOTE(review): the result of Publish is discarded; if it returns a Task (as in
            // MediatR) handler failures are not observed here - confirm fire-and-forget is intended.
            Bus.Publish(@event);
        }
    }

    /// <summary>
    /// Returns the events stored for <paramref name="aggregateId"/> in the order they were saved.
    /// </summary>
    /// <param name="aggregateId">Id of the aggregate whose events are requested.</param>
    /// <returns>A new list containing the stored events.</returns>
    /// <exception cref="AggregateNotFoundException">No stream exists for the given id.</exception>
    public List<Event> GetEventsForAggregate(Guid aggregateId)
    {
        List<EventDescriptor> eventDescriptors;

        if (!current.TryGetValue(aggregateId, out eventDescriptors))
        {
            throw new AggregateNotFoundException();
        }

        return eventDescriptors.Select(desc => desc.EventData).ToList();
    }
}
/// <summary>
/// Thrown when no event stream exists for a requested aggregate id
/// (see EventStore.GetEventsForAggregate in this file).
/// </summary>
public class AggregateNotFoundException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public AggregateNotFoundException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Description of the failure.</param>
    public AggregateNotFoundException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">Underlying cause, if any.</param>
    public AggregateNotFoundException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Thrown when the version a caller expects an event stream to be at does not match
/// the latest stored version (see EventStore.SaveEvents in this file).
/// </summary>
public class ConcurrencyException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public ConcurrencyException()
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Description of the conflict.</param>
    public ConcurrencyException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the conflict.</param>
    /// <param name="innerException">Underlying cause, if any.</param>
    public ConcurrencyException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
}
You can’t perform that action at this time.