Skip to content

Commit

Permalink
fix: Fix projection job progress
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksii202 committed Jul 28, 2022
1 parent 2519585 commit acc90e0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
using System.Threading.Tasks;
using Elders.Cronus.EventStore.Index;
using Elders.Cronus.EventStore;
using System;

namespace Elders.Cronus.Projections.Rebuilding
{
public interface IRebuildingProjectionRepository
{
public Task<IEnumerable<EventStream>> LoadEventsAsync(IEnumerable<IndexRecord> indexRecords, ProjectionVersion version);
public Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, RebuildProjection_JobData jobData
);
public Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, string eventType, RebuildProjection_JobData jobData);
}
}
32 changes: 25 additions & 7 deletions src/Elders.Cronus/Projections/Rebuilding/ProgressTracker.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Elders.Cronus.EventStore;
using Elders.Cronus.MessageProcessing;
Expand All @@ -16,12 +17,12 @@ public class ProgressTracker
private readonly ILogger<ProgressTracker> logger;

public string ProjectionName { get; set; }
public ulong Processed { get; set; }
public Dictionary<string, ulong> EventTypeProcessed { get; set; }
public ulong TotalEvents { get; set; }


public ProgressTracker(IMessageCounter messageCounter, CronusContext context, IPublisher<ISystemSignal> signalPublisher, ProjectionVersionHelper projectionVersionHelper, ILogger<ProgressTracker> logger)
{
EventTypeProcessed = new Dictionary<string, ulong>();
tenant = context.Tenant;
this.messageCounter = messageCounter;
this.signalPublisher = signalPublisher;
Expand All @@ -35,34 +36,41 @@ public ProgressTracker(IMessageCounter messageCounter, CronusContext context, IP
/// <param name="version">Projection version that should be initialized</param>
public async Task InitializeAsync(ProjectionVersion version)
{
EventTypeProcessed = new Dictionary<string, ulong>();

ProjectionName = version.ProjectionName;
IEnumerable<Type> projectionHandledEventTypes = projectionVersionHelper.GetInvolvedEventTypes(ProjectionName.GetTypeByContract());
foreach (var eventType in projectionHandledEventTypes)
{
TotalEvents += (ulong)await messageCounter.GetCountAsync(eventType).ConfigureAwait(false);

Processed = 0;
EventTypeProcessed.Add(eventType.GetContractId(), 0);
}
}

/// <summary>
/// Finishes the action and sending incrementing progress signal
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public async Task CompleteActionWithProgressSignalAsync(Func<Task> action)
public async Task CompleteActionWithProgressSignalAsync(Func<Task> action, string eventType)
{
try
{
await action().ConfigureAwait(false);

var progressSignalche = new RebuildProjectionProgress(tenant, ProjectionName, ++Processed, TotalEvents);
var progressSignalche = GetProgressSignal();
signalPublisher.Publish(progressSignalche);

EventTypeProcessed.TryGetValue(eventType, out ulong processed);
EventTypeProcessed[eventType] = ++processed;

}
catch (Exception ex) when (logger.ErrorException(ex, () => $"Error when saving aggregate commit for projection {ProjectionName}")) { }
}

public RebuildProjectionProgress GetProgressSignal()
{
return new RebuildProjectionProgress(tenant, ProjectionName, Processed, TotalEvents);
return new RebuildProjectionProgress(tenant, ProjectionName, CountTotalProcessedEvents(), TotalEvents);
}

public RebuildProjectionFinished GetProgressFinishedSignal()
Expand All @@ -74,5 +82,15 @@ public RebuildProjectionStarted GetProgressStartedSignal()
{
return new RebuildProjectionStarted(tenant, ProjectionName);
}

public ulong CountTotalProcessedEvents()
{
ulong totalProcessed = 0;
foreach (var typeProcessed in EventTypeProcessed)
{
totalProcessed += typeProcessed.Value;
}
return totalProcessed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,11 @@ private async Task<bool> RebuildEventsAsync(string eventTypeId, IClusterOperatio
{
RebuildProjection_JobData.EventPaging paging = Data.EventTypePaging.Where(et => et.Type.Equals(eventTypeId, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();

unchecked
{
ulong pagingProcessedCount = Data.EventTypePaging.Select(p => p.ProcessedCount).DefaultIfEmpty().Aggregate((x, y) => x + y);
progressTracker.Processed = pagingProcessedCount;
}

string paginationToken = paging?.PaginationToken;
LoadIndexRecordsResult indexRecordsResult = await eventToAggregateIndex.EnumerateRecordsAsync(eventTypeId, paginationToken).ConfigureAwait(false); // TODO: Think about cassandra exception here. Such exceptions MUST be handled in the concrete impl of the IndexStore
IEnumerable<EventStream> eventStreams = await rebuildingRepository.LoadEventsAsync(indexRecordsResult.Records, Data.Version).ConfigureAwait(false);

await rebuildingRepository.SaveAggregateCommitsAsync(eventStreams, Data).ConfigureAwait(false);
await rebuildingRepository.SaveAggregateCommitsAsync(eventStreams, eventTypeId, Data).ConfigureAwait(false);

await NotifyClusterAsync(eventTypeId, indexRecordsResult.PaginationToken, cluster, cancellationToken).ConfigureAwait(false);

Expand All @@ -127,7 +121,8 @@ private async Task<bool> RebuildEventsAsync(string eventTypeId, IClusterOperatio

private async Task NotifyClusterAsync(string eventTypeId, string paginationToken, IClusterOperations cluster, CancellationToken cancellationToken)
{
var progress = new RebuildProjection_JobData.EventPaging(eventTypeId, paginationToken, progressTracker.Processed, progressTracker.TotalEvents);
ulong totalProcessed = progressTracker.CountTotalProcessedEvents();
var progress = new RebuildProjection_JobData.EventPaging(eventTypeId, paginationToken, totalProcessed, progressTracker.TotalEvents);

Data.MarkEventTypeProgress(progress);
Data.Timestamp = DateTimeOffset.UtcNow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task<IEnumerable<EventStream>> LoadEventsAsync(IEnumerable<IndexRec
return streamTasks.Select(t => t.Result);
}

public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, RebuildProjection_JobData Data)
public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, string eventType, RebuildProjection_JobData Data)
{
List<Task> indexingTasks = new List<Task>();

Expand All @@ -62,11 +62,11 @@ public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStream
{
foreach (AggregateCommit arCommit in stream.Commits)
{
Task indexAction = progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, Data.Version));
Task indexAction = progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, Data.Version), eventType);
indexingTasks.Add(indexAction);
}
}
catch (Exception ex) when (logger.WarnException(ex, () => $"{stream.ToString()} was skipped when rebuilding {Data.Version.ProjectionName}.")) { }
catch (Exception ex) when (logger.WarnException(ex, () => $"{stream} was skipped when rebuilding {Data.Version.ProjectionName}.")) { }
}

await Task.WhenAll(indexingTasks.ToArray()).ConfigureAwait(false);
Expand Down

0 comments on commit acc90e0

Please sign in to comment.