Skip to content

Commit

Permalink
fix: Fix overflow of processed count when rebuilding projcetion.
Browse files Browse the repository at this point in the history
Tip: You should be able to using it only with Cronus.Api 8.0.5 or higher
  • Loading branch information
leksyCode committed Jun 15, 2022
1 parent 832f377 commit a9bd003
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
6 changes: 3 additions & 3 deletions src/Elders.Cronus/Projections/Rebuilding/ProgressTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public class ProgressTracker
private readonly ILogger<ProgressTracker> logger;

public string ProjectionName { get; set; }
public long Processed { get; set; }
public long TotalEvents { get; set; }
public ulong Processed { get; set; }
public ulong TotalEvents { get; set; }


public ProgressTracker(IMessageCounter messageCounter, CronusContext context, IPublisher<ISignal> signalPublisher, ProjectionVersionHelper projectionVersionHelper, ILogger<ProgressTracker> logger)
Expand All @@ -39,7 +39,7 @@ public async Task InitializeAsync(ProjectionVersion version)
ProjectionName = version.ProjectionName;
IEnumerable<Type> projectionHandledEventTypes = projectionVersionHelper.GetInvolvedEventTypes(ProjectionName.GetTypeByContract());
foreach (var eventType in projectionHandledEventTypes)
TotalEvents += await messageCounter.GetCountAsync(eventType).ConfigureAwait(false);
TotalEvents += (ulong)await messageCounter.GetCountAsync(eventType).ConfigureAwait(false);

Processed = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ private async Task<bool> RebuildEventsAsync(string eventTypeId, IClusterOperatio
{
RebuildProjection_JobData.EventPaging paging = Data.EventTypePaging.Where(et => et.Type.Equals(eventTypeId, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();

long pagingProcessedCount = Data.EventTypePaging.Select(p => p.ProcessedCount).DefaultIfEmpty(0).Sum();
progressTracker.Processed = pagingProcessedCount;
unchecked
{
ulong pagingProcessedCount = Data.EventTypePaging.Select(p => p.ProcessedCount).DefaultIfEmpty().Aggregate((x, y) => x + y);
progressTracker.Processed = pagingProcessedCount;
}

string paginationToken = paging?.PaginationToken;
LoadIndexRecordsResult indexRecordsResult = await eventToAggregateIndex.EnumerateRecordsAsync(eventTypeId, paginationToken).ConfigureAwait(false); // TODO: Think about cassandra exception here. Such exceptions MUST be handled in the concrete impl of the IndexStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void Init(EventPaging progress)

public partial class EventPaging
{
public EventPaging(string eventTypeId, string paginationToken, long processedCount, long totalCount)
public EventPaging(string eventTypeId, string paginationToken, ulong processedCount, ulong totalCount)
{
Type = eventTypeId;
PaginationToken = paginationToken;
Expand All @@ -66,9 +66,9 @@ public EventPaging(string eventTypeId, string paginationToken, long processedCou

public string PaginationToken { get; set; }

public long ProcessedCount { get; set; }
public ulong ProcessedCount { get; set; }

public long TotalCount { get; set; }
public ulong TotalCount { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class RebuildProjectionProgress : ISignal
{
public RebuildProjectionProgress() { }

public RebuildProjectionProgress(string tenant, string projectionTypeId, long processedCount, long totalCount)
public RebuildProjectionProgress(string tenant, string projectionTypeId, ulong processedCount, ulong totalCount)
{
Tenant = tenant;
ProjectionTypeId = projectionTypeId;
Expand All @@ -22,9 +22,9 @@ public RebuildProjectionProgress(string tenant, string projectionTypeId, long pr
public string ProjectionTypeId { get; set; }

[DataMember(Order = 2)]
public long ProcessedCount { get; set; }
public ulong ProcessedCount { get; set; }

[DataMember(Order = 3)]
public long TotalCount { get; set; }
public ulong TotalCount { get; set; }
}
}

0 comments on commit a9bd003

Please sign in to comment.