Skip to content
This repository has been archived by the owner on Feb 9, 2021. It is now read-only.

Commit

Permalink
Closes #12
Browse files Browse the repository at this point in the history
  • Loading branch information
Dariusz Lenartowicz committed Nov 7, 2016
1 parent 8d6aa0a commit 8f665e7
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 10 deletions.
10 changes: 10 additions & 0 deletions src/Ses.Samples/SampleRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public async Task Run()

Console.WriteLine(@"Starting subscriptions");
var subs = await SampleSubscriptions();

foreach (var poller in subs.GetPollers())
{
Console.WriteLine(poller);
foreach (var state in poller.SourceSequenceInfo)
{
Console.WriteLine($@" {state}");
}
}

await Task.Delay(5000);
Console.WriteLine(@"Stopping subscriptions");
store.Dispose();
Expand Down
9 changes: 7 additions & 2 deletions src/Ses.Subscriptions/EventStoreSubscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,14 @@ public void RunStoppedPollers()
}
}

public Type[] GetPollerTypes()
public PollerInfo[] GetPollers()
{
return _runners.Keys.ToArray();
return _pollers
.Select(x => x.GetInfo(
_runners[x.GetType()].GetInfo(),
_contractRegistry,
_pollerStateRepository))
.ToArray();
}
}
}
2 changes: 1 addition & 1 deletion src/Ses.Subscriptions/IEventStoreSubscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ public interface IEventStoreSubscriptions : IDisposable
{
void RunStoppedPoller(Type type, bool force = false);
void RunStoppedPollers();
Type[] GetPollerTypes();
PollerInfo[] GetPollers();
}
}
25 changes: 25 additions & 0 deletions src/Ses.Subscriptions/PollerInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;

namespace Ses.Subscriptions
{
public class PollerInfo
{
public RunnerInfo Runner { get; }
public Type PollerType { get; }
public string PollerContractName { get; }
public SourcePollerState[] SourceSequenceInfo { get; }

internal PollerInfo(RunnerInfo runner, Type pollerType, string pollerContractName, SourcePollerState[] sourceSequenceInfo)
{
Runner = runner;
PollerType = pollerType;
PollerContractName = pollerContractName;
SourceSequenceInfo = sourceSequenceInfo;
}

public override string ToString()
{
return $"Poller {PollerContractName} Locked:{Runner.IsLockedByPolicy}/Running:{Runner.IsRunning}";
}
}
}
8 changes: 2 additions & 6 deletions src/Ses.Subscriptions/PollerState.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
namespace Ses.Subscriptions
{
public class PollerState
public class PollerState : SourcePollerState
{
public PollerState(string pollerContractName, string sourceContractName, string handlerContractName)
: base(pollerContractName, sourceContractName)
{
PollerContractName = pollerContractName;
SourceContractName = sourceContractName;
HandlerContractName = handlerContractName;
}

public string PollerContractName { get; }
public string SourceContractName { get; }
public string HandlerContractName { get; }
public long EventSequenceId { get; set; }

public override string ToString()
{
Expand Down
7 changes: 6 additions & 1 deletion src/Ses.Subscriptions/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void Start()
{
if (_isLockedByPolicy)
{
_pollerContext.Logger.Warn($"Runner for poller {Poller.GetType().FullName} is locked and can't be started.");
_pollerContext.Logger.Warn($"Runner for poller {Poller.GetType().FullName} is locked and can't be started. Use ForceStart.");
}
else
{
Expand Down Expand Up @@ -139,5 +139,10 @@ private void Dispose(bool disposing)
_disposedTokenSource = null;
_runnerTimer = null;
}

public RunnerInfo GetInfo()
{
return new RunnerInfo(_startedAt.Value, _isLockedByPolicy, _isRunning);
}
}
}
18 changes: 18 additions & 0 deletions src/Ses.Subscriptions/RunnerInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace Ses.Subscriptions
{
public class RunnerInfo
{
public DateTime StartedAt { get; }
public bool IsLockedByPolicy { get; }
public bool IsRunning { get; }

internal RunnerInfo(DateTime startedAt, bool isLockedByPolicy, bool isRunning)
{
StartedAt = startedAt;
IsLockedByPolicy = isLockedByPolicy;
IsRunning = isRunning;
}
}
}
3 changes: 3 additions & 0 deletions src/Ses.Subscriptions/Ses.Subscriptions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@
<Compile Include="IEventStoreSubscriptions.cs" />
<Compile Include="IPollerStateRepository.cs" />
<Compile Include="PollerContext.cs" />
<Compile Include="PollerInfo.cs" />
<Compile Include="PollerRetriesPolicy.cs" />
<Compile Include="PollerState.cs" />
<Compile Include="PollerTimeoutCalculator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Runner.cs" />
<Compile Include="RunnerInfo.cs" />
<Compile Include="SourcePollerState.cs" />
<Compile Include="SubscriptionPoller.cs" />
<Compile Include="..\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
Expand Down
20 changes: 20 additions & 0 deletions src/Ses.Subscriptions/SourcePollerState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Ses.Subscriptions
{
public class SourcePollerState
{
public SourcePollerState(string pollerContractName, string sourceContractName)
{
PollerContractName = pollerContractName;
SourceContractName = sourceContractName;
}

public string PollerContractName { get; }
public string SourceContractName { get; }
public long EventSequenceId { get; set; }

public override string ToString()
{
return $"{PollerContractName}/{SourceContractName} = {EventSequenceId}";
}
}
}
25 changes: 25 additions & 0 deletions src/Ses.Subscriptions/SubscriptionPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,30 @@ private static long GetMinSequenceIdFor(IContractsRegistry contractsRegistry, Li
}
return min ?? 0;
}

public PollerInfo GetInfo(RunnerInfo runnerInfo, IContractsRegistry contractsRegistry, IPollerStateRepository stateRepository)
{
return new PollerInfo(
runnerInfo,
GetType(),
_pollerContractName,
GetSourceSequenceInfo(contractsRegistry, stateRepository));
}

private SourcePollerState[] GetSourceSequenceInfo(IContractsRegistry contractsRegistry, IPollerStateRepository stateRepository)
{
var pollerStates = new List<PollerState>(stateRepository.Load(_pollerContractName));
var list = new List<SourcePollerState>(Sources.Length);
foreach (var source in Sources)
{
var minSequenceId = GetMinSequenceIdFor(contractsRegistry, pollerStates, source);
var sourceContractName = contractsRegistry.GetContractName(source.GetType());
list.Add(new SourcePollerState(_pollerContractName, sourceContractName)
{
EventSequenceId = minSequenceId
});
}
return list.ToArray();
}
}
}

0 comments on commit 8f665e7

Please sign in to comment.