Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hubs - Don't pass cancelled token to TryExecute when draining #38067

Merged
merged 20 commits into from Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,8 +28,6 @@ internal sealed partial class EventHubListener
/// </summary>
internal class PartitionProcessor : IEventProcessor, IDisposable
{
private readonly CancellationTokenSource _cts = new();

private readonly ITriggeredFunctionExecutor _executor;
private readonly bool _singleDispatch;
private readonly ILogger _logger;
Expand Down Expand Up @@ -72,8 +70,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex

public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
// signal cancellation for any in progress executions and clear the cached events
_cts.Cancel();
// clear the cached events
CachedEventsManager?.ClearEventCache();

_logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason}"));
Expand Down Expand Up @@ -104,7 +101,7 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
/// <returns></returns>
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken partitionProcessingCancellationToken)
{
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, partitionProcessingCancellationToken);
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, partitionProcessingCancellationToken);
_mostRecentPartitionContext = context;
var events = messages.ToArray();
EventData eventToCheckpoint = null;
Expand Down Expand Up @@ -151,10 +148,10 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
try
{
// Try to acquire the semaphore. This protects the cached events.
if (!_cachedEventsGuard.Wait(0, linkedCts.Token))
if (!_cachedEventsGuard.Wait(0, _functionExecutionToken))
{
// This will throw if the cancellation token is canceled.
await _cachedEventsGuard.WaitAsync(linkedCts.Token).ConfigureAwait(false);
await _cachedEventsGuard.WaitAsync(_functionExecutionToken).ConfigureAwait(false);
}
acquiredSemaphore = true;

Expand Down Expand Up @@ -188,7 +185,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents)
{
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(linkedCts.Token);
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
}
}
Expand Down Expand Up @@ -410,7 +407,6 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_cts.Dispose();
_cachedEventsBackgroundTaskCts?.Dispose();
_cachedEventsGuard?.Dispose();
}
Expand Down
Expand Up @@ -409,19 +409,15 @@ public async Task ProcessEvents_OwnershipLost_DoesNotCheckpoint()
}

int execution = 0;
var cts = new CancellationTokenSource();

executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(() =>
{
if (execution == 0)
{
eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.OwnershipLost).GetAwaiter().GetResult();
}
var result = results[execution++];
return result;
});

await eventProcessor.ProcessEventsAsync(partitionContext, events, cts.Token);
// Pass a cancellation token that is already signaled to simulate ownership loss
await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));

processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny<EventData>(), It.IsAny<CancellationToken>()),
Expand Down Expand Up @@ -457,15 +453,12 @@ public async Task ProcessEvents_Succeeds_ShuttingDown_DoesNotCheckpoint()

executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(() =>
{
if (execution == 0)
{
eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
}
var result = results[execution++];
return result;
});

await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
// Pass a cancellation token that is already signaled to simulate shutdown
await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));

processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny<EventData>(), It.IsAny<CancellationToken>()),
Expand Down Expand Up @@ -501,15 +494,12 @@ public async Task ProcessEvents_Fails_ShuttingDown_DoesNotCheckpoint()

executor.Setup(p => p.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>())).ReturnsAsync(() =>
{
if (execution == 0)
{
eventProcessor.CloseAsync(partitionContext, ProcessingStoppedReason.Shutdown).GetAwaiter().GetResult();
}
var result = results[execution++];
return result;
});

await eventProcessor.ProcessEventsAsync(partitionContext, events, CancellationToken.None);
// Pass a cancellation token that is already signaled to simulate shutdown
await eventProcessor.ProcessEventsAsync(partitionContext, events, new CancellationToken(true));

processor.Verify(
p => p.CheckpointAsync(partitionContext.PartitionId, It.IsAny<EventData>(), It.IsAny<CancellationToken>()),
Expand Down