[Cosmos DB] Adds more Change Feed logs (#753)
This PR adds back the CosmosDBTriggerHealthMonitor, leveraging the new Notification APIs to log:

- When a lease is acquired
- When a lease is released
- When changes are delivered to a Function (before user code is processed)
- When there are errors handling a lease, caused by unhandled user exceptions or any Cosmos-related failures
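
For context, the following is a minimal sketch (not part of this commit's diff) of how these notification APIs attach the health monitor's handlers to a ChangeFeedProcessorBuilder. It mirrors the wiring the diff adds to CosmosDBTriggerListener<T>.InitializeBuilder(); the MyDocument type, processor name, instance name, and the monitoredContainer/leaseContainer/logger parameters are placeholders, and CosmosDBTriggerHealthMonitor is internal, so this would only compile inside the extension assembly.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
    internal static class HealthMonitorWiringSketch
    {
        // Placeholder document type used only for illustration.
        private class MyDocument
        {
            public string id { get; set; }
        }

        public static ChangeFeedProcessor Build(Container monitoredContainer, Container leaseContainer, ILogger logger)
        {
            CosmosDBTriggerHealthMonitor healthMonitor = new CosmosDBTriggerHealthMonitor(logger);

            return monitoredContainer
                .GetChangeFeedProcessorBuilder<MyDocument>(
                    "healthMonitorSketch",
                    (ChangeFeedProcessorContext context, IReadOnlyCollection<MyDocument> changes, CancellationToken cancellationToken) =>
                    {
                        // Delivery is logged before the user handler would run.
                        healthMonitor.OnChangesDelivered(context);
                        return Task.CompletedTask;
                    })
                .WithErrorNotification(healthMonitor.OnErrorAsync)               // unhandled user exceptions or Cosmos failures
                .WithLeaseAcquireNotification(healthMonitor.OnLeaseAcquireAsync) // lease acquired
                .WithLeaseReleaseNotification(healthMonitor.OnLeaseReleaseAsync) // lease released
                .WithInstanceName("instance-1")
                .WithLeaseContainer(leaseContainer)
                .Build();
        }
    }
}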
ealsur committed Nov 19, 2021
1 parent 70509a4 commit 4ae40e8
Showing 5 changed files with 293 additions and 20 deletions.
14 changes: 11 additions & 3 deletions src/WebJobs.Extensions.CosmosDB/Constants.cs
@@ -1,14 +1,22 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
public static class Constants
{
public const string DefaultConnectionStringName = "CosmosDB";
}

internal static class Events
{
public static readonly EventId OnError = new EventId(1, "OnTriggerError");
public static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire");
public static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease");
public static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery");
public static readonly EventId OnListenerStopError = new EventId(5, "OnTriggerListenerStopError");
public static readonly EventId OnScaling = new EventId(6, "OnScaling");
}
}
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
internal class CosmosDBTriggerHealthMonitor
{
private readonly ILogger logger;

public CosmosDBTriggerHealthMonitor(ILogger logger)
{
this.logger = logger;
}

public Task OnErrorAsync(string leaseToken, Exception exception)
{
switch (exception)
{
case ChangeFeedProcessorUserException userException:
this.logger.LogWarning(Events.OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken);
this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics);
break;
case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable:
this.logger.LogWarning(Events.OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken);
break;
default:
this.logger.LogError(Events.OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken);
break;
}

if (exception is CosmosException asCosmosException)
{
this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics);
}

return Task.CompletedTask;
}

public Task OnLeaseAcquireAsync(string leaseToken)
{
this.logger.LogDebug(Events.OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken);
return Task.CompletedTask;
}

public Task OnLeaseReleaseAsync(string leaseToken)
{
this.logger.LogDebug(Events.OnRelease, "Lease {LeaseToken} was released.", leaseToken);
return Task.CompletedTask;
}

public void OnChangesDelivered(ChangeFeedProcessorContext context)
{
this.logger.LogDebug(Events.OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics);
}
}
}
42 changes: 27 additions & 15 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
@@ -31,6 +31,7 @@ internal class CosmosDBTriggerListener<T> : IListener, IScaleMonitor<CosmosDBTri
private readonly string _processorName;
private readonly string _functionId;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly CosmosDBTriggerHealthMonitor _healthMonitor;
private ChangeFeedProcessor _host;
private ChangeFeedProcessorBuilder _hostBuilder;
private int _listenerStatus;
@@ -68,6 +69,7 @@ public CosmosDBTriggerListener(
this._leaseContainer = leaseContainer;
this._cosmosDBAttribute = cosmosDBAttribute;
this._scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower());
this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger);
}

public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor;
@@ -132,7 +134,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
catch (Exception ex)
{
this._logger.LogWarning($"Stopping the observer failed, potentially it was never started. Exception: {ex.Message}.");
this._logger.LogWarning(Events.OnListenerStopError, "Stopping the observer failed, potentially it was never started. Exception: {Exception}.", ex);
}
}

@@ -151,6 +153,9 @@ internal virtual void InitializeBuilder()
if (this._hostBuilder == null)
{
this._hostBuilder = this._monitoredContainer.GetChangeFeedProcessorBuilder<T>(this._processorName, this.ProcessChangesAsync)
.WithErrorNotification(this._healthMonitor.OnErrorAsync)
.WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquireAsync)
.WithLeaseReleaseNotification(this._healthMonitor.OnLeaseReleaseAsync)
.WithInstanceName(this._hostName)
.WithLeaseContainer(this._leaseContainer);

@@ -208,9 +213,16 @@ internal virtual void InitializeBuilder()
}
}

private Task ProcessChangesAsync(IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
{
return this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken);
this._healthMonitor.OnChangesDelivered(context);
FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken);
if (!result.Succeeded
&& result.Exception != null)
{
ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(result.Exception, context);
await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException);
}
}

public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
@@ -238,7 +250,7 @@ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
{
if (!TryHandleCosmosException(e))
{
_logger.LogWarning("Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
_logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
if (e is InvalidOperationException)
{
throw;
@@ -267,7 +279,7 @@ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
errormsg = e.ToString();
}

_logger.LogWarning(errormsg);
_logger.LogWarning(Events.OnScaling, errormsg);
}

return new CosmosDBTriggerMetrics
@@ -314,8 +326,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (partitionCount > 0 && partitionCount < workerCount)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
_logger.LogInformation(string.Format($"Number of instances ({workerCount}) is too high relative to number " +
_logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
_logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " +
$"of partitions for container ({this._monitoredContainer.Id}, {partitionCount})."));
return status;
}
@@ -331,8 +343,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (latestRemainingWork > workerCount * 1000)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
_logger.LogInformation(string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " +
_logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
_logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " +
$"is too high relative to the number of instances ({workerCount})."));
return status;
}
@@ -341,8 +353,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed."));
_logger.LogInformation(string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
_logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed."));
_logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
return status;
}

@@ -351,7 +363,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (isIdle)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(string.Format($"'{this._monitoredContainer.Id}' is idle."));
_logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle."));
return status;
}

@@ -365,7 +377,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (remainingWorkIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"Remaining work is increasing for '{this._monitoredContainer.Id}'.");
_logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'.");
return status;
}

@@ -377,11 +389,11 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (remainingWorkDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation($"Remaining work is decreasing for '{this._monitoredContainer.Id}'.");
_logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'.");
return status;
}

_logger.LogInformation($"CosmosDB container '{this._monitoredContainer.Id}' is steady.");
_logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady.");

return status;
}
@@ -8,7 +8,7 @@
</PropertyGroup>
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<Version>$(CosmosDBVersion)-preview2</Version>
<Version>$(CosmosDBVersion)-preview3</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
@@ -19,7 +19,7 @@
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.20.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.23.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.31" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.1.0" />