diff --git a/src/ConductorSharp.Client/ConductorSharp.Client.csproj b/src/ConductorSharp.Client/ConductorSharp.Client.csproj index 4a573a76..922782ed 100644 --- a/src/ConductorSharp.Client/ConductorSharp.Client.csproj +++ b/src/ConductorSharp.Client/ConductorSharp.Client.csproj @@ -6,7 +6,7 @@ Codaxy Codaxy ConductorSharp.Client - 3.4.0 + 3.4.1 Client library for Netflix Conductor, with some additional quality of life features. https://github.com/codaxy/conductor-sharp netflix;conductor diff --git a/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs b/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs index a16e1eee..1cd81959 100644 --- a/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs +++ b/src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs @@ -11,14 +11,20 @@ namespace ConductorSharp.Engine.Behaviors { + // TODO: Consider removing this public class RequestResponseLoggingBehavior : IPipelineBehavior where TRequest : IRequest { private readonly ILogger> _logger; + private readonly ConductorSharpExecutionContext _context; - public RequestResponseLoggingBehavior(ILogger> logger) + public RequestResponseLoggingBehavior( + ILogger> logger, + ConductorSharpExecutionContext context + ) { _logger = logger; + _context = context; } public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) @@ -52,14 +58,9 @@ public async Task Handle(TRequest request, RequestHandlerDelegateCodaxy Codaxy ConductorSharp.Engine - 3.4.0 + 3.4.1 Client library for Netflix Conductor, with some additional quality of life features. https://github.com/codaxy/conductor-sharp netflix;conductor diff --git a/src/ConductorSharp.Engine/ExecutionManager.cs b/src/ConductorSharp.Engine/ExecutionManager.cs index e4306b00..4ca31c49 100644 --- a/src/ConductorSharp.Engine/ExecutionManager.cs +++ b/src/ConductorSharp.Engine/ExecutionManager.cs @@ -140,21 +140,7 @@ private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken return; } - try - { - using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken); - await ProcessPolledTask(pollResponse, workerId, scheduledWorker, tokenHolder.CancellationToken); - } - catch (TaskCanceledException) - { - _logger.LogWarning( - "Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled", - pollResponse.TaskDefName, - pollResponse.TaskId, - pollResponse.WorkflowType, - pollResponse.WorkflowInstanceId - ); - } + await ProcessPolledTask(pollResponse, workerId, scheduledWorker, cancellationToken); } private async Task ProcessPolledTask( @@ -164,6 +150,8 @@ private async Task ProcessPolledTask( CancellationToken cancellationToken ) { + using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken); + try { if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath)) @@ -178,7 +166,7 @@ CancellationToken cancellationToken ); // TODO: iffy - var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, cancellationToken); + var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, tokenHolder.CancellationToken); using TextReader textReader = new StreamReader(file.Stream); var json = await textReader.ReadToEndAsync(); @@ -211,7 +199,7 @@ CancellationToken cancellationToken context.WorkerId = workerId; } - var response = await mediator.Send(inputData, cancellationToken); + var response = await mediator.Send(inputData, tokenHolder.CancellationToken); await _taskManager.UpdateAsync( new TaskResult @@ -221,13 +209,28 @@ await _taskManager.UpdateAsync( OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings), WorkflowInstanceId = pollResponse.WorkflowInstanceId }, - cancellationToken + tokenHolder.CancellationToken ); } - catch (TaskCanceledException) + catch (OperationCanceledException) when (tokenHolder.IsCancellationRequestedByNotifier) { - // Propagate this exception to outer handler - throw; + _logger.LogWarning( + "Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled", + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId + ); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) // This is fine since we know cancellationToken comes from background service + { + _logger.LogWarning( + "Cancelling task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) due to background service shutdown", + pollResponse.TaskDefName, + pollResponse.TaskId, + pollResponse.WorkflowType, + pollResponse.WorkflowInstanceId + ); } catch (Exception exception) { @@ -256,10 +259,10 @@ await Task.WhenAll( OutputData = SerializationHelper.ObjectToDictionary(errorMessage, ConductorConstants.IoJsonSerializerSettings), WorkflowInstanceId = pollResponse?.WorkflowInstanceId }, - cancellationToken + tokenHolder.CancellationToken ), - _taskManager.LogAsync(pollResponse.TaskId, exception.Message, cancellationToken), - _taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, cancellationToken) + _taskManager.LogAsync(pollResponse.TaskId, exception.Message, tokenHolder.CancellationToken), + _taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, tokenHolder.CancellationToken) ] ); } diff --git a/src/ConductorSharp.Engine/Interface/ICancellationNotifier.cs b/src/ConductorSharp.Engine/Interface/ICancellationNotifier.cs index 66f6e9e1..08ff5171 100644 --- a/src/ConductorSharp.Engine/Interface/ICancellationNotifier.cs +++ b/src/ConductorSharp.Engine/Interface/ICancellationNotifier.cs @@ -8,6 +8,7 @@ public interface ICancellationNotifier public interface ICancellationTokenHolder : IDisposable { CancellationToken CancellationToken { get; } + bool IsCancellationRequestedByNotifier { get; } } ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken); diff --git a/src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs b/src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs index b0fcb94e..e2c1c5d8 100644 --- a/src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs +++ b/src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs @@ -10,6 +10,8 @@ internal class PassthroughCancellationTokenHolder(CancellationToken cancellation { public CancellationToken CancellationToken { get; } = cancellationToken; + public bool IsCancellationRequestedByNotifier => false; + public void Dispose() { } } diff --git a/src/ConductorSharp.KafkaCancellationNotifier/ConductorSharp.KafkaCancellationNotifier.csproj b/src/ConductorSharp.KafkaCancellationNotifier/ConductorSharp.KafkaCancellationNotifier.csproj index 3252027b..f49c50b2 100644 --- a/src/ConductorSharp.KafkaCancellationNotifier/ConductorSharp.KafkaCancellationNotifier.csproj +++ b/src/ConductorSharp.KafkaCancellationNotifier/ConductorSharp.KafkaCancellationNotifier.csproj @@ -4,7 +4,7 @@ net6.0 enable enable - 3.4.0 + 3.4.1 Codaxy Codaxy diff --git a/src/ConductorSharp.KafkaCancellationNotifier/Service/KafkaCancellationNotifier.cs b/src/ConductorSharp.KafkaCancellationNotifier/Service/KafkaCancellationNotifier.cs index e4e6b583..7b700e5d 100644 --- a/src/ConductorSharp.KafkaCancellationNotifier/Service/KafkaCancellationNotifier.cs +++ b/src/ConductorSharp.KafkaCancellationNotifier/Service/KafkaCancellationNotifier.cs @@ -16,6 +16,8 @@ internal class CancellationTokenSourceHolder : ICancellationNotifier.ICancellati public CancellationToken CancellationToken { get; } + public bool IsCancellationRequestedByNotifier => _notifier.IsCancellationRequested(_taskId); + public CancellationTokenSourceHolder(CancellationToken cancellationToken, string taskId, KafkaCancellationNotifier notifier) { CancellationToken = cancellationToken; @@ -26,10 +28,21 @@ public CancellationTokenSourceHolder(CancellationToken cancellationToken, string public void Dispose() => _notifier.ClearTaskCts(_taskId); } + private class TaskCancellationInfo + { + public TaskCancellationInfo(CancellationTokenSource cancellationTokenSource) + { + CancellationTokenSource = cancellationTokenSource; + } + + public CancellationTokenSource CancellationTokenSource { get; } + public bool IsCancellationRequested { get; set; } + } + private readonly HashSet _tasks; private readonly object _lock = new(); private readonly ILogger _logger; - private readonly Dictionary _taskIdToCtsMap = new(); + private readonly Dictionary _taskIdToInfoMap = new(); public KafkaCancellationNotifier(IEnumerable tasks, ILogger logger) { @@ -39,8 +52,8 @@ public KafkaCancellationNotifier(IEnumerable tasks, ILoggerFalse Codaxy Codaxy - 3.4.0 + 3.4.1