Skip to content

Commit

Permalink
Merge pull request #26 from poloBBQ/DualSaga-Rebus-persister-implemen…
Browse files Browse the repository at this point in the history
…tation

DualSaga Rebus persister implementation
  • Loading branch information
pruiz committed Sep 21, 2017
2 parents 7656b9a + ce5e148 commit a30e1bb
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 0 deletions.
3 changes: 3 additions & 0 deletions HermaFx.Rebus/HermaFx.Rebus.csproj
Expand Up @@ -52,6 +52,9 @@
<Compile Include="MessageDateExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RebusExtensions.cs" />
<Compile Include="SagaPersisters\DualSagaExtensions.cs" />
<Compile Include="SagaPersisters\DualSagaPersister.cs" />
<Compile Include="SagaPersisters\DualSagaTimeoutStorage.cs" />
<Compile Include="Sagas\ISagaLock.cs" />
<Compile Include="Sagas\SagaLockingExtensions.cs" />
<Compile Include="Sagas\ISagaLockingProvider.cs" />
Expand Down
63 changes: 63 additions & 0 deletions HermaFx.Rebus/SagaPersisters/DualSagaExtensions.cs
@@ -0,0 +1,63 @@
using System;
using System.Reflection;
using Rebus;
using Rebus.Configuration;
using Rebus.Timeout;

namespace HermaFx.Rebus
{
public static class DualSagaExtensions
{
private static readonly BindingFlags _PrivateBflags = BindingFlags.NonPublic | BindingFlags.Instance;

// Access to RebusSagasConfigurer(Backbone) constructor through reflection
private static readonly ConstructorInfo _RebusSagaConfigurerCtor = typeof(RebusSagasConfigurer).GetConstructor(_PrivateBflags, null, new Type[] {typeof(ConfigurationBackbone)}, null);
private static readonly Func<ConfigurationBackbone, RebusSagasConfigurer> RebusSagasConfigurerFactory = x => _RebusSagaConfigurerCtor.Invoke(new object[] { x }) as RebusSagasConfigurer;

// Access to RebusTimeoutsConfigurer(Backbone) constructor through reflection
private static readonly ConstructorInfo _RebusTimeoutsConfigurerCtor = typeof(RebusTimeoutsConfigurer).GetConstructor(_PrivateBflags, null, new Type[] { typeof(ConfigurationBackbone) }, null);
private static readonly Func<ConfigurationBackbone, RebusTimeoutsConfigurer> RebusTimeoutsConfigurerFactory = x => _RebusTimeoutsConfigurerCtor.Invoke(new object[] { x }) as RebusTimeoutsConfigurer;

/// <summary>
/// Store sagas using two IStoreSagaDatas.
/// </summary>
/// <param name="configurer">SagasConfigurer to use DualSagaPersister in</param>
/// <param name="oldStoreSagaData">Sagas are found in this IStoreSagaData and updated.</param>
/// <param name="newStoreSagaData">Sagas are found in this IStoreSagaData and updated, as well as inserted.</param>
public static void StoreInDualPersister(this RebusSagasConfigurer configurer, Action<RebusSagasConfigurer> oldStoreSagaData, Action<RebusSagasConfigurer> newStoreSagaData)
{
var backbone1 = new ConfigurationBackbone((IContainerAdapter)configurer.Backbone.ActivateHandlers);
var configurer1 = RebusSagasConfigurerFactory(backbone1);

oldStoreSagaData(configurer1);

var backbone2 = new ConfigurationBackbone((IContainerAdapter)configurer.Backbone.ActivateHandlers);
var configurer2 = RebusSagasConfigurerFactory(backbone2);

newStoreSagaData(configurer2);

configurer.Use(new DualSagaPersister(configurer1.Backbone.StoreSagaData, configurer2.Backbone.StoreSagaData));
}

/// <summary>
/// Store saga timeouts using two IStoreTimeouts.
/// </summary>
/// <param name="configurer">TimeoutsConfigurer to use DualSagaPersister in</param>
/// <param name="oldStoreTimeouts">Timeouts are found in this IStoreTimeouts.</param>
/// <param name="newStoreTimeouts">Timeouts are found in this IStoreSagaData as well as inserted.</param>
public static void StoreInDualPersister(this RebusTimeoutsConfigurer configurer, Action<RebusTimeoutsConfigurer> oldStoreTimeouts, Action<RebusTimeoutsConfigurer> newStoreTimeouts)
{
var backbone1 = new ConfigurationBackbone((IContainerAdapter)configurer.Backbone.ActivateHandlers);
var configurer1 = RebusTimeoutsConfigurerFactory(backbone1);

oldStoreTimeouts(configurer1);

var backbone2 = new ConfigurationBackbone((IContainerAdapter)configurer.Backbone.ActivateHandlers);
var configurer2 = RebusTimeoutsConfigurerFactory(backbone2);

newStoreTimeouts(configurer2);

configurer.Use(new DualSagaTimeoutStorage(configurer1.Backbone.StoreTimeouts, configurer2.Backbone.StoreTimeouts));
}
}
}
90 changes: 90 additions & 0 deletions HermaFx.Rebus/SagaPersisters/DualSagaPersister.cs
@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;

using Rebus;

namespace HermaFx.Rebus
{
public class DualSagaPersister : IStoreSagaData
{
private const string SAGA_PERSISTER_IMPL = nameof(DualSagaPersister) + ":PersisterImpl:";

private IStoreSagaData _oldSagaPersister;
private IStoreSagaData _newSagaPersister;

public DualSagaPersister(IStoreSagaData oldSagaPersister, IStoreSagaData newSagaPersister)
{
_oldSagaPersister = oldSagaPersister;
_newSagaPersister = newSagaPersister;
}

public void Insert(ISagaData sagaData, string[] sagaDataPropertyPathsToIndex)
{
// New insertions will always be persisted under the new IStoreSagaData
_newSagaPersister.Insert(sagaData, sagaDataPropertyPathsToIndex);
}

public void Update(ISagaData sagaData, string[] sagaDataPropertyPathsToIndex)
{
if (!MessageContext.HasCurrent)
{
throw new InvalidOperationException("Rebus MessageContext has no Current context, DualSagaPersister can't update the saga");
}

GetStoreSagaDataImplementation(
(bool)MessageContext.GetCurrent().Items[SAGA_PERSISTER_IMPL + sagaData.GetType().FullName])
.Update(sagaData, sagaDataPropertyPathsToIndex);
}

public void Delete(ISagaData sagaData)
{
if (!MessageContext.HasCurrent)
{
throw new InvalidOperationException("Rebus MessageContext has no Current context, DualSagaPersister can't delete the saga");
}

GetStoreSagaDataImplementation(
(bool)MessageContext.GetCurrent().Items[SAGA_PERSISTER_IMPL + sagaData.GetType().FullName])
.Delete(sagaData);
}

public T Find<T>(string sagaDataPropertyPath, object fieldFromMessage) where T : class, ISagaData
{
if (!MessageContext.HasCurrent)
{
throw new InvalidOperationException("Rebus MessageContext has no Current context, DualSagaPersister can't find the saga");
}

var persisterData = _newSagaPersister.Find<T>(sagaDataPropertyPath, fieldFromMessage);
if (persisterData != null)
{
MessageContext.GetCurrent().Items[SAGA_PERSISTER_IMPL + persisterData.GetType().FullName] = IsNewStoreSagaData(_newSagaPersister);
return persisterData;
}

persisterData = _oldSagaPersister.Find<T>(sagaDataPropertyPath, fieldFromMessage);
if (persisterData != null)
{
MessageContext.GetCurrent().Items[SAGA_PERSISTER_IMPL + persisterData.GetType().FullName] = IsNewStoreSagaData(_oldSagaPersister);
return persisterData;
}

return null;
}

/// <summary>
/// Gets the appropriate IStoreSagaData
/// </summary>
/// <param name="isNewStoreSagaData">True if new IStoreSagaData. False otherwise.</param>
/// <returns>IStoreSagaData instance</returns>
private IStoreSagaData GetStoreSagaDataImplementation(bool isNewStoreSagaData)
{
return isNewStoreSagaData ? _newSagaPersister : _oldSagaPersister;
}

private bool IsNewStoreSagaData(IStoreSagaData sagaPersister)
{
return sagaPersister == _newSagaPersister;
}
}
}
29 changes: 29 additions & 0 deletions HermaFx.Rebus/SagaPersisters/DualSagaTimeoutStorage.cs
@@ -0,0 +1,29 @@
using System.Linq;

using Rebus.Timeout;

namespace HermaFx.Rebus
{
public class DualSagaTimeoutStorage : IStoreTimeouts
{
private IStoreTimeouts _oldTimeoutStorage;
private IStoreTimeouts _newTimeoutStorage;

public DualSagaTimeoutStorage(IStoreTimeouts oldTimeoutStorage, IStoreTimeouts newTimeoutStorage)
{
_oldTimeoutStorage = oldTimeoutStorage;
_newTimeoutStorage = newTimeoutStorage;
}

public void Add(Timeout newTimeout)
{
// New insertions will always be persisted under the new IStoreTimeouts
_newTimeoutStorage.Add(newTimeout);
}

DueTimeoutsResult IStoreTimeouts.GetDueTimeouts()
{
return new DueTimeoutsResult(_oldTimeoutStorage.GetDueTimeouts().DueTimeouts.Union(_newTimeoutStorage.GetDueTimeouts().DueTimeouts));
}
}
}

0 comments on commit a30e1bb

Please sign in to comment.