Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ConductorSharp.Client/ConductorSharp.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Client</PackageId>
<Version>3.4.0</Version>
<Version>3.4.1</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@

namespace ConductorSharp.Engine.Behaviors
{
// TODO: Consider removing this
public class RequestResponseLoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
private readonly ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> _logger;
private readonly ConductorSharpExecutionContext _context;

public RequestResponseLoggingBehavior(ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> logger)
public RequestResponseLoggingBehavior(
ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> logger,
ConductorSharpExecutionContext context
)
{
_logger = logger;
_context = context;
}

public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
Expand Down Expand Up @@ -52,14 +58,9 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe

return response;
}
catch (TaskCanceledException)
catch (OperationCanceledException) when (_context.TaskId != null)
{
_logger.LogWarning(
$"Request {{Request}} cancelled with payload {{@{requestName}}} and with id {{RequestId}}",
requestName,
request,
requestId
);
// Simply rethrow and do not log in order for cancellation notifier to work
throw;
}
catch (Exception exc)
Expand Down
2 changes: 1 addition & 1 deletion src/ConductorSharp.Engine/ConductorSharp.Engine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<PackageId>ConductorSharp.Engine</PackageId>
<Version>3.4.0</Version>
<Version>3.4.1</Version>
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
<PackageTags>netflix;conductor</PackageTags>
Expand Down
51 changes: 27 additions & 24 deletions src/ConductorSharp.Engine/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,7 @@ private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken
return;
}

try
{
using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken);
await ProcessPolledTask(pollResponse, workerId, scheduledWorker, tokenHolder.CancellationToken);
}
catch (TaskCanceledException)
{
_logger.LogWarning(
"Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled",
pollResponse.TaskDefName,
pollResponse.TaskId,
pollResponse.WorkflowType,
pollResponse.WorkflowInstanceId
);
}
await ProcessPolledTask(pollResponse, workerId, scheduledWorker, cancellationToken);
}

private async Task ProcessPolledTask(
Expand All @@ -164,6 +150,8 @@ private async Task ProcessPolledTask(
CancellationToken cancellationToken
)
{
using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken);

try
{
if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath))
Expand All @@ -178,7 +166,7 @@ CancellationToken cancellationToken
);

// TODO: iffy
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, cancellationToken);
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, tokenHolder.CancellationToken);

using TextReader textReader = new StreamReader(file.Stream);
var json = await textReader.ReadToEndAsync();
Expand Down Expand Up @@ -211,7 +199,7 @@ CancellationToken cancellationToken
context.WorkerId = workerId;
}

var response = await mediator.Send(inputData, cancellationToken);
var response = await mediator.Send(inputData, tokenHolder.CancellationToken);

await _taskManager.UpdateAsync(
new TaskResult
Expand All @@ -221,13 +209,28 @@ await _taskManager.UpdateAsync(
OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings),
WorkflowInstanceId = pollResponse.WorkflowInstanceId
},
cancellationToken
tokenHolder.CancellationToken
);
}
catch (TaskCanceledException)
catch (OperationCanceledException) when (tokenHolder.IsCancellationRequestedByNotifier)
{
// Propagate this exception to outer handler
throw;
_logger.LogWarning(
"Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled",
pollResponse.TaskDefName,
pollResponse.TaskId,
pollResponse.WorkflowType,
pollResponse.WorkflowInstanceId
);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) // This is fine since we know cancellationToken comes from background service
{
_logger.LogWarning(
"Cancelling task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) due to background service shutdown",
pollResponse.TaskDefName,
pollResponse.TaskId,
pollResponse.WorkflowType,
pollResponse.WorkflowInstanceId
);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -256,10 +259,10 @@ await Task.WhenAll(
OutputData = SerializationHelper.ObjectToDictionary(errorMessage, ConductorConstants.IoJsonSerializerSettings),
WorkflowInstanceId = pollResponse?.WorkflowInstanceId
},
cancellationToken
tokenHolder.CancellationToken
),
_taskManager.LogAsync(pollResponse.TaskId, exception.Message, cancellationToken),
_taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, cancellationToken)
_taskManager.LogAsync(pollResponse.TaskId, exception.Message, tokenHolder.CancellationToken),
_taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, tokenHolder.CancellationToken)
]
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public interface ICancellationNotifier
public interface ICancellationTokenHolder : IDisposable
{
CancellationToken CancellationToken { get; }
bool IsCancellationRequestedByNotifier { get; }
}

ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken);
Expand Down
2 changes: 2 additions & 0 deletions src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ internal class PassthroughCancellationTokenHolder(CancellationToken cancellation
{
public CancellationToken CancellationToken { get; } = cancellationToken;

public bool IsCancellationRequestedByNotifier => false;

public void Dispose() { }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>3.4.0</Version>
<Version>3.4.1</Version>
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ internal class CancellationTokenSourceHolder : ICancellationNotifier.ICancellati

public CancellationToken CancellationToken { get; }

public bool IsCancellationRequestedByNotifier => _notifier.IsCancellationRequested(_taskId);

public CancellationTokenSourceHolder(CancellationToken cancellationToken, string taskId, KafkaCancellationNotifier notifier)
{
CancellationToken = cancellationToken;
Expand All @@ -26,10 +28,21 @@ public CancellationTokenSourceHolder(CancellationToken cancellationToken, string
public void Dispose() => _notifier.ClearTaskCts(_taskId);
}

private class TaskCancellationInfo
{
public TaskCancellationInfo(CancellationTokenSource cancellationTokenSource)
{
CancellationTokenSource = cancellationTokenSource;
}

public CancellationTokenSource CancellationTokenSource { get; }
public bool IsCancellationRequested { get; set; }
}

private readonly HashSet<string> _tasks;
private readonly object _lock = new();
private readonly ILogger<KafkaCancellationNotifier> _logger;
private readonly Dictionary<string, CancellationTokenSource> _taskIdToCtsMap = new();
private readonly Dictionary<string, TaskCancellationInfo> _taskIdToInfoMap = new();

public KafkaCancellationNotifier(IEnumerable<TaskToWorker> tasks, ILogger<KafkaCancellationNotifier> logger)
{
Expand All @@ -39,8 +52,8 @@ public KafkaCancellationNotifier(IEnumerable<TaskToWorker> tasks, ILogger<KafkaC

public ICancellationNotifier.ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken)
{
var cts = CreateCts(taskId, engineCancellationToken);
return new CancellationTokenSourceHolder(cts.Token, taskId, this);
var token = CreateTaskCancellationInfoAndGetToken(taskId, engineCancellationToken);
return new CancellationTokenSourceHolder(token, taskId, this);
}

public void HandleKafkaEvent(TaskStatusModel taskStatusModel)
Expand All @@ -52,56 +65,65 @@ public void HandleKafkaEvent(TaskStatusModel taskStatusModel)
)
return;

var cts = GetCts(taskStatusModel.TaskId);
if (cts is null)
TryToCancelTask(taskStatusModel);
}

private CancellationToken CreateTaskCancellationInfoAndGetToken(string taskId, CancellationToken engineCancellationToken = default)
{
CancellationToken token;

lock (_lock)
{
_logger.LogWarning(
"Unable to cancel task {TaskId} of workflow {WorkflowId}",
taskStatusModel.TaskId,
taskStatusModel.WorkflowInstanceId
var info = _taskIdToInfoMap[taskId] = new TaskCancellationInfo(
CancellationTokenSource.CreateLinkedTokenSource(engineCancellationToken)
);
return;
token = info.CancellationTokenSource.Token;
}

cts.Cancel();
return token;
}

private CancellationTokenSource CreateCts(string taskId, CancellationToken engineCancellationToken = default)
private void TryToCancelTask(TaskStatusModel taskStatusModel)
{
CancellationTokenSource cts;
var stopwatch = Stopwatch.StartNew();
TaskCancellationInfo? info;

lock (_lock)
{
cts = _taskIdToCtsMap[taskId] = CancellationTokenSource.CreateLinkedTokenSource(engineCancellationToken);
info = _taskIdToInfoMap.GetValueOrDefault(taskStatusModel.TaskId);
}
_logger.LogDebug("CancellationTokenSource creation time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);

return cts;
if (info is null)
{
_logger.LogWarning(
"Unable to cancel task {TaskId} of workflow {WorkflowId}",
taskStatusModel.TaskId,
taskStatusModel.WorkflowInstanceId
);
return;
}

lock (_lock)
{
info.IsCancellationRequested = true;
info.CancellationTokenSource.Cancel();
}
}

private CancellationTokenSource? GetCts(string taskId)
private void ClearTaskCts(string taskId)
{
CancellationTokenSource? cts;
var stopwatch = Stopwatch.StartNew();

lock (_lock)
{
cts = _taskIdToCtsMap.GetValueOrDefault(taskId);
_taskIdToInfoMap[taskId].CancellationTokenSource.Dispose();
_taskIdToInfoMap.Remove(taskId);
}
_logger.LogDebug("CancellationTokenSource get time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);

return cts;
}

private void ClearTaskCts(string taskId)
private bool IsCancellationRequested(string taskId)
{
var stopwatch = Stopwatch.StartNew();
lock (_lock)
{
_taskIdToCtsMap.Remove(taskId);
return _taskIdToInfoMap[taskId].IsCancellationRequested;
}
_logger.LogDebug("CancellationTokenSource removal time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
}
}
}
2 changes: 1 addition & 1 deletion src/ConductorSharp.Patterns/ConductorSharp.Patterns.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<Authors>Codaxy</Authors>
<Company>Codaxy</Company>
<Version>3.4.0</Version>
<Version>3.4.1</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading