Skip to content

Commit

Permalink
feat: Auto recover missing projection store
Browse files Browse the repository at this point in the history
  • Loading branch information
leksyCode committed Jul 19, 2022
1 parent 7863d88 commit feb4fca
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 17 deletions.
8 changes: 7 additions & 1 deletion src/Elders.Cronus/Projections/ProjectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Elders.Cronus.Projections.Versioning;
using Elders.Cronus.EventStore.Index.Handlers;
using Elders.Cronus.Projections.Versioning;
using System;

namespace Elders.Cronus.Projections
Expand All @@ -14,5 +15,10 @@ public static bool IsProjectionVersionHandler(this string projectionName)
{
return projectionName.Equals(ProjectionVersionsHandler.ContractId, StringComparison.OrdinalIgnoreCase);
}

public static bool IsEventStoreIndexStatus(this string projectionName)
{
return projectionName.Equals(EventStoreIndexStatus.ContractId, StringComparison.OrdinalIgnoreCase);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public void Replay(string hash, IProjectionVersioningPolicy policy)

public void Rebuild(string hash)
{
EnsureThereIsNoOutdatedBuildingVersions();

if (CanRebuild(hash))
{
ProjectionVersion currentLiveVersion = state.Versions.GetLive();
Expand All @@ -74,18 +76,14 @@ public void VersionRequestTimedout(ProjectionVersion version, VersionRequestTime
}
}

public void NotifyHash(ProjectionVersionManagerId id, string hash, IProjectionVersioningPolicy policy, InMemoryProjectionVersionStore projectionStore)
public void NotifyHash(string hash, IProjectionVersioningPolicy policy)
{
EnsureThereIsNoOutdatedBuildingVersions();

if (ShouldReplay(hash))
{
Replay(hash, policy);
}
else if (ShouldRebuild(id.Id, projectionStore))
{
Rebuild(hash);
}
}

public void FinalizeVersionRequest(ProjectionVersion version)
Expand All @@ -108,13 +106,6 @@ private bool ShouldReplay(string hash)
return isInProgress == false && (state.Versions.HasLiveVersion == false || isNewHashTheLiveOne == false);
}

private bool ShouldRebuild(string projectionName, InMemoryProjectionVersionStore projectionStore)
{
ProjectionVersions versions = projectionStore.Get(projectionName);

return versions.HasLiveVersion == false && versions.HasRebuildingVersion() == false;
}

private bool CanReplay(string hash, IProjectionVersioningPolicy policy)
{
bool isNewHashTheLiveOne = state.Versions.IsHashTheLiveOne(hash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ public class ProjectionVersionManagerAppService : ApplicationService<ProjectionV
ICommandHandler<RebuildProjectionCommand>
{
private readonly IProjectionVersioningPolicy projectionVersioningPolicy;
private readonly InMemoryProjectionVersionStore projectionStore;
private readonly IProjectionReader projectionReader;

public ProjectionVersionManagerAppService(IAggregateRepository repository, IProjectionVersioningPolicy projectionVersioningPolicy, InMemoryProjectionVersionStore projectionStore) : base(repository)

public ProjectionVersionManagerAppService(IAggregateRepository repository, IProjectionVersioningPolicy projectionVersioningPolicy, IProjectionReader projectionReader) : base(repository)
{
this.projectionVersioningPolicy = projectionVersioningPolicy;
this.projectionStore = projectionStore;
this.projectionReader = projectionReader;
}

public async Task HandleAsync(RegisterProjection command)
Expand All @@ -28,7 +29,12 @@ public async Task HandleAsync(RegisterProjection command)
if (result.IsSuccess)
{
ar = result.Data;
ar.NotifyHash(command.Id, command.Hash, projectionVersioningPolicy, projectionStore);
ar.NotifyHash(command.Hash, projectionVersioningPolicy);

if (await ShouldRebuildMissingSystemProjectionsAsync(command.Id, projectionReader).ConfigureAwait(false))
{
ar.Rebuild(command.Hash);
}
}

if (result.NotFound)
Expand Down Expand Up @@ -61,5 +67,23 @@ public Task HandleAsync(TimeoutProjectionVersionRequest command)
{
return UpdateAsync(command.Id, ar => ar.VersionRequestTimedout(command.Version, command.Timebox));
}

private async Task<bool> ShouldRebuildMissingSystemProjectionsAsync(ProjectionVersionManagerId projectionId, IProjectionReader projectionReader)
{
string projectionName = projectionId.Id;

if (projectionName.IsProjectionVersionHandler() || projectionName.IsEventStoreIndexStatus())
{
ReadResult<ProjectionVersionsHandler> result = await projectionReader.GetAsync<ProjectionVersionsHandler>(projectionId).ConfigureAwait(false);
if (result.IsSuccess)
{
ProjectionVersions versions = result.Data.State.AllVersions;
return versions.HasLiveVersion == false && versions.HasRebuildingVersion() == false;
}
return true;
}

return false;
}
}
}

0 comments on commit feb4fca

Please sign in to comment.