Skip to content

Commit

Permalink
feat: Tries to stay away from optimizations that could lead to the co…
Browse files Browse the repository at this point in the history
…nnections overflow
  • Loading branch information
leksyCode committed Aug 10, 2022
1 parent 30a244f commit c88b07a
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 26 deletions.
10 changes: 2 additions & 8 deletions src/Elders.Cronus/EventStore/Index/ProjectionIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public async Task IndexAsync(CronusMessage message)
if (message.IsRepublished)
projectionTypes = message.RecipientHandlers.Intersect(projectionsContainer.Items.Select(t => t.GetContractId())).Select(dc => dc.GetTypeByContract());

List<Task> projectionSaveTasks = new List<Task>();
Type messagePayloadType = message.Payload.GetType();
foreach (var projectionType in projectionTypes)
{
Expand All @@ -42,11 +41,9 @@ public async Task IndexAsync(CronusMessage message)

if (isInterested)
{
projectionSaveTasks.Add(projection.SaveAsync(projectionType, message));
await projection.SaveAsync(projectionType, message).ConfigureAwait(false);
}
}

await Task.WhenAll(projectionSaveTasks).ConfigureAwait(false);
}

public async Task IndexAsync(AggregateCommit aggregateCommit, ProjectionVersion version)
Expand All @@ -55,7 +52,6 @@ public async Task IndexAsync(AggregateCommit aggregateCommit, ProjectionVersion

foreach (var eventData in eventDataList)
{
List<Task> projectionSaveTasks = new List<Task>();
Type messagePayloadType = eventData.Item1.GetType();
foreach (var projectionType in projectionsContainer.Items)
{
Expand All @@ -69,11 +65,9 @@ public async Task IndexAsync(AggregateCommit aggregateCommit, ProjectionVersion

if (isInterested)
{
projectionSaveTasks.Add(projection.SaveAsync(projectionType, eventData.Item1, eventData.Item2, version));
await projection.SaveAsync(projectionType, eventData.Item1, eventData.Item2, version).ConfigureAwait(false);
}
}

await Task.WhenAll(projectionSaveTasks).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations

LoadAggregateCommitsResult result = await eventStorePlayer.LoadAggregateCommitsAsync(Data.PaginationToken).ConfigureAwait(false);

List<Task> indexTasks = new List<Task>();

logger.Info(() => $"Loaded aggregate commits count ${result.Commits.Count} using pagination token {result.PaginationToken}");
foreach (var aggregateCommit in result.Commits)
{
indexTasks.Add(index.IndexAsync(aggregateCommit));
await index.IndexAsync(aggregateCommit).ConfigureAwait(false);
}

await Task.WhenAll(indexTasks).ConfigureAwait(false);

Data.PaginationToken = result.PaginationToken;
Data = await cluster.PingAsync(Data, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations
string mess = Encoding.UTF8.GetString(indexRecord.AggregateRootId);
IAggregateRootId arId = GetAggregateRootId(mess);
EventStream stream = await eventStore.LoadAsync(arId).ConfigureAwait(false);
List<Task> incrementTasks = new List<Task>();

foreach (AggregateCommit arCommit in stream.Commits)
{
Expand All @@ -84,11 +83,10 @@ protected override async Task<JobExecutionStatus> RunJobAsync(IClusterOperations
}

if (eventTypeId.Equals(@event.GetType().GetContractId(), StringComparison.OrdinalIgnoreCase))
incrementTasks.Add(messageCounter.IncrementAsync(eventType));
await messageCounter.IncrementAsync(eventType).ConfigureAwait(false);
}
}

await Task.WhenAll(incrementTasks).ConfigureAwait(false);
}

Data.MarkPaginationTokenAsProcessed(eventTypeId, indexRecordsResult.PaginationToken);
Expand Down
6 changes: 1 addition & 5 deletions src/Elders.Cronus/Migrations/CopyEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ public override async Task RunAsync(IEnumerable<IMigration<AggregateCommitRaw>>
{
int counter = 0;
var arCommits = source.LoadAggregateCommitsRawAsync(5000).ConfigureAwait(false);
List<Task> appendTasks = new List<Task>();

await foreach (AggregateCommitRaw sourceCommit in arCommits)
{
if (counter % 10000 == 0) logger.Info(() => $"[Migrations] Migrated records: {counter}");

appendTasks.Add(target.AppendAsync(sourceCommit));
await target.AppendAsync(sourceCommit).ConfigureAwait(false);

counter++;
}

await Task.WhenAll(appendTasks).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public async Task<IEnumerable<EventStream>> LoadEventsAsync(IEnumerable<IndexRec

public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStreams, string eventType, RebuildProjection_JobData Data)
{
List<Task> indexingTasks = new List<Task>();

foreach (EventStream stream in eventStreams)
{
if (Data.IsCanceled)
Expand All @@ -62,14 +60,11 @@ public async Task SaveAggregateCommitsAsync(IEnumerable<EventStream> eventStream
{
foreach (AggregateCommit arCommit in stream.Commits)
{
Task indexAction = progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, Data.Version), eventType);
indexingTasks.Add(indexAction);
await progressTracker.CompleteActionWithProgressSignalAsync(() => index.IndexAsync(arCommit, Data.Version)).ConfigureAwait(false);
}
}
catch (Exception ex) when (logger.WarnException(ex, () => $"{stream} was skipped when rebuilding {Data.Version.ProjectionName}.")) { }
}

await Task.WhenAll(indexingTasks.ToArray()).ConfigureAwait(false);
}

private bool ShouldLoadAggregate(int aggreagteRootIdHash)
Expand Down

0 comments on commit c88b07a

Please sign in to comment.