Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hubs - Don't pass cancelled token to TryExecute when draining #38067

Merged
merged 20 commits into from Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -44,13 +44,14 @@ internal class PartitionProcessor : IEventProcessor, IDisposable
private Task _cachedEventsBackgroundTask;
private CancellationTokenSource _cachedEventsBackgroundTaskCts;
private SemaphoreSlim _cachedEventsGuard;
private readonly CancellationToken _disposingToken;

/// <summary>
/// When we have a minimum batch size greater than 1, this class manages caching events.
/// </summary>
internal PartitionProcessorEventsManager CachedEventsManager { get; }

public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, CancellationToken disposingToken)
{
_executor = executor;
_singleDispatch = singleDispatch;
Expand All @@ -59,6 +60,7 @@ public PartitionProcessor(EventHubOptions options, ITriggeredFunctionExecutor ex
_firstFunctionInvocation = true;
_maxWaitTime = options.MaxWaitTime;
_minimumBatchesEnabled = options.MinEventBatchSize > 1; // 1 is the default
_disposingToken = disposingToken;

// Events are only cached when building a batch of minimum size.
if (_minimumBatchesEnabled)
Expand Down Expand Up @@ -135,7 +137,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
TriggerDetails = eventHubTriggerInput.GetTriggerDetails(context)
};

await _executor.TryExecuteAsync(input, linkedCts.Token).ConfigureAwait(false);
await _executor.TryExecuteAsync(input, _disposingToken).ConfigureAwait(false);
_firstFunctionInvocation = false;
eventToCheckpoint = events[i];
}
Expand Down Expand Up @@ -168,7 +170,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
_logger.LogDebug($"Partition Processor received events and is attempting to invoke function ({details})");

UpdateCheckpointContext(triggerEvents, context);
await TriggerExecute(triggerEvents, context, linkedCts.Token).ConfigureAwait(false);
await TriggerExecute(triggerEvents, context, _disposingToken).ConfigureAwait(false);
eventToCheckpoint = triggerEvents.Last();

// If there is a background timer task, cancel it and dispose of the cancellation token. If there
Expand Down Expand Up @@ -201,7 +203,7 @@ public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnume
else
{
UpdateCheckpointContext(events, context);
await TriggerExecute(events, context, linkedCts.Token).ConfigureAwait(false);
await TriggerExecute(events, context, _disposingToken).ConfigureAwait(false);
eventToCheckpoint = events.LastOrDefault();
}

Expand Down Expand Up @@ -276,7 +278,7 @@ private async Task MonitorCachedEvents(DateTimeOffset? lastCheckpointTime, Cance
var details = GetOperationDetails(_mostRecentPartitionContext, "MaxWaitTimeElapsed");
_logger.LogDebug($"Partition Processor has waited MaxWaitTime since last invocation and is attempting to invoke function on all held events ({details})");

await TriggerExecute(triggerEvents, _mostRecentPartitionContext, backgroundCancellationTokenSource.Token).ConfigureAwait(false);
await TriggerExecute(triggerEvents, _mostRecentPartitionContext, _disposingToken).ConfigureAwait(false);
if (!backgroundCancellationTokenSource.Token.IsCancellationRequested)
{
await CheckpointAsync(triggerEvents.Last(), _mostRecentPartitionContext).ConfigureAwait(false);
Expand Down
Expand Up @@ -27,6 +27,7 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private string _details;
private CancellationTokenSource _disposingCancellationTokenSource;

public EventHubListener(
string functionId,
Expand All @@ -45,6 +46,7 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
_checkpointStore = checkpointStore;
_options = options;
_logger = _loggerFactory.CreateLogger<EventHubListener>();
_disposingCancellationTokenSource = new CancellationTokenSource();

EventHubMetricsProvider metricsProvider = new EventHubMetricsProvider(functionId, consumerClient, checkpointStore, _loggerFactory.CreateLogger<EventHubMetricsProvider>());

Expand All @@ -68,20 +70,28 @@ internal sealed partial class EventHubListener : IListener, IEventProcessorFacto
}

/// <summary>
/// Cancel any in progress listen operation.
/// Cancel should be called prior to Dispose. We just validate that we are not already disposed.
/// This is consistent with the Service Bus listener behavior.
/// </summary>
void IListener.Cancel()
{
#pragma warning disable AZC0102
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102
if (_disposingCancellationTokenSource.IsCancellationRequested)
{
throw new ObjectDisposedException(nameof(IListener));
}
}

void IDisposable.Dispose()
{
_disposingCancellationTokenSource.Cancel();

#pragma warning disable AZC0102
StopAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102

// No need to dispose the _disposingCancellationTokenSource since we don't create it as a linked token and
// it won't use a timer, so the Dispose method is essentially a no-op. The downside to disposing it is that
// any customers who are trying to use it to cancel their own operations would get an ObjectDisposedException.
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -101,7 +111,7 @@ public async Task StopAsync(CancellationToken cancellationToken)

IEventProcessor IEventProcessorFactory.CreatePartitionProcessor()
{
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch);
return new PartitionProcessor(_options, _executor, _loggerFactory.CreateLogger<PartitionProcessor>(), _singleDispatch, _disposingCancellationTokenSource.Token);
}

public IScaleMonitor GetMonitor()
Expand Down
Expand Up @@ -66,7 +66,8 @@ protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(strin

if (checkpoint is BlobCheckpointStoreInternal.BlobStorageCheckpoint blobCheckpoint && blobCheckpoint is not null)
{
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1, blobCheckpoint.LastModified);
_lastReadCheckpoint[partitionId] = new CheckpointInfo(blobCheckpoint.Offset ?? -1, blobCheckpoint.SequenceNumber ?? -1,
blobCheckpoint.LastModified);
}

return checkpoint;
Expand Down
Expand Up @@ -58,6 +58,7 @@ public async Task EventHub_PocoBinding()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
Expand All @@ -77,6 +78,7 @@ public async Task EventHub_StringBinding()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();

var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
CollectionAssert.Contains(logs, $"Input(data)");
Expand All @@ -96,11 +98,24 @@ public async Task EventHub_SingleDispatch()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

AssertSingleDispatchLogs(host);
}

[Test]
public async Task EventHub_SingleDispatch_Dispose()
{
await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
var (_, host) = BuildHost<EventHubTestSingleDispatchJobs_Dispose>(ConfigureTestEventHub);

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
host.Dispose();
}

[Test]
public async Task EventHub_SingleDispatch_ConsumerGroup()
{
Expand All @@ -122,6 +137,7 @@ public async Task EventHub_SingleDispatch_ConsumerGroup()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand All @@ -135,6 +151,7 @@ public async Task EventHub_SingleDispatch_BinaryData()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

AssertSingleDispatchLogs(host);
Expand All @@ -150,6 +167,7 @@ public async Task EventHub_ProducerClient()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand All @@ -163,6 +181,7 @@ public async Task EventHub_Collector()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand All @@ -176,6 +195,7 @@ public async Task EventHub_CollectorPartitionKey()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand Down Expand Up @@ -294,6 +314,7 @@ public async Task AssertCanSendReceiveMessage(Action<IHostBuilder> hostConfigura

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand All @@ -308,6 +329,7 @@ public async Task EventHub_MultipleDispatch()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

AssertMultipleDispatchLogs(host);
Expand All @@ -324,6 +346,7 @@ public async Task EventHub_MultipleDispatch_BinaryData()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

AssertMultipleDispatchLogs(host);
Expand Down Expand Up @@ -359,11 +382,24 @@ public async Task EventHub_MultipleDispatch_MinBatchSize()

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}

AssertMultipleDispatchLogsMinBatch(host);
}

[Test]
public async Task EventHub_MultipleDispatch_Dispose()
{
await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
var (_, host) = BuildHost<EventHubTestMultipleDispatchJobs_Dispose>();

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
host.Dispose();
}

private static void AssertMultipleDispatchLogsMinBatch(IHost host)
{
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
Expand Down Expand Up @@ -422,6 +458,7 @@ public async Task EventHub_PartitionKey()
bool result = _eventWait.WaitOne(Timeout);

Assert.True(result);
await jobHost.StopAsync();
}
}

Expand All @@ -447,6 +484,7 @@ public async Task EventHub_InitialOffsetFromStart()
{
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand Down Expand Up @@ -495,6 +533,7 @@ public async Task EventHub_InitialOffsetFromEnd()
try { await sendTask; } catch { /* Ignore, we're not testing sends */ }

Assert.True(result);
await jobHost.StopAsync();
}
}

Expand Down Expand Up @@ -541,6 +580,7 @@ await foreach (PartitionEvent evt in consumer.ReadEventsAsync())
{
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
await jobHost.StopAsync();
}
}

Expand Down Expand Up @@ -578,6 +618,28 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec
}
}

public class EventHubTestSingleDispatchJobs_Dispose
{
public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string evt, CancellationToken cancellationToken)
{
_eventWait.Set();
// wait a small amount of time for the host to call dispose
await Task.Delay(2000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}

public class EventHubTestMultipleDispatchJobs_Dispose
{
public static async Task SendEvent_TestHub([EventHubTrigger(TestHubName, Connection = TestHubName)] string[] evt, CancellationToken cancellationToken)
{
_eventWait.Set();
// wait a small amount of time for the host to call dispose
await Task.Delay(2000, CancellationToken.None);
Assert.IsTrue(cancellationToken.IsCancellationRequested);
}
}

public class EventHubTestCollectorDispatch
{
private static string s_partitionKey = null;
Expand Down