Skip to content

Commit

Permalink
Log Orchestrator GetLogsResponse Cancellation
Browse files Browse the repository at this point in the history
Adding cancellation timeout to GetLogsAsync to enable cancellation on retries
  • Loading branch information
juanfranblanco committed May 13, 2022
1 parent c24083b commit 9387381
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions src/Nethereum.BlockchainProcessing/LogProcessing/LogOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,21 @@ public class LogOrchestrator : IBlockchainProcessingOrchestrator

var getLogsResponse = await GetLogsAsync(progress, nextBlockNumberFrom, toNumber).ConfigureAwait(false);

if (getLogsResponse == null) return progress;
if (getLogsResponse == null || cancellationToken.IsCancellationRequested) return progress; //allowing all the logs to be processed if not cancelled before hand

var logs = getLogsResponse.Value.Logs;

if (!cancellationToken.IsCancellationRequested) //allowing all the logs to be processed if not cancelled before hand
if (logs != null)
{
if (logs != null)
{
logs = logs.Sort();
await InvokeLogProcessorsAsync(logs).ConfigureAwait(false);
}
progress.BlockNumberProcessTo = getLogsResponse.Value.To;
if (blockProgressRepository != null)
{
await blockProgressRepository.UpsertProgressAsync(progress.BlockNumberProcessTo.Value);
}

logs = logs.Sort();
await InvokeLogProcessorsAsync(logs).ConfigureAwait(false);
}
progress.BlockNumberProcessTo = getLogsResponse.Value.To;
if (blockProgressRepository != null)
{
await blockProgressRepository.UpsertProgressAsync(progress.BlockNumberProcessTo.Value);
}

}

}
Expand Down Expand Up @@ -104,11 +101,14 @@ public GetLogsResponse(BigInteger from, BigInteger to, FilterLog[] logs)
public BigInteger To { get; set;}
}

private async Task<GetLogsResponse?> GetLogsAsync(OrchestrationProgress progress, BigInteger fromBlock, BigInteger toBlock, int retryRequestNumber = 0, int retryNullLogsRequestNumber = 0)
private async Task<GetLogsResponse?> GetLogsAsync(OrchestrationProgress progress, BigInteger fromBlock, BigInteger toBlock, CancellationToken cancellationToken = default(CancellationToken), int retryRequestNumber = 0, int retryNullLogsRequestNumber = 0)
{
try
{


if (cancellationToken.IsCancellationRequested) return null; // check cancellation on entry as this is recursive

var adjustedToBlock =
_blockRangeRequestStrategy.GeBlockNumberToRequestTo(fromBlock, toBlock,
retryRequestNumber);
Expand All @@ -117,10 +117,12 @@ public GetLogsResponse(BigInteger from, BigInteger to, FilterLog[] logs)

var logs = await EthApi.Filters.GetLogs.SendRequestAsync(_filterInput).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested) return null; // check cancellation after logs as this might be a long call

//If we don't get any, lets retry in case there is an issue with the node.
if(logs == null && retryNullLogsRequestNumber < MaxGetLogsNullRetries)
if (logs == null && retryNullLogsRequestNumber < MaxGetLogsNullRetries)
{
return await GetLogsAsync(progress, fromBlock, toBlock, 0, retryNullLogsRequestNumber + 1).ConfigureAwait(false);
return await GetLogsAsync(progress, fromBlock, toBlock, cancellationToken, 0, retryNullLogsRequestNumber + 1).ConfigureAwait(false);
}
retryRequestNumber = 0;
retryNullLogsRequestNumber = 0;
Expand All @@ -129,14 +131,15 @@ public GetLogsResponse(BigInteger from, BigInteger to, FilterLog[] logs)
}
catch(Exception ex)
{
if (retryRequestNumber >= MaxGetLogsRetries)

if (retryRequestNumber >= MaxGetLogsRetries || cancellationToken.IsCancellationRequested)
{
progress.Exception = ex;
return null;
}
else
{
return await GetLogsAsync(progress, fromBlock, toBlock, retryRequestNumber + 1).ConfigureAwait(false);
return await GetLogsAsync(progress, fromBlock, toBlock, cancellationToken, retryRequestNumber + 1).ConfigureAwait(false);
}
}
}
Expand Down

0 comments on commit 9387381

Please sign in to comment.