Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

This is my suggestion on how to solve the event conversion. #76

Merged
merged 14 commits into from

3 participants

@MikeEast

To use it, simply implement the IConvertEvents interface and hookup the EventConverter when Wiring up the store.

::m

@jimitndiaye

What if the interface IConvertEvents<,> is implemented explicitly?

@MikeEast

Thanks for noticing!

I will add support for getting explicitly implemented methods as well.

I'll submit a new Pull request in about 7 hours or so. Or something.

@joliver

You'll probably want have the Convert function use a while loop internally because it's possible that you want to convert eventA to eventB and then eventB to eventC. It would also be really nice to have a constructor overloads so that those who wanted to use explicit registration instead of reflection-based wireup could do so (as per the project goals found in the readme). Lastly, I noticed the IConvertEvents interface was .NET 4.0-specific. I'm still supporting .NET 3.5 for this release (which should be the last one). Any particular reason for the in/out covariant/contravariant parameters on the interface?

@MikeEast

The looping thing is a great idea. I will implement it.

I was thinking about the explicit registration thing this morning and it should indeed be allowed. Assembly scanning is not a very nice way to do it. Also, I am thinking about if it should make it into its own thing in the Wireup but I better leave the decision about that to you.

The in/out covariant/contravariant thing is just a alt-enter Resharper reflex consequence. My brain had no part in that at all. It has no function whatsoever and I'll remove it.

@joliver

The only other thought I had was about the location of the conversion. In the back of my mind, I always pictured conversion as something a serialization wrapper would take care of. In other words, I've been thinking we could accomplish the same thing from the serializer level. I'm still back and forth on this one though and a pipeline hook is definitely a place where it could work, but at that point, the ordering of the pipeline hooks may become important.

@MikeEast

You have a point. The sooner the better.

Still, the interface IConvertEvents<,> is pretty much what we want, right?

@MikeEast

How about promoting the conversion to its own thing?

The default behavior is no conversion.

Using Wireup,

you can choose something like .UsingEventConversion().AutoScan() for automatic scanning of converters, like I have done now.

You can also choose something like .UsingEventConversion().WithConverters.FromAssemblyContainingType(typeof(MyConverter)). Here I think we can improve the API, but you get the idea.

Then for the implementation, the conversion will not be a IPipelineHook but occur just before the pipeline hooks executes, in OptimisticEventStore.GetFrom(..). Line #65 or so. OptimisticEventStore will get the converters injected.

I think that might be the best place. Regardless, this way the functionality is kept internal so that compability is kept even though the serializer wrapper idea of yours materializes and gets implemented.

@MikeEast

Here is another suggestion. Forgive my commit frenzy. I evidently suck at git.

In this commit, the Event conversion has become its own thing and is configurable from the Wireup. I don't know what you think about me poking around in the API, but as this is a suggestion I took the liberty in doing so.

Have a look and tell me what you think.

@joliver

Okay, here's what I'd like to do. I think making the IConvertCommits is a great interface and it should definitely have it's own interface like you have done. After looking at the issues and different storage engines, I determined that wrapping it up in the serializer is absolutely the wrong place, so it's good we didn't go down that road. However, I believe that wrapping things up in a "pipeline hook" is the right place. I'm trying to keep the primary EventStore objects as clean and uncluttered as possible which is one of the reasons I took the dispatcher and put it into a pipeline hook.

One other big one is that I'd like to move all reflection and type/assembly scanning into the wireup and out of the constructor. Ideally the converter which just be using a Dictionary> and however those were discovered or registered is a wireup issue.

@MikeEast

Ok, that sounds great!

I'll turn it back into a IPipelineHook and try to figure out a useful API for the Wireup. I'll also move all scanning things to the wireup. I might commit something tonight.

@MikeEast

Here we go! This version should do it!

The default behaviour now is that there are no event conversion. To enable it, you have to explicitly use the Wireup. There are three ways to do it.
The simplest way is to use .UsingEventUpconversion() which makes the Wireup scan all assemblies for converters.
The second alternative is to use .WithConvertersFrom(params Assembly[]) which only scans the supplied assemblies.
The final alternative is to use .WithConvertersFromAssemblyContaining(params Type[]) which scans all assemblies containing the supplied types.

The event converter pipeline hook will use the converters found from scanning and convert events when they are retrieved from the underlying persistence. This will take place before any other pipeline hook registered executes.

The event converter convert specific events recursively which means that an event of type Event1 will be converted to Event2 and then to Event3 and so forth. This recursion will continue until there is no converter found.

Event converters are created by implementing the IConvertEvents interface. The event converter chooses which converter to use based on TSource.

Feel free to trash the Wireup API as you see fit.

Let me know what you think.

@joliver joliver merged commit 757026a into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 27, 2011
  1. Implemented an event converting IPipelineHook

    Mikael Ostberg authored
  2. Minor

    Mikael Ostberg authored
  3. Deleted old EventConverter

    Mikael Ostberg authored
Commits on Sep 28, 2011
  1. Changed the implementation from being a IPipelineHook to being its ow…

    Mikael Ostberg authored
    …n concept
  2. Changed the implementation from being a IPipelineHook to being its ow…

    Mikael Ostberg authored
    …n concept
  3. Changed the implementation from being a IPipelineHook to being its ow…

    Mikael Ostberg authored
    …n concept
  4. Changed the implementation from being a IPipelineHook to being its ow…

    Mikael Ostberg authored
    …n concept
  5. Changed the implementation from being a IPipelineHook to being its ow…

    Mikael Ostberg authored
    …n concept
  6. Fixed a possible nullref

    Mikael Ostberg authored
Commits on Sep 29, 2011
  1. Implemented the event upconverter as a IPipelineHook that is register…

    Mikael Ostberg authored
    …ed explicitly using Wireup
  2. Added prevention for creation of multiple converters with the same so…

    Mikael Ostberg authored
    …urce type
  3. Removed failing event converter so that the tests works

    Mikael Ostberg authored
This page is out of date. Refresh to see the latest.
Showing with 519 additions and 87 deletions.
  1. +1 −1  .gitignore
  2. +1 −0  src/proj/EventStore.Core/EventStore.Core.csproj
  3. +55 −0 src/proj/EventStore.Core/EventUpconverterPipelineHook.cs
  4. +3 −3 src/proj/EventStore.Core/OptimisticEventStore.cs
  5. +9 −0 src/proj/EventStore.Core/Resources.Designer.cs
  6. +6 −3 src/proj/EventStore.Core/Resources.resx
  7. +2 −0  src/proj/EventStore.Wireup/EventStore.Wireup.csproj
  8. +73 −0 src/proj/EventStore.Wireup/EventUpconverterWireup.cs
  9. +10 −0 src/proj/EventStore.Wireup/EventUpconverterWireupExtensions.cs
  10. +19 −1 src/proj/EventStore.Wireup/Messages.Designer.cs
  11. +6 −0 src/proj/EventStore.Wireup/Messages.resx
  12. +19 −14 src/proj/EventStore.Wireup/Wireup.cs
  13. +2 −0  src/proj/EventStore/EventStore.csproj
  14. +32 −32 src/proj/EventStore/ICommitEvents.cs
  15. +17 −0 src/proj/EventStore/IConvertEvents.cs
  16. +26 −26 src/proj/EventStore/IPipelineHook.cs
  17. +46 −0 src/proj/EventStore/MultipleConvertersFoundException.cs
  18. +2 −0  src/tests/EventStore.Core.UnitTests/EventStore.Core.UnitTests.csproj
  19. +182 −0 src/tests/EventStore.Core.UnitTests/EventUpconverterPipelineHookTests.cs
  20. +8 −7 src/tests/EventStore.Core.UnitTests/OptimisticEventStoreTests.cs
View
2  .gitignore
@@ -10,4 +10,4 @@ _ReSharper.*
*.Resharper
*.Cache
*.cache
-~$*
+~$*
View
1  src/proj/EventStore.Core/EventStore.Core.csproj
@@ -44,6 +44,7 @@
<Compile Include="..\VersionAssemblyInfo.cs">
<Link>Properties\VersionAssemblyInfo.cs</Link>
</Compile>
+ <Compile Include="EventUpconverterPipelineHook.cs" />
<Compile Include="DispatchSchedulerPipelinkHook.cs" />
<Compile Include="Logging\ConsoleWindowLogger.cs" />
<Compile Include="Logging\ExtensionMethods.cs" />
View
55 src/proj/EventStore.Core/EventUpconverterPipelineHook.cs
@@ -0,0 +1,55 @@
+namespace EventStore
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Reflection;
+ using Logging;
+
+ public class EventUpconverterPipelineHook : IPipelineHook
+ {
+ private static readonly ILog Logger = LogFactory.BuildLogger(typeof(EventUpconverterPipelineHook));
+ private readonly Dictionary<Type, Func<object, object>> converters;
+
+ public EventUpconverterPipelineHook(Dictionary<Type, Func<object, object>> converters)
+ {
+ this.converters = converters;
+ }
+
+ private object Convert(object body)
+ {
+ Func<object, object> converter;
+ object result = body;
+ if (this.converters.TryGetValue(body.GetType(), out converter))
+ {
+ result = Convert(converter(body));
+ Logger.Debug(Resources.ConvertingEvent, body.GetType(), result.GetType());
+ }
+ return result;
+ }
+
+ public Commit Select(Commit committed)
+ {
+ foreach (var eventMessage in committed.Events)
+ {
+ eventMessage.Body = Convert(eventMessage.Body);
+ }
+ return committed;
+ }
+
+ public bool PreCommit(Commit attempt)
+ {
+ return true;
+ }
+
+ public void PostCommit(Commit committed)
+ {
+ }
+
+ public void Dispose()
+ {
+ this.converters.Clear();
+ GC.SuppressFinalize(this);
+ }
+ }
+}
View
6 src/proj/EventStore.Core/OptimisticEventStore.cs
@@ -10,7 +10,7 @@ public class OptimisticEventStore : IStoreEvents, ICommitEvents
{
private static readonly ILog Logger = LogFactory.BuildLogger(typeof(OptimisticEventStore));
private readonly IPersistStreams persistence;
- private readonly IEnumerable<IPipelineHook> pipelineHooks;
+ private readonly IEnumerable<IPipelineHook> pipelineHooks;
public OptimisticEventStore(IPersistStreams persistence, IEnumerable<IPipelineHook> pipelineHooks)
{
@@ -18,7 +18,7 @@ public OptimisticEventStore(IPersistStreams persistence, IEnumerable<IPipelineHo
throw new ArgumentNullException("persistence");
this.persistence = persistence;
- this.pipelineHooks = pipelineHooks ?? new IPipelineHook[0];
+ this.pipelineHooks = pipelineHooks ?? new IPipelineHook[0];
}
public void Dispose()
@@ -73,7 +73,7 @@ public virtual IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int m
if (filtered == null)
Logger.Info(Resources.PipelineHookFilteredCommit);
else
- yield return filtered;
+ yield return filtered;
}
}
public virtual void Commit(Commit attempt)
View
9 src/proj/EventStore.Core/Resources.Designer.cs
@@ -169,6 +169,15 @@ internal class Resources {
}
/// <summary>
+ /// Looks up a localized string similar to Converting an Event from &apos;{0}&apos; to &apos;{1}&apos;..
+ /// </summary>
+ internal static string ConvertingEvent {
+ get {
+ return ResourceManager.GetString("ConvertingEvent", resourceCulture);
+ }
+ }
+
+ /// <summary>
/// Looks up a localized string similar to Creating stream &apos;{0}&apos;..
/// </summary>
internal static string CreatingStream {
View
9 src/proj/EventStore.Core/Resources.resx
@@ -279,7 +279,10 @@
<data name="UnableToDispatch" xml:space="preserve">
<value>Configured dispatcher of type '{0}' was unable to dispatch commit '{1}'.</value>
</data>
- <data name="UnableToMarkDispatched">
- <value><![CDATA[Unable to mark commit '{0}' as dispatched, the underlying storage has already been disposed]]></value>
- </data>
+ <data name="UnableToMarkDispatched" xml:space="preserve">
+ <value>Unable to mark commit '{0}' as dispatched, the underlying storage has already been disposed</value>
+ </data>
+ <data name="ConvertingEvent" xml:space="preserve">
+ <value>Converting an Event from '{0}' to '{1}'.</value>
+ </data>
</root>
View
2  src/proj/EventStore.Wireup/EventStore.Wireup.csproj
@@ -46,6 +46,8 @@
</Compile>
<Compile Include="AsynchronousDispatchSchedulerWireup.cs" />
<Compile Include="AsynchronousDispatchSchedulerWireupExtensions.cs" />
+ <Compile Include="EventUpconverterWireup.cs" />
+ <Compile Include="EventUpconverterWireupExtensions.cs" />
<Compile Include="LoggingWireupExtensions.cs" />
<Compile Include="Messages.Designer.cs">
<AutoGen>True</AutoGen>
View
73 src/proj/EventStore.Wireup/EventUpconverterWireup.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+namespace EventStore
+{
+ using System.Reflection;
+ using Logging;
+
+ public class EventUpconverterWireup : Wireup
+ {
+ private static readonly ILog Logger = LogFactory.BuildLogger(typeof(EventUpconverterWireup));
+ private readonly List<Assembly> assembliesToScan = new List<Assembly>();
+
+ public EventUpconverterWireup(Wireup wireup) : base(wireup)
+ {
+ Logger.Debug(Messages.EventUpconverterRegistered);
+
+ this.Container.Register(c =>
+ {
+ if (!this.assembliesToScan.Any())
+ this.assembliesToScan.AddRange(getAllAssemblies());
+ var converters = GetConverters(this.assembliesToScan);
+ return new EventUpconverterPipelineHook(converters);
+ });
+ }
+
+ private Dictionary<Type, Func<object, object>> GetConverters(IEnumerable<Assembly> toScan)
+ {
+ var c = from a in toScan
+ from t in a.GetTypes()
+ let i = t.GetInterface(typeof (IConvertEvents<,>).FullName)
+ where i != null
+ let sourceType = i.GetGenericArguments().First()
+ let convertMethod = i.GetMethod("Convert", BindingFlags.Public | BindingFlags.Instance | BindingFlags.NonPublic)
+ let instance = Activator.CreateInstance(t)
+ select new KeyValuePair<Type, Func<object, object>>(
+ sourceType,
+ e => convertMethod.Invoke(instance, new[] {e as object})
+ );
+ try
+ {
+ return c.ToDictionary(x => x.Key, x => x.Value);
+ }
+ catch (ArgumentException ex)
+ {
+ throw new MultipleConvertersFoundException(ex.Message, ex);
+ }
+ }
+
+ private IEnumerable<Assembly> getAllAssemblies()
+ {
+ return Assembly.GetCallingAssembly()
+ .GetReferencedAssemblies()
+ .Select(Assembly.Load)
+ .Concat(new[] {Assembly.GetCallingAssembly()});
+ }
+
+ public virtual EventUpconverterWireup WithConvertersFrom(params Assembly[] assemblies)
+ {
+ Logger.Debug(Messages.EventUpconvertersLoadedFrom, string.Concat(", ", assemblies));
+ this.assembliesToScan.AddRange(assemblies);
+ return this;
+ }
+
+ public virtual EventUpconverterWireup WithConvertersFromAssemblyContaining(params Type[] converters)
+ {
+ var assemblies = converters.Select(c => c.Assembly).Distinct();
+ Logger.Debug(Messages.EventUpconvertersLoadedFrom, string.Concat(", ", assemblies));
+ this.assembliesToScan.AddRange(assemblies);
+ return this;
+ }
+ }
+}
View
10 src/proj/EventStore.Wireup/EventUpconverterWireupExtensions.cs
@@ -0,0 +1,10 @@
+namespace EventStore
+{
+ public static class EventUpconverterWireupExtensions
+ {
+ public static EventUpconverterWireup UsingEventUpconversion(this Wireup wireup)
+ {
+ return new EventUpconverterWireup(wireup);
+ }
+ }
+}
View
20 src/proj/EventStore.Wireup/Messages.Designer.cs
@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
-// Runtime Version:4.0.30319.235
+// Runtime Version:4.0.30319.237
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
@@ -205,6 +205,24 @@ internal class Messages {
}
/// <summary>
+ /// Looks up a localized string similar to Configuring the store to upconvert events when fetched..
+ /// </summary>
+ internal static string EventUpconverterRegistered {
+ get {
+ return ResourceManager.GetString("EventUpconverterRegistered", resourceCulture);
+ }
+ }
+
+ /// <summary>
+ /// Looks up a localized string similar to Will scan for event upconverters from the following assemblies: &apos;{0}&apos;.
+ /// </summary>
+ internal static string EventUpconvertersLoadedFrom {
+ get {
+ return ResourceManager.GetString("EventUpconvertersLoadedFrom", resourceCulture);
+ }
+ }
+
+ /// <summary>
/// Looks up a localized string similar to Initializing the configured persistence engine..
/// </summary>
internal static string InitializingEngine {
View
6 src/proj/EventStore.Wireup/Messages.resx
@@ -207,4 +207,10 @@
<data name="SynchronousDispatcherTwoPhaseCommits" xml:space="preserve">
<value>Only the synchronous dispatcher can enlist in two-phase commits.</value>
</data>
+ <data name="EventUpconverterRegistered" xml:space="preserve">
+ <value>Configuring the store to upconvert events when fetched.</value>
+ </data>
+ <data name="EventUpconvertersLoadedFrom" xml:space="preserve">
+ <value>Will scan for event upconverters from the following assemblies: '{0}'</value>
+ </data>
</root>
View
33 src/proj/EventStore.Wireup/Wireup.cs
@@ -1,21 +1,21 @@
-namespace EventStore
+namespace EventStore
{
using System.Collections.Generic;
using System.Linq;
using System.Transactions;
- using Dispatcher;
+ using Dispatcher;
using Persistence;
using Persistence.InMemoryPersistence;
public class Wireup
{
private readonly Wireup inner;
- private readonly NanoContainer container;
-
+ private readonly NanoContainer container;
+
protected Wireup(NanoContainer container)
{
this.container = container;
- }
+ }
protected Wireup(Wireup inner)
{
this.inner = inner;
@@ -46,35 +46,40 @@ protected NanoContainer Container
public virtual Wireup HookIntoPipelineUsing(IEnumerable<IPipelineHook> hooks)
{
return this.HookIntoPipelineUsing((hooks ?? new IPipelineHook[0]).ToArray());
- }
+ }
public virtual Wireup HookIntoPipelineUsing(params IPipelineHook[] hooks)
{
ICollection<IPipelineHook> collection = (hooks ?? new IPipelineHook[] { }).Where(x => x != null).ToArray();
this.Container.Register(collection);
return this;
}
-
+
public virtual IStoreEvents Build()
{
if (this.inner != null)
return this.inner.Build();
return this.Container.Resolve<IStoreEvents>();
- }
-
+ }
+
private static IStoreEvents BuildEventStore(NanoContainer context)
{
var scopeOption = context.Resolve<TransactionScopeOption>();
var concurrencyHook = scopeOption == TransactionScopeOption.Suppress ? new OptimisticPipelineHook() : null;
var dispatchSchedulerHook = new DispatchSchedulerPipelinkHook(context.Resolve<IScheduleDispatches>());
+ var eventUpconverterPipelineHook = context.Resolve<EventUpconverterPipelineHook>();
var pipelineHooks = context.Resolve<ICollection<IPipelineHook>>() ?? new IPipelineHook[0];
- pipelineHooks = new IPipelineHook[] { concurrencyHook, dispatchSchedulerHook }
- .Concat(pipelineHooks)
- .Where(x => x != null)
- .ToArray();
+ pipelineHooks = new IPipelineHook[] {
+ eventUpconverterPipelineHook,
+ concurrencyHook,
+ dispatchSchedulerHook
+ }
+ .Concat(pipelineHooks)
+ .Where(x => x != null)
+ .ToArray();
return new OptimisticEventStore(context.Resolve<IPersistStreams>(), pipelineHooks);
}
- }
+ }
}
View
2  src/proj/EventStore/EventStore.csproj
@@ -51,9 +51,11 @@
</Compile>
<Compile Include="Dispatcher\IDispatchCommits.cs" />
<Compile Include="ExtensionMethods.cs" />
+ <Compile Include="IConvertEvents.cs" />
<Compile Include="Logging\ILog.cs" />
<Compile Include="IPipelineHook.cs" />
<Compile Include="Logging\LogFactory.cs" />
+ <Compile Include="MultipleConvertersFoundException.cs" />
<Compile Include="Persistence\StorageUnavailableException.cs" />
<Compile Include="Serialization\IDocumentSerializer.cs" />
<Compile Include="StreamNotFoundException.cs" />
View
64 src/proj/EventStore/ICommitEvents.cs
@@ -1,36 +1,36 @@
-namespace EventStore
-{
- using System;
- using System.Collections.Generic;
- using Persistence;
-
- /// <summary>
- /// Indicates the ability to commit events and access events to and from a given stream.
+namespace EventStore
+{
+ using System;
+ using System.Collections.Generic;
+ using Persistence;
+
+ /// <summary>
+ /// Indicates the ability to commit events and access events to and from a given stream.
/// </summary>
/// <remarks>
/// Instances of this class must be designed to be multi-thread safe such that they can be shared between threads.
- /// </remarks>
- public interface ICommitEvents
- {
- /// <summary>
- /// Gets the corresponding commits from the stream indicated starting at the revision specified until the
- /// end of the stream sorted in ascending order--from oldest to newest.
- /// </summary>
- /// <param name="streamId">The stream from which the events will be read.</param>
- /// <param name="minRevision">The minimum revision of the stream to be read.</param>
- /// <param name="maxRevision">The maximum revision of the stream to be read.</param>
- /// <returns>A series of committed events from the stream specified sorted in ascending order..</returns>
- /// <exception cref="StorageException" />
- /// <exception cref="StorageUnavailableException" />
- IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision);
-
- /// <summary>
- /// Writes the to-be-commited events provided to the underlying persistence mechanism.
- /// </summary>
- /// <param name="attempt">The series of events and associated metadata to be commited.</param>
- /// <exception cref="ConcurrencyException" />
- /// <exception cref="StorageException" />
- /// <exception cref="StorageUnavailableException" />
- void Commit(Commit attempt);
- }
+ /// </remarks>
+ public interface ICommitEvents
+ {
+ /// <summary>
+ /// Gets the corresponding commits from the stream indicated starting at the revision specified until the
+ /// end of the stream sorted in ascending order--from oldest to newest.
+ /// </summary>
+ /// <param name="streamId">The stream from which the events will be read.</param>
+ /// <param name="minRevision">The minimum revision of the stream to be read.</param>
+ /// <param name="maxRevision">The maximum revision of the stream to be read.</param>
+ /// <returns>A series of committed events from the stream specified sorted in ascending order..</returns>
+ /// <exception cref="StorageException" />
+ /// <exception cref="StorageUnavailableException" />
+ IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision);
+
+ /// <summary>
+ /// Writes the to-be-commited events provided to the underlying persistence mechanism.
+ /// </summary>
+ /// <param name="attempt">The series of events and associated metadata to be commited.</param>
+ /// <exception cref="ConcurrencyException" />
+ /// <exception cref="StorageException" />
+ /// <exception cref="StorageUnavailableException" />
+ void Commit(Commit attempt);
+ }
}
View
17 src/proj/EventStore/IConvertEvents.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace EventStore
+{
+ /// <summary>
+ /// Provides the ability to upconvert an event from one type to another.
+ /// </summary>
+ /// <typeparam name="TSource">The event type to convert from</typeparam>
+ /// <typeparam name="TTarget">The event type to convert to</typeparam>
+ public interface IConvertEvents<TSource, TTarget>
+ {
+ TTarget Convert(TSource sourceEvent);
+ }
+}
View
52 src/proj/EventStore/IPipelineHook.cs
@@ -1,33 +1,33 @@
-namespace EventStore
+namespace EventStore
{
using System;
- /// <summary>
- /// Provides the ability to hook into the pipeline of persisting a commit.
+ /// <summary>
+ /// Provides the ability to hook into the pipeline of persisting a commit.
/// </summary>
/// <remarks>
/// Instances of this class must be designed to be multi-thread safe such that they can be shared between threads.
- /// </remarks>
- public interface IPipelineHook : IDisposable
- {
- /// <summary>
- /// Hooks into the selection pipeline just prior to the commit being returned to the caller.
- /// </summary>
- /// <param name="committed">The commit to be filtered.</param>
- /// <returns>If successful, returns a populated commit; otherwise returns null.</returns>
- Commit Select(Commit committed);
-
- /// <summary>
- /// Hooks into the commit pipeline prior to persisting the commit to durable storage.
- /// </summary>
- /// <param name="attempt">The attempt to be committed.</param>
- /// <returns>If processing should continue, returns true; otherwise returns false.</returns>
- bool PreCommit(Commit attempt);
-
- /// <summary>
- /// Hooks into the commit pipeline just after the commit has been *successfully* committed to durable storage.
- /// </summary>
- /// <param name="committed">The commit which has been persisted.</param>
- void PostCommit(Commit committed);
- }
+ /// </remarks>
+ public interface IPipelineHook : IDisposable
+ {
+ /// <summary>
+ /// Hooks into the selection pipeline just prior to the commit being returned to the caller.
+ /// </summary>
+ /// <param name="committed">The commit to be filtered.</param>
+ /// <returns>If successful, returns a populated commit; otherwise returns null.</returns>
+ Commit Select(Commit committed);
+
+ /// <summary>
+ /// Hooks into the commit pipeline prior to persisting the commit to durable storage.
+ /// </summary>
+ /// <param name="attempt">The attempt to be committed.</param>
+ /// <returns>If processing should continue, returns true; otherwise returns false.</returns>
+ bool PreCommit(Commit attempt);
+
+ /// <summary>
+ /// Hooks into the commit pipeline just after the commit has been *successfully* committed to durable storage.
+ /// </summary>
+ /// <param name="committed">The commit which has been persisted.</param>
+ void PostCommit(Commit committed);
+ }
}
View
46 src/proj/EventStore/MultipleConvertersFoundException.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Text;
+
+namespace EventStore
+{
+ /// <summary>
+ /// Represents the failure that occurs when there are two or more event converters created for the same source type.
+ /// </summary>
+ [Serializable]
+ public class MultipleConvertersFoundException : Exception
+ {
+ /// <summary>
+ /// Initializes a new instance of the MultipleConvertersFoundException class.
+ /// </summary>
+ public MultipleConvertersFoundException()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the MultipleConvertersFoundException class.
+ /// </summary>
+ public MultipleConvertersFoundException(string message)
+ : base(message)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the MultipleConvertersFoundException class.
+ /// </summary>
+ public MultipleConvertersFoundException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the MultipleConvertersFoundException class.
+ /// </summary>
+ protected MultipleConvertersFoundException(SerializationInfo info, StreamingContext context)
+ : base(info, context)
+ {
+ }
+ }
+}
View
2  src/tests/EventStore.Core.UnitTests/EventStore.Core.UnitTests.csproj
@@ -46,6 +46,7 @@
</Compile>
<Compile Include="DispatchCommitHookTests.cs" />
<Compile Include="DispatcherTests\AsynchronousDispatcherTests.cs" />
+ <Compile Include="EventUpconverterPipelineHookTests.cs" />
<Compile Include="ExtensionMethods.cs" />
<Compile Include="OptimisticCommitHookTests.cs" />
<Compile Include="OptimisticEventStoreTests.cs" />
@@ -65,6 +66,7 @@
<RequiredTargetFramework>4.0</RequiredTargetFramework>
<HintPath>..\..\..\bin\Moq.4.0.10827\lib\NET40\Moq.dll</HintPath>
</Reference>
+ <Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Content Include="..\..\proj\CustomDictionary.xml">
View
182 src/tests/EventStore.Core.UnitTests/EventUpconverterPipelineHookTests.cs
@@ -0,0 +1,182 @@
+#pragma warning disable 169
+// ReSharper disable InconsistentNaming
+
+namespace EventStore.Core.UnitTests
+{
+ using System;
+ using System.Linq;
+ using Machine.Specifications;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using It = Machine.Specifications.It;
+
+ [Subject("EventUpconverterPipelineHook")]
+ public class when_opening_a_commit_that_does_not_have_convertible_events : using_event_converter
+ {
+ static readonly Commit commit = new Commit(
+ Guid.NewGuid(), 0, Guid.NewGuid(), 0, DateTime.MinValue, null, null
+ );
+ static Commit converted;
+
+ Establish context = () =>
+ commit.Events.Add(new EventMessage { Body = new NonConvertingEvent() });
+
+ Because of = () =>
+ converted = EventUpconverter.Select(commit);
+
+ It should_not_be_converted = () =>
+ converted.ShouldBeTheSameAs(commit);
+
+ It should_have_the_same_instance_of_the_event = () =>
+ converted.Events.Single().ShouldEqual(commit.Events.Single());
+ }
+
+ [Subject("EventUpconverterPipelineHook")]
+ public class when_opening_a_commit_that_has_convertible_events : using_event_converter
+ {
+ static readonly Commit commit = new Commit(
+ Guid.NewGuid(), 0, Guid.NewGuid(), 0, DateTime.MinValue, null, null
+ );
+ static Commit converted;
+ static Guid id = Guid.NewGuid();
+ static readonly EventMessage eventMessage = new EventMessage {
+ Body = new ConvertingEvent(id)
+ };
+
+ Establish context = () =>
+ commit.Events.Add(eventMessage);
+
+ Because of = () =>
+ converted = EventUpconverter.Select(commit);
+
+ It should_be_of_the_converted_type = () =>
+ converted.Events.Single().Body.GetType().ShouldEqual(typeof(ConvertingEvent3));
+
+ It should_have_the_same_id_of_the_commited_event = () =>
+ ((ConvertingEvent3)converted.Events.Single().Body).Id.ShouldEqual(id);
+ }
+
+ [Subject("EventUpconverterPipelineHook")]
+ public class when_an_event_converter_implements_the_IConvertEvents_interface_explicitly : using_event_converter
+ {
+ static readonly Commit commit = new Commit(
+ Guid.NewGuid(), 0, Guid.NewGuid(), 0, DateTime.MinValue, null, null
+ );
+ static Commit converted;
+ static readonly Guid id = Guid.NewGuid();
+ static readonly EventMessage eventMessage = new EventMessage
+ {
+ Body = new ConvertingEvent2(id, "FooEvent")
+ };
+
+ Establish context = () =>
+ commit.Events.Add(eventMessage);
+
+ Because of = () =>
+ converted = EventUpconverter.Select(commit);
+
+ It should_be_of_the_converted_type = () =>
+ converted.Events.Single().Body.GetType().ShouldEqual(typeof(ConvertingEvent3));
+
+ It should_have_the_same_id_of_the_commited_event = () =>
+ ((ConvertingEvent3)converted.Events.Single().Body).Id.ShouldEqual(id);
+ }
+
+ public class using_event_converter
+ {
+ protected static IEnumerable<Assembly> assemblies;
+ protected static Dictionary<Type, Func<object, object>> converters;
+ protected static EventUpconverterPipelineHook EventUpconverter;
+
+ Establish context = () => {
+ assemblies = getAllAssemblies();
+ converters = GetConverters(assemblies);
+ EventUpconverter = new EventUpconverterPipelineHook(converters);
+ };
+
+ private static Dictionary<Type, Func<object, object>> GetConverters(IEnumerable<Assembly> toScan)
+ {
+ var c = from a in toScan
+ from t in a.GetTypes()
+ let i = t.GetInterface(typeof(IConvertEvents<,>).FullName)
+ where i != null
+ let sourceType = i.GetGenericArguments().First()
+ let convertMethod = i.GetMethod("Convert", BindingFlags.Public | BindingFlags.Instance | BindingFlags.NonPublic)
+ let instance = Activator.CreateInstance(t)
+ select new KeyValuePair<Type, Func<object, object>>(
+ sourceType,
+ e => convertMethod.Invoke(instance, new[] { e as object })
+ );
+ try
+ {
+ return c.ToDictionary(x => x.Key, x => x.Value);
+ }
+ catch (ArgumentException ex)
+ {
+ throw new MultipleConvertersFoundException(ex.Message, ex);
+ }
+ }
+
+ private static IEnumerable<Assembly> getAllAssemblies()
+ {
+ return Assembly.GetCallingAssembly()
+ .GetReferencedAssemblies()
+ .Select(Assembly.Load)
+ .Concat(new[] {Assembly.GetCallingAssembly()});
+ }
+ }
+
+ class ConvertingEventConverter : IConvertEvents<ConvertingEvent, ConvertingEvent2>
+ {
+ public ConvertingEvent2 Convert(ConvertingEvent sourceEvent)
+ {
+ return new ConvertingEvent2(sourceEvent.Id, "Temp");
+ }
+ }
+
+ class ExplicitConvertingEventConverter : IConvertEvents<ConvertingEvent2, ConvertingEvent3>
+ {
+ ConvertingEvent3 IConvertEvents<ConvertingEvent2, ConvertingEvent3>.Convert(ConvertingEvent2 sourceEvent)
+ {
+ return new ConvertingEvent3(sourceEvent.Id, "Temp", true);
+ }
+ }
+
+ class NonConvertingEvent {}
+ class ConvertingEvent
+ {
+ public Guid Id { get; set; }
+
+ public ConvertingEvent(Guid id)
+ {
+ Id = id;
+ }
+ }
+ class ConvertingEvent2
+ {
+ public Guid Id { get; set; }
+ public string Name { get; set; }
+
+ public ConvertingEvent2(Guid id, string name)
+ {
+ Id = id;
+ Name = name;
+ }
+ }
+ class ConvertingEvent3
+ {
+ public Guid Id { get; set; }
+ public string Name { get; set; }
+ public bool ImExplicit { get; set; }
+
+ public ConvertingEvent3(Guid id, string name, bool imExplicit)
+ {
+ Id = id;
+ Name = name;
+ ImExplicit = imExplicit;
+ }
+ }
+}
+
+// ReSharper enable InconsistentNaming
+#pragma warning restore 169
View
15 src/tests/EventStore.Core.UnitTests/OptimisticEventStoreTests.cs
@@ -382,21 +382,22 @@ public class when_disposing_the_event_store : using_persistence
public abstract class using_persistence
{
protected static Guid streamId = Guid.NewGuid();
- protected static Mock<IPersistStreams> persistence;
- protected static OptimisticEventStore store;
- protected static List<Mock<IPipelineHook>> pipelineHooks;
+ protected static Mock<IPersistStreams> persistence;
+ protected static OptimisticEventStore store;
+ protected static List<Mock<IPipelineHook>> pipelineHooks;
- Establish context = () =>
+ Establish context = () =>
{
persistence = new Mock<IPersistStreams>();
pipelineHooks = new List<Mock<IPipelineHook>>();
- store = new OptimisticEventStore(persistence.Object, pipelineHooks.Select(x => x.Object));
+
+ store = new OptimisticEventStore(persistence.Object, pipelineHooks.Select(x => x.Object));
};
- Cleanup everything = () =>
+ Cleanup everything = () =>
streamId = Guid.NewGuid();
- protected static Commit BuildCommitStub(Guid commitId)
+ protected static Commit BuildCommitStub(Guid commitId)
{
return new Commit(streamId, 1, commitId, 1, SystemTime.UtcNow, null, null);
}
Something went wrong with that request. Please try again.