Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hubs - Don't pass cancelled token to TryExecute when draining #38067

Merged
merged 20 commits into from Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Configuration;
Expand All @@ -28,19 +29,22 @@ internal class EventHubExtensionConfigProvider : IExtensionConfigProvider
private readonly IConverterManager _converterManager;
private readonly IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> _configuration;
private readonly EventHubClientFactory _clientFactory;
private readonly IDrainModeManager _drainModeManager;

public EventHubExtensionConfigProvider(
IOptions<EventHubOptions> options,
ILoggerFactory loggerFactory,
IConverterManager converterManager,
IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> configuration,
EventHubClientFactory clientFactory)
EventHubClientFactory clientFactory,
IDrainModeManager drainModeManager)
{
_options = options;
_loggerFactory = loggerFactory;
_converterManager = converterManager;
_configuration = configuration;
_clientFactory = clientFactory;
_drainModeManager = drainModeManager;
}

internal Action<ExceptionReceivedEventArgs> ExceptionHandler { get; set; }
Expand Down Expand Up @@ -71,7 +75,12 @@ public void Initialize(ExtensionConfigContext context)
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);

// register our trigger binding provider
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_converterManager, _options, _loggerFactory, _clientFactory);
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(
_converterManager,
_options,
_loggerFactory,
_clientFactory,
_drainModeManager);
context.AddBindingRule<EventHubTriggerAttribute>()
.BindToTrigger(triggerBindingProvider);

Expand Down
Expand Up @@ -28,8 +28,6 @@ internal sealed partial class EventHubListener
/// </summary>
internal class PartitionProcessor : IEventProcessor, IDisposable
{
private readonly CancellationTokenSource _cts = new();

private readonly ITriggeredFunctionExecutor _executor;
private readonly bool _singleDispatch;
private readonly ILogger _logger;
Expand All @@ -44,13 +42,15 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
private Task _cachedEventsBackgroundTask;
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
private SemaphoreSlim _cachedEventsGuard;
private readonly CancellationToken _functionExecutionToken;
private readonly CancellationTokenSource _ownershipLostTokenSource;

/// <summary>
/// When we have a minimum batch size greater than 1, this class manages caching events.
/// </summary>
internal PartitionProcessorEventsManager CachedEventsManager { get; }

public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken functionExecutionToken)
{
_executor = executor;
_singleDispatch = singleDispatch;
Expand All @@ -59,6 +59,8 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_firstFunctionInvocation = true;
_maxWaitTime = options.MaxWaitTime;
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
_functionExecutionToken = functionExecutionToken;
_ownershipLostTokenSource = new CancellationTokenSource();

// Events are only cached when building a batch of minimum size.
if (_minimumBatchesEnabled)
Expand All @@ -70,8 +72,12 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex

public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
// signal cancellation for any in progress executions and clear the cached events
_cts.Cancel();
if (reason == ProcessingStoppedReason.OwnershipLost)
{
_ownershipLostTokenSource.Cancel();
}

// clear the cached events
CachedEventsManager?.ClearEventCache();

_logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason}"));
Expand All @@ -98,11 +104,10 @@ public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception err
/// </summary>
/// <param name="context">The partition information for this partition.</param>
/// <param name="messages">The events to process.</param>
/// <param name="partitionProcessingCancellationToken">The cancellation token to respect if processing for the partition is canceled.</param>
/// <returns></returns>
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken partitionProcessingCancellationToken)
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
{
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, partitionProcessingCancellationToken);
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
_mostRecentPartitionContext = context;
var events = messages.ToArray();
EventData eventToCheckpoint = null;
Expand Down Expand Up @@ -135,7 +140,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
};

await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
await _executor.TryExecuteAsync(input, _functionExecutionToken).ConfigureAwait(false);
_firstFunctionInvocation = false;
eventToCheckpoint = events[i];
}
Expand Down Expand Up @@ -168,7 +173,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
_logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})");

UpdateCheckpointContext(triggerEvents, context);
await TriggerExecute(triggerEvents, context, linkedCts.Token).ConfigureAwait(false);
await TriggerExecute(triggerEvents, context, _functionExecutionToken).ConfigureAwait(false);
eventToCheckpoint = triggerEvents.Last();

// If there is a background timer task, cancel it and dispose of the cancellation token. If there
Expand All @@ -186,7 +191,8 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
if (_cachedEventsBackgroundTaskCts == null && CachedEventsManager.HasCachedEvents)
{
// If there are events waiting to be processed, and no background task running, start a monitoring cycle.
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
// Don't reference linkedCts in the class level background task, as it will be disposed when the method goes out of scope.
_cachedEventsBackgroundTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_functionExecutionToken, _ownershipLostTokenSource.Token);
_cachedEventsBackgroundTask = MonitorCachedEvents(context.ProcessorHost.GetLastReadCheckpoint(context.PartitionId)?.LastModified, _cachedEventsBackgroundTaskCts);
}
}
Expand All @@ -201,7 +207,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
else
{
UpdateCheckpointContext(events, context);
await TriggerExecute(events, context, linkedCts.Token).ConfigureAwait(false);
await TriggerExecute(events, context, _functionExecutionToken).ConfigureAwait(false);
eventToCheckpoint = events.LastOrDefault();
}

Expand Down Expand Up @@ -276,7 +282,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance
var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed");
_logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})");

await TriggerExecute(triggerEvents, _mostRecentPartitionContext, backgroundCancellationTokenSource.Token).ConfigureAwait(false);
await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _functionExecutionToken).ConfigureAwait(false);
if (!backgroundCancellationTokenSource.Token.IsCancellationRequested)
{
await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false);
Expand Down Expand Up @@ -408,7 +414,6 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_cts.Dispose();
_cachedEventsBackgroundTaskCts?.Dispose();
_cachedEventsGuard?.Dispose();
}
Expand Down
Expand Up @@ -7,6 +7,7 @@
using Azure.Messaging.EventHubs.Primitives;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
Expand All @@ -27,6 +28,9 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private string _details;
private CancellationTokenSource _functionExecutionCancellationTokenSource;
private readonly IDrainModeManager _drainModeManager;
private volatile bool _disposed;

public EventHubListener(
string functionId,
Expand All @@ -36,7 +40,8 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
IEventHubConsumerClient consumerClient,
BlobCheckpointStoreInternal checkpointStore,
EventHubOptions options,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
IDrainModeManager drainModeManager)
{
_loggerFactory = loggerFactory;
_executor = executor;
Expand All @@ -45,6 +50,8 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
_checkpointStore = checkpointStore;
_options = options;
_logger = _loggerFactory.CreateLogger<EventHubListener>();
_functionExecutionCancellationTokenSource = new CancellationTokenSource();
_drainModeManager = drainModeManager;

EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger<EventHubMetricsProvider>());

Expand All @@ -68,20 +75,29 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
}

/// <summary>
/// Cancel any in progress listen operation.
/// Cancel should be called prior to Dispose. We just validate that we are not already disposed.
/// This is consistent with the Service Bus listener behavior.
/// </summary>
void IListener.Cancel()
{
#pragma warning disable AZC0102
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102
if (_disposed)
{
throw new ObjectDisposedException(nameof(IListener));
}
}

void IDisposable.Dispose()
{
_functionExecutionCancellationTokenSource.Cancel();

#pragma warning disable AZC0102
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102

// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
// it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
// any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException.
_disposed = true;
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -94,14 +110,19 @@ public async Task StartAsync(CancellationToken cancellationToken)

public async Task StopAsync(CancellationToken cancellationToken)
{
if (!_drainModeManager.IsDrainModeEnabled)
{
_functionExecutionCancellationTokenSource.Cancel();
}

await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);

_logger.LogDebug($"EventHub listener stopped ({_details})");
}

IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
{
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch);
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch, _functionExecutionCancellationTokenSource.Token);
}

public IScaleMonitor GetMonitor()
Expand Down
Expand Up @@ -66,7 +66,8 @@ protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(strin

if (checkpoint is BlobCheckpointStoreInternal.BlobStorageCheckpoint blobCheckpoint && blobCheckpoint is not null)
{
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, blobCheckpoint.LastModified);
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1,
blobCheckpoint.LastModified);
}

return checkpoint;
Expand Down Expand Up @@ -112,7 +113,7 @@ protected override Task OnProcessingEventBatchAsync(IEnumerable<EventData> event
return Task.CompletedTask;
}

return partition.EventProcessor.ProcessEventsAsync(partition, events, cancellationToken);
return partition.EventProcessor.ProcessEventsAsync(partition, events);
}

protected override async Task OnInitializingPartitionAsync(EventProcessorHostPartition partition, CancellationToken cancellationToken)
Expand Down
Expand Up @@ -15,6 +15,6 @@ internal interface IEventProcessor
Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason);
Task OpenAsync(EventProcessorHostPartition context);
Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error);
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages, CancellationToken cancellationToken);
Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages);
}
}
Expand Up @@ -7,6 +7,7 @@
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Primitives;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Triggers;
Expand All @@ -21,17 +22,20 @@ internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider
private readonly IOptions<EventHubOptions> _options;
private readonly EventHubClientFactory _clientFactory;
private readonly IConverterManager _converterManager;
private readonly IDrainModeManager _drainModeManager;

public EventHubTriggerAttributeBindingProvider(
IConverterManager converterManager,
IOptions<EventHubOptions> options,
ILoggerFactory loggerFactory,
EventHubClientFactory clientFactory)
EventHubClientFactory clientFactory,
IDrainModeManager drainModeManager)
{
_converterManager = converterManager;
_options = options;
_clientFactory = clientFactory;
_loggerFactory = loggerFactory;
_drainModeManager = drainModeManager;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
Expand Down Expand Up @@ -67,7 +71,8 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
_clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup),
checkpointStore,
options,
_loggerFactory);
_loggerFactory,
_drainModeManager);
return Task.FromResult(listener);
};
#pragma warning disable 618
Expand Down