Skip to content
This repository has been archived by the owner on Feb 9, 2021. It is now read-only.

Commit

Permalink
Closes #8
Browse files Browse the repository at this point in the history
  • Loading branch information
Dariusz Lenartowicz committed Oct 21, 2016
1 parent 79c11a0 commit 04fecb7
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ProjectionsSubscriptionPooler : SubscriptionPooler
{
public ProjectionsSubscriptionPooler(ISubscriptionEventSource[] sources) : base(sources)
{

RetriesPolicy = PoolerRetriesPolicy.Defaut();
}

protected override IEnumerable<Type> FindHandlerTypes()
Expand Down
17 changes: 17 additions & 0 deletions src/Ses.Subscriptions/FetchAttemptsThresholdException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Ses.Subscriptions
{
internal class FetchAttemptsThresholdException : Exception
{
public string PoolerType { get; private set; }
public int ExecuteRetryAttempts { get; private set; }

public FetchAttemptsThresholdException(string poolerType, int executeRetryAttempts, Exception exception)
: base($"Pooler {poolerType} excides retries attempts threshold.", exception)
{
PoolerType = poolerType;
ExecuteRetryAttempts = executeRetryAttempts;
}
}
}
26 changes: 26 additions & 0 deletions src/Ses.Subscriptions/PoolerRetriesPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace Ses.Subscriptions
{
public class PoolerRetriesPolicy
{
public int HandlerAttemptsThreshold { get; private set; }
public int FetchAttemptsThreshold { get; private set; }

public PoolerRetriesPolicy(int fetchAttemptsThreshold, int handlerAttemptsThreshold)
{
if (fetchAttemptsThreshold < 0) fetchAttemptsThreshold = 0;
if (handlerAttemptsThreshold < 0) handlerAttemptsThreshold = 0;
FetchAttemptsThreshold = fetchAttemptsThreshold;
HandlerAttemptsThreshold = handlerAttemptsThreshold;
}

public static PoolerRetriesPolicy Defaut()
{
return new PoolerRetriesPolicy(3, 3);
}

public static PoolerRetriesPolicy NoRetries()
{
return new PoolerRetriesPolicy(0, 0);
}
}
}
14 changes: 11 additions & 3 deletions src/Ses.Subscriptions/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,17 @@ private async Task Run()
}
else
{
var anyDispatched = await Pooler.Execute(_poolerContext, _disposedTokenSource.Token);
_runnerTimer.Interval = _timeoutCalc.CalculateNext(anyDispatched);
_runnerTimer.Start();
try
{
var anyDispatched = await Pooler.Execute(_poolerContext, _disposedTokenSource.Token);
_runnerTimer.Interval = _timeoutCalc.CalculateNext(anyDispatched);
_runnerTimer.Start();
}
catch (FetchAttemptsThresholdException e)
{
_poolerContext.Logger.Error(e.ToString());
Stop();
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Ses.Subscriptions/Ses.Subscriptions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
<ItemGroup>
<Compile Include="EnumerableExtensions.cs" />
<Compile Include="EventStoreSubscriptions.cs" />
<Compile Include="FetchAttemptsThresholdException.cs" />
<Compile Include="HandlerRegistrar.cs" />
<Compile Include="IEventStoreSubscriptions.cs" />
<Compile Include="IPoolerStateRepository.cs" />
<Compile Include="PoolerContext.cs" />
<Compile Include="PoolerRetriesPolicy.cs" />
<Compile Include="PoolerState.cs" />
<Compile Include="PoolerTimeoutCalculator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Expand Down
59 changes: 37 additions & 22 deletions src/Ses.Subscriptions/SubscriptionPooler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public abstract class SubscriptionPooler : ISubscriptionPooler

protected SubscriptionPooler(ISubscriptionEventSource[] sources)
{
RetriesPolicy = PoolerRetriesPolicy.Defaut();
Sources = sources;
_handlerRegistrar = CreateHandlerRegistrar();
_contractSubscriptions = new Dictionary<ISubscriptionEventSource, int>(Sources.Length);
Expand All @@ -26,6 +27,7 @@ protected SubscriptionPooler(ISubscriptionEventSource[] sources)
private HandlerRegistrar CreateHandlerRegistrar() => new HandlerRegistrar(FindHandlerTypes());

public ISubscriptionEventSource[] Sources { get; }
public PoolerRetriesPolicy RetriesPolicy { get; protected set; }
public virtual TimeSpan? RunForDuration => null;
public virtual TimeSpan GetFetchTimeout() => TimeSpan.Zero;
protected abstract IEnumerable<Type> FindHandlerTypes();
Expand All @@ -36,6 +38,7 @@ protected SubscriptionPooler(ISubscriptionEventSource[] sources)

internal void OnStart(IContractsRegistry contractsRegistry)
{
if (RetriesPolicy == null) RetriesPolicy = PoolerRetriesPolicy.NoRetries();
_poolerContractName = contractsRegistry.GetContractName(GetType());
var eventTypes = GetConcreteSubscriptionEventTypes();
if (eventTypes == null) return;
Expand Down Expand Up @@ -67,36 +70,48 @@ internal async Task OnStartAsync(IContractsRegistry contractsRegistry)
internal async Task<bool> Execute(PoolerContext ctx, CancellationToken cancellationToken = default(CancellationToken))
{
var anyDispatched = false;
try
var executionRetryAttempts = 0;
while (true)
{
var poolerStates = new List<PoolerState>(await ctx.StateRepository.LoadAsync(_poolerContractName, cancellationToken));
var timeline = await FetchEventTimeline(ctx, poolerStates);

foreach (var item in timeline)
try
{
foreach (var handlerInfo in _handlerRegistrar.RegisteredHandlerInfos) // all handlers can/should run in parallel
var poolerStates = new List<PoolerState>(await ctx.StateRepository.LoadAsync(_poolerContractName, cancellationToken));
var timeline = await FetchEventTimeline(ctx, poolerStates);

foreach (var item in timeline)
{
var state = FindOrCreateState(ctx.ContractsRegistry, poolerStates, item.SourceType, handlerInfo.HandlerType);
if (item.Envelope.SequenceId > state.EventSequenceId)
foreach (var handlerInfo in _handlerRegistrar.RegisteredHandlerInfos) // all handlers can/should run in parallel
{
try
{
anyDispatched = await TryDispatch(ctx, handlerInfo, item.Envelope, state);
}
catch (Exception ex)
var state = FindOrCreateState(ctx.ContractsRegistry, poolerStates, item.SourceType, handlerInfo.HandlerType);
if (item.Envelope.SequenceId > state.EventSequenceId)
{
PostHandleEventException(item.Envelope, handlerInfo.HandlerType, ex);
throw;
var handlingRetryAttempts = 0;
while (true)
{
try
{
anyDispatched = await TryDispatch(ctx, handlerInfo, item.Envelope, state);
break;
}
catch (Exception ex)
{
PostHandleEventError(item.Envelope, handlerInfo.HandlerType, ex, handlingRetryAttempts);
if (handlingRetryAttempts >= RetriesPolicy.HandlerAttemptsThreshold) throw;
handlingRetryAttempts++;
}
}
}
}
}
return anyDispatched;
}
catch (Exception e)
{
if (executionRetryAttempts >= RetriesPolicy.FetchAttemptsThreshold)
throw new FetchAttemptsThresholdException(GetType().Name, executionRetryAttempts, e);
executionRetryAttempts++;
}
}
catch (Exception e)
{
ctx.Logger.Error(e.ToString());
}
return anyDispatched;
}

private async Task<bool> TryDispatch(PoolerContext ctx, HandlerRegistrar.HandlerTypeInfo handlerInfo, EventEnvelope envelope, PoolerState state)
Expand Down Expand Up @@ -131,7 +146,7 @@ private async Task<bool> TryDispatch(PoolerContext ctx, HandlerRegistrar.Handler

protected virtual void PreHandleEvent(EventEnvelope envelope, Type handlerType) { }
protected virtual void PostHandleEvent(EventEnvelope envelope, Type handlerType) { }
protected virtual void PostHandleEventException(EventEnvelope envelope, Type handlerType, Exception exception) { }
protected virtual void PostHandleEventError(EventEnvelope envelope, Type handlerType, Exception exception, int retryAttempts) { }

private PoolerState FindOrCreateState(IContractsRegistry contractsRegistry, List<PoolerState> poolerStates, Type sourceType, Type handlerType)
{
Expand Down Expand Up @@ -172,7 +187,7 @@ private async Task<List<ExtractedEvent>> FetchEventTimeline(PoolerContext ctx, L

var events = await Task.WhenAll(tasks.ToArray());
var merged = Merge(events);
ctx.Logger.Trace("Fetched {0} events from {1} stream sources.", merged.Count.ToString(), Sources.Length.ToString());
ctx.Logger.Trace("{0} fetched {1} events from {2} stream sources.", _poolerContractName, merged.Count.ToString(), Sources.Length.ToString());
return merged;
}

Expand Down

0 comments on commit 04fecb7

Please sign in to comment.