Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

EventStoreRepository GetById with Type at Runtime #19

Open
wants to merge 1 commit into from

1 participant

@cbaxter

In order to better facilitate loading an IAggregate with a type at Runtime (without using reflection) I added two new methods to the EventStoreRepository as shown below.

public virtual IAggregate GetById(Type aggregateType, Guid id)
public virtual IAggregate GetById(Type aggregateType, Guid id, int versionToLoad)

In order to keep the IRepository interface clean, I chose to push the prior implementations of GetById below in to the RepositoryExtensions class. The downside to the latter change is that it is a breaking change for those who may have overriden the EventStoreRepository class to extend either of the origional GetById methods. Of course the latter change is not required, but I felt that it was the clean choice given that the generic type paramter was only used for typeof(TAggregate) inside the private GetAggregate method.

The parameter Type aggregateType is validated inside of private GetAggregate method to ensure the IAggregate interface has been implemented by the type and will throw an ArgumentException if missing.

@cbaxter cbaxter Modified IRepository to accept Runtime type for GetById.
* Converted IRepository interface to use Type parameter.
* Added IRepository extensions for GetById<TAggregate>.
6c6f0d1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 4, 2012
  1. @cbaxter

    Modified IRepository to accept Runtime type for GetById.

    cbaxter authored
    * Converted IRepository interface to use Type parameter.
    * Added IRepository extensions for GetById<TAggregate>.
This page is out of date. Refresh to see the latest.
View
1  src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj
@@ -54,6 +54,7 @@
<Compile Include="SagaEventStoreRepository.cs">
<SubType>Code</SubType>
</Compile>
+ <Compile Include="TypeExtensions.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="..\CustomDictionary.xml">
View
237 src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
@@ -1,97 +1,100 @@
-namespace CommonDomain.Persistence.EventStore
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using global::EventStore;
- using global::EventStore.Persistence;
-
- public class EventStoreRepository : IRepository, IDisposable
- {
- private const string AggregateTypeHeader = "AggregateType";
- private readonly IDictionary<Guid, Snapshot> snapshots = new Dictionary<Guid, Snapshot>();
- private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
- private readonly IStoreEvents eventStore;
- private readonly IConstructAggregates factory;
- private readonly IDetectConflicts conflictDetector;
-
- public EventStoreRepository(
- IStoreEvents eventStore,
- IConstructAggregates factory,
- IDetectConflicts conflictDetector)
- {
- this.eventStore = eventStore;
- this.factory = factory;
- this.conflictDetector = conflictDetector;
- }
-
- public void Dispose()
- {
- this.Dispose(true);
- GC.SuppressFinalize(this);
- }
- protected virtual void Dispose(bool disposing)
- {
- if (!disposing)
- return;
-
- lock (this.streams)
- {
- foreach (var stream in this.streams)
- stream.Value.Dispose();
-
- this.snapshots.Clear();
- this.streams.Clear();
- }
+namespace CommonDomain.Persistence.EventStore
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using global::EventStore;
+ using global::EventStore.Persistence;
+
+ public class EventStoreRepository : IRepository, IDisposable
+ {
+ private const string AggregateTypeHeader = "AggregateType";
+ private readonly IDictionary<Guid, Snapshot> snapshots = new Dictionary<Guid, Snapshot>();
+ private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
+ private readonly IStoreEvents eventStore;
+ private readonly IConstructAggregates factory;
+ private readonly IDetectConflicts conflictDetector;
+
+ public EventStoreRepository(
+ IStoreEvents eventStore,
+ IConstructAggregates factory,
+ IDetectConflicts conflictDetector)
+ {
+ this.eventStore = eventStore;
+ this.factory = factory;
+ this.conflictDetector = conflictDetector;
+ }
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing)
+ return;
+
+ lock (this.streams)
+ {
+ foreach (var stream in this.streams)
+ stream.Value.Dispose();
+
+ this.snapshots.Clear();
+ this.streams.Clear();
+ }
}
- public virtual TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate
+ public virtual IAggregate GetById(Type aggregateType, Guid id)
{
- return GetById<TAggregate>(id, int.MaxValue);
+ return GetById(aggregateType, id, int.MaxValue);
}
- public virtual TAggregate GetById<TAggregate>(Guid id, int versionToLoad) where TAggregate : class, IAggregate
- {
- var snapshot = this.GetSnapshot(id, versionToLoad);
- var stream = this.OpenStream(id, versionToLoad, snapshot);
- var aggregate = this.GetAggregate<TAggregate>(snapshot, stream);
-
- ApplyEventsToAggregate(versionToLoad, stream, aggregate);
-
- return aggregate as TAggregate;
- }
- private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
- {
- if (versionToLoad == 0 || aggregate.Version < versionToLoad)
- foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
- aggregate.ApplyEvent(@event);
- }
- private IAggregate GetAggregate<TAggregate>(Snapshot snapshot, IEventStream stream)
- {
- var memento = snapshot == null ? null : snapshot.Payload as IMemento;
- return this.factory.Build(typeof(TAggregate), stream.StreamId, memento);
- }
- private Snapshot GetSnapshot(Guid id, int version)
- {
- Snapshot snapshot;
- if (!this.snapshots.TryGetValue(id, out snapshot))
- this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);
-
- return snapshot;
- }
- private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
- {
- IEventStream stream;
- if (this.streams.TryGetValue(id, out stream))
- return stream;
-
- stream = snapshot == null
- ? this.eventStore.OpenStream(id, 0, version)
- : this.eventStore.OpenStream(snapshot, version);
-
- return this.streams[id] = stream;
- }
-
+ public virtual IAggregate GetById(Type aggregateType, Guid id, int versionToLoad)
+ {
+ var snapshot = this.GetSnapshot(id, versionToLoad);
+ var stream = this.OpenStream(id, versionToLoad, snapshot);
+ var aggregate = this.GetAggregate(aggregateType, snapshot, stream);
+
+ ApplyEventsToAggregate(versionToLoad, stream, aggregate);
+
+ return aggregate;
+ }
+ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
+ {
+ if (versionToLoad == 0 || aggregate.Version < versionToLoad)
+ foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
+ aggregate.ApplyEvent(@event);
+ }
+ private IAggregate GetAggregate(Type aggregateType, Snapshot snapshot, IEventStream stream)
+ {
+ if (aggregateType == null) throw new ArgumentNullException("aggregateType");
+ if (!aggregateType.Implements(typeof(IAggregate))) throw new ArgumentException(ExceptionMessages.NotAggregateType, "aggregateType");
+
+ var memento = snapshot == null ? null : snapshot.Payload as IMemento;
+ return this.factory.Build(aggregateType, stream.StreamId, memento);
+ }
+ private Snapshot GetSnapshot(Guid id, int version)
+ {
+ Snapshot snapshot;
+ if (!this.snapshots.TryGetValue(id, out snapshot))
+ this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);
+
+ return snapshot;
+ }
+ private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
+ {
+ IEventStream stream;
+ if (this.streams.TryGetValue(id, out stream))
+ return stream;
+
+ stream = snapshot == null
+ ? this.eventStore.OpenStream(id, 0, version)
+ : this.eventStore.OpenStream(snapshot, version);
+
+ return this.streams[id] = stream;
+ }
+
public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var headers = PrepareHeaders(aggregate, updateHeaders);
@@ -123,14 +126,14 @@ public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary
throw new PersistenceException(e.Message, e);
}
}
- }
- private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, object> headers)
- {
- IEventStream stream;
- if (!this.streams.TryGetValue(aggregate.Id, out stream))
- this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
-
- foreach (var item in headers)
+ }
+ private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, object> headers)
+ {
+ IEventStream stream;
+ if (!this.streams.TryGetValue(aggregate.Id, out stream))
+ this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
+
+ foreach (var item in headers)
stream.UncommittedHeaders[item.Key] = item.Value;
aggregate.GetUncommittedEvents()
@@ -138,24 +141,24 @@ private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, obje
.Select(x => new EventMessage { Body = x })
.ToList()
.ForEach(stream.Add);
-
- return stream;
- }
- private static Dictionary<string, object> PrepareHeaders(IAggregate aggregate, Action<IDictionary<string, object>> updateHeaders)
- {
- var headers = new Dictionary<string, object>();
-
- headers[AggregateTypeHeader] = aggregate.GetType().FullName;
- if (updateHeaders != null)
- updateHeaders(headers);
-
- return headers;
- }
- private bool ThrowOnConflict(IEventStream stream, int skip)
- {
- var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
- var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
- return this.conflictDetector.ConflictsWith(uncommitted, committed);
- }
- }
+
+ return stream;
+ }
+ private static Dictionary<string, object> PrepareHeaders(IAggregate aggregate, Action<IDictionary<string, object>> updateHeaders)
+ {
+ var headers = new Dictionary<string, object>();
+
+ headers[AggregateTypeHeader] = aggregate.GetType().FullName;
+ if (updateHeaders != null)
+ updateHeaders(headers);
+
+ return headers;
+ }
+ private bool ThrowOnConflict(IEventStream stream, int skip)
+ {
+ var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
+ var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
+ return this.conflictDetector.ConflictsWith(uncommitted, committed);
+ }
+ }
}
View
11 src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.Designer.cs
@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
-// Runtime Version:4.0.30319.1
+// Runtime Version:4.0.30319.239
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
@@ -70,6 +70,15 @@ internal class ExceptionMessages {
}
/// <summary>
+ /// Looks up a localized string similar to The argument must implement IAggregate..
+ /// </summary>
+ internal static string NotAggregateType {
+ get {
+ return ResourceManager.GetString("NotAggregateType", resourceCulture);
+ }
+ }
+
+ /// <summary>
/// Looks up a localized string similar to There were no uncommitted changes to persist. When attempting to save an aggregate there must be at least one uncommitted event to persist..
/// </summary>
internal static string NoWork {
View
3  src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.resx
@@ -120,6 +120,9 @@
<data name="ConflictingCommand" xml:space="preserve">
<value>The command issued conflicted with another command that was sent by another user, actor, or process in the system. The change could not be automatically merged. Please review the data that has changed and try your change again.</value>
</data>
+ <data name="NotAggregateType" xml:space="preserve">
+ <value>The argument must implement IAggregate.</value>
+ </data>
<data name="NoWork" xml:space="preserve">
<value>There were no uncommitted changes to persist. When attempting to save an aggregate there must be at least one uncommitted event to persist.</value>
</data>
View
17 src/proj/CommonDomain.Persistence.EventStore/TypeExtensions.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Linq;
+
+namespace CommonDomain.Persistence.EventStore
+{
+ public static class TypeExtensions
+ {
+ public static Boolean Implements(this Type type, Type interfaceType)
+ {
+ return interfaceType != null &&
+ type != null &&
+ !type.IsAbstract &&
+ type.IsClass &&
+ type.GetInterfaces().Any(item => interfaceType.IsGenericTypeDefinition ? item.IsGenericType && item.GetGenericTypeDefinition() == interfaceType : item == interfaceType);
+ }
+ }
+}
View
6 src/proj/CommonDomain.Persistence/IRepository.cs
@@ -4,9 +4,9 @@ namespace CommonDomain.Persistence
using System.Collections.Generic;
public interface IRepository
- {
- TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate;
- TAggregate GetById<TAggregate>(Guid id, int version) where TAggregate : class, IAggregate;
+ {
+ IAggregate GetById(Type aggregateType, Guid id);
+ IAggregate GetById(Type aggregateType, Guid id, int version);
void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders);
}
}
View
20 src/proj/CommonDomain.Persistence/RepositoryExtensions.cs
@@ -2,11 +2,21 @@
namespace CommonDomain.Persistence
{
- public static class RepositoryExtensions
+ public static class RepositoryExtensions
+ {
+ public static TAggregate GetById<TAggregate>(this IRepository repository, Guid id) where TAggregate : class, IAggregate
{
- public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
- {
- repository.Save(aggregate, commitId, a => {});
- }
+ return repository.GetById(typeof(TAggregate), id, int.MaxValue) as TAggregate;
}
+
+ public static TAggregate GetById<TAggregate>(this IRepository repository, Guid id, int versionToLoad) where TAggregate : class, IAggregate
+ {
+ return repository.GetById(typeof(TAggregate), id, versionToLoad) as TAggregate;
+ }
+
+ public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
+ {
+ repository.Save(aggregate, commitId, a => { });
+ }
+ }
}
Something went wrong with that request. Please try again.