Skip to content

Commit

Permalink
feat: Ensure canceling of projections is working (for better experien…
Browse files Browse the repository at this point in the history
…ce we should deploy UI )
  • Loading branch information
leksyCode committed Jun 15, 2022
1 parent 36fcc14 commit 1186428
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/Elders.Cronus/Cluster/Job/JobExecutionStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public enum JobExecutionStatus
{
Completed,
Canceled,
Failed,
Running
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Elders.Cronus.Projections.Rebuilding
public interface IRebuildingProjectionRepository
{
public Task<IEnumerable<EventStream>> LoadEventsAsync(IEnumerable<IndexRecord> indexRecords, ProjectionVersion version);
public Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, ProjectionVersion version);
public Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, RebuildProjection_JobData jobData
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public async Task<bool> ShouldBeCanceledAsync(ProjectionVersion version, DateTim
logger.Error(() => $"Version `{version}` is outdated. There is a newer one which is already live.");
return true;
}
if (allVersions.IsCanceled(version) && version.ProjectionName.Equals(ProjectionVersionsHandler.ContractId, StringComparison.OrdinalIgnoreCase) == false)
else if (allVersions.IsCanceled(version) && version.ProjectionName.Equals(ProjectionVersionsHandler.ContractId, StringComparison.OrdinalIgnoreCase) == false)
{
logger.Error(() => $"Version `{version}` was canceled.false");
logger.Error(() => $"Version `{version}` was canceled");
return true;
}

Expand Down
35 changes: 33 additions & 2 deletions src/Elders.Cronus/Projections/Rebuilding/RebuildProjection_Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations
return JobExecutionStatus.Running;

if (await projectionVersionHelper.ShouldBeCanceledAsync(version, Data.DueDate).ConfigureAwait(false))
return JobExecutionStatus.Failed;
{
if (Data.IsCanceled == false)
await CancelJobAsync(cluster).ConfigureAwait(false);

logger.Info(() => $"The job {version} has been cancelled.");
return JobExecutionStatus.Canceled;
}

await projectionStoreInitializer.InitializeAsync(version).ConfigureAwait(false);

Expand All @@ -66,6 +72,21 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations
bool hasMoreRecords = true;
while (hasMoreRecords && Data.IsCompleted == false)
{
if (cancellationToken.IsCancellationRequested)
{
logger.Info(() => $"The job {version} has been cancelled.");
return JobExecutionStatus.Running;
}

if (Data.IsCanceled || await projectionVersionHelper.ShouldBeCanceledAsync(version, Data.DueDate).ConfigureAwait(false))
{
if (Data.IsCanceled == false)
await CancelJobAsync(cluster).ConfigureAwait(false);

logger.Info(() => $"The job {version} has been cancelled.");
return JobExecutionStatus.Canceled;
}

hasMoreRecords = await RebuildEventsAsync(eventTypeId, cluster, cancellationToken).ConfigureAwait(false);
}
}
Expand All @@ -92,7 +113,7 @@ private async Task<bool> RebuildEventsAsync(string eventTypeId, IClusterOperatio
LoadIndexRecordsResult indexRecordsResult = await eventToAggregateIndex.EnumerateRecordsAsync(eventTypeId, paginationToken).ConfigureAwait(false); // TODO: Think about cassandra exception here. Such exceptions MUST be handled in the concrete impl of the IndexStore
IEnumerable<EventStream> eventStreams = await rebuildingRepository.LoadEventsAsync(indexRecordsResult.Records, Data.Version).ConfigureAwait(false);

await rebuildingRepository.SaveAggregateCommitsAsync(eventStreams, Data.Version).ConfigureAwait(false);
await rebuildingRepository.SaveAggregateCommitsAsync(eventStreams, Data).ConfigureAwait(false);

await NotifyClusterAsync(eventTypeId, indexRecordsResult.PaginationToken, cluster, cancellationToken).ConfigureAwait(false);

Expand All @@ -113,6 +134,16 @@ private async Task NotifyClusterAsync(string eventTypeId, string paginationToken
signalPublisher.Publish(progressSignal);
}

private async Task CancelJobAsync(IClusterOperations cluster)
{
Data.IsCanceled = true;
Data.Timestamp = DateTimeOffset.UtcNow;
Data = await cluster.PingAsync(Data).ConfigureAwait(false);

var finishSignal = progressTracker.GetProgressFinishedSignal();
signalPublisher.Publish(finishSignal);
}

protected override RebuildProjection_JobData Override(RebuildProjection_JobData fromCluster, RebuildProjection_JobData fromLocal)
{
if (fromCluster.IsCompleted && fromCluster.Timestamp < fromLocal.Timestamp || fromCluster.Version < fromLocal.Version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ public class RebuildProjection_JobData : IJobData
public RebuildProjection_JobData()
{
IsCompleted = false;
IsCanceled = false;
EventTypePaging = new List<EventPaging>();
Timestamp = DateTimeOffset.UtcNow;
DueDate = DateTimeOffset.MaxValue;
}

public bool IsCompleted { get; set; }

public bool IsCanceled { get; set; }

public List<EventPaging> EventTypePaging { get; set; }

public ProjectionVersion Version { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,24 @@ public async Task<IEnumerable<EventStream>> LoadEventsAsync(IEnumerable<IndexRec
return streamTasks.Select(t => t.Result);
}

public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, ProjectionVersion version)
public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, RebuildProjection_JobData Data)
{
List<Task> indexingTasks = new List<Task>();

foreach (EventStream stream in eventStreams)
{
if (Data.IsCanceled)
return;

try
{
foreach (AggregateCommit arCommit in stream.Commits)
{
Task indexAction = progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, version));
Task indexAction = progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, Data.Version));
indexingTasks.Add(indexAction);
}
}
catch (Exception ex) when (logger.WarnException(ex, () => $"{stream.ToString()} was skipped when rebuilding {version.ProjectionName}.")) { }
catch (Exception ex) when (logger.WarnException(ex, () => $"{stream.ToString()} was skipped when rebuilding {Data.Version.ProjectionName}.")) { }
}

await Task.WhenAll(indexingTasks.ToArray()).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public async Task HandleAsync(CreateNewProjectionVersion sagaTimeout)
JobExecutionStatus result = await jobRunner.ExecuteAsync(job).ConfigureAwait(false);
logger.Debug(() => "Replay projection version {@cronus_projection_rebuild}", result);


if (result == JobExecutionStatus.Running)
{
RequestTimeout(new CreateNewProjectionVersion(sagaTimeout.ProjectionVersionRequest, DateTime.UtcNow.AddSeconds(30)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ private bool CanReplay(string hash, IProjectionVersioningPolicy policy)

private bool CanRebuild(ProjectionVersion currentLiveVersion)
{
if (currentLiveVersion is null)
return false;

bool hashMatchesCurrentLiveVersion = currentLiveVersion.Hash.Equals(currentLiveVersion.Hash);
bool hasRebuildingVersion = state.Versions.HasRebuildingVersion();

Expand Down

0 comments on commit 1186428

Please sign in to comment.