Skip to content

Commit

Permalink
[v3.x] Adds Metric Logging support for GRPC workers (#8000)
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] committed Dec 15, 2021
1 parent 7d03108 commit 3e71d6e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 103 deletions.
50 changes: 33 additions & 17 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Expand Up @@ -13,6 +13,7 @@
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
Expand Down Expand Up @@ -75,18 +76,18 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
private System.Timers.Timer _timer;

internal GrpcWorkerChannel(
string workerId,
IScriptEventManager eventManager,
RpcWorkerConfig workerConfig,
IWorkerProcess rpcWorkerProcess,
ILogger logger,
IMetricsLogger metricsLogger,
int attemptCount,
IEnvironment environment,
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
ISharedMemoryManager sharedMemoryManager,
IFunctionDataCache functionDataCache,
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions)
string workerId,
IScriptEventManager eventManager,
RpcWorkerConfig workerConfig,
IWorkerProcess rpcWorkerProcess,
ILogger logger,
IMetricsLogger metricsLogger,
int attemptCount,
IEnvironment environment,
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
ISharedMemoryManager sharedMemoryManager,
IFunctionDataCache functionDataCache,
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions)
{
_workerId = workerId;
_eventManager = eventManager;
Expand Down Expand Up @@ -122,7 +123,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
.Subscribe(async (msg) => await InvokeResponse(msg.Message.InvocationResponse)));

_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.WorkerStatusResponse)
.Subscribe((msg) => ReceiveWorkerStatusResponse(msg.Message.RequestId, msg.Message.WorkerStatusResponse));
.Subscribe((msg) => ReceiveWorkerStatusResponse(msg.Message.RequestId, msg.Message.WorkerStatusResponse));

_startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, workerConfig.Description.Language, attemptCount));

Expand Down Expand Up @@ -656,14 +657,29 @@ internal void Log(GrpcEvent msg)
// Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers.
System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, (s) =>
{
if (rpcLog.Exception != null)
if (rpcLog.LogCategory == RpcLogCategory.CustomMetric)
{
var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace);
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state);
if (rpcLog.PropertiesMap.TryGetValue(LogConstants.NameKey, out var metricName)
&& rpcLog.PropertiesMap.TryGetValue(LogConstants.MetricValueKey, out var metricValue))
{
// Strip off the name/value entries in the dictionary passed to Log Message and include the rest as the property bag passed to the backing ILogger
var rpcLogProperties = rpcLog.PropertiesMap
.Where(i => i.Key != LogConstants.NameKey && i.Key != LogConstants.MetricValueKey)
.ToDictionary(i => i.Key, i => i.Value.ToObject());
context.Logger.LogMetric(metricName.String, metricValue.Double, rpcLogProperties);
}
}
else
{
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state);
if (rpcLog.Exception != null)
{
var exception = new Workers.Rpc.RpcException(rpcLog.Message, rpcLog.Exception.Message, rpcLog.Exception.StackTrace);
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, exception, (state, exc) => state);
}
else
{
context.Logger.Log(logLevel, new EventId(0, rpcLog.EventId), rpcLog.Message, null, (state, exc) => state);
}
}
}, null);
}
Expand Down
Expand Up @@ -24,93 +24,35 @@ internal static class GrpcMessageConversionExtensions
{
private static readonly JsonSerializerSettings _datetimeSerializerSettings = new JsonSerializerSettings { DateParseHandling = DateParseHandling.None };

public static object ToObject(this TypedData typedData)
{
switch (typedData.DataCase)
{
case RpcDataType.Bytes:
case RpcDataType.Stream:
return typedData.Bytes.ToByteArray();
case RpcDataType.String:
return typedData.String;
case RpcDataType.Json:
return JsonConvert.DeserializeObject(typedData.Json, _datetimeSerializerSettings);
case RpcDataType.Http:
return GrpcMessageExtensionUtilities.ConvertFromHttpMessageToExpando(typedData.Http);
case RpcDataType.Int:
return typedData.Int;
case RpcDataType.Double:
return typedData.Double;
case RpcDataType.None:
return null;
default:
public static object ToObject(this TypedData typedData) =>
typedData.DataCase switch
{
RpcDataType.None => null,
RpcDataType.String => typedData.String,
RpcDataType.Json => JsonConvert.DeserializeObject(typedData.Json, _datetimeSerializerSettings),
RpcDataType.Bytes or RpcDataType.Stream => typedData.Bytes.ToByteArray(),
RpcDataType.Http => GrpcMessageExtensionUtilities.ConvertFromHttpMessageToExpando(typedData.Http),
RpcDataType.Int => typedData.Int,
RpcDataType.Double => typedData.Double,
// TODO better exception
throw new InvalidOperationException("Unknown RpcDataType");
}
}

public static async Task<TypedData> ToRpc(this object value, ILogger logger, GrpcCapabilities capabilities)
{
TypedData typedData = new TypedData();
if (value == null)
{
return typedData;
}

if (value is byte[] arr)
{
typedData.Bytes = ByteString.CopyFrom(arr);
}
else if (value is JObject jobj)
{
typedData.Json = jobj.ToString(Formatting.None);
}
else if (value is string str)
{
typedData.String = str;
}
else if (value.GetType().IsArray && IsTypedDataCollectionSupported(capabilities))
{
typedData = value.ToRpcCollection();
}
else if (value is HttpRequest request)
{
typedData = await request.ToRpcHttp(logger, capabilities);
}
else
{
typedData = value.ToRpcDefault();
}

return typedData;
}

internal static TypedData ToRpcCollection(this object value)
{
TypedData typedData;
if (value is byte[][] arrBytes)
{
typedData = arrBytes.ToRpcByteArray();
}
else if (value is string[] arrStr)
{
typedData = arrStr.ToRpcStringArray();
}
else if (value is double[] arrDouble)
{
typedData = arrDouble.ToRpcDoubleArray();
}
else if (value is long[] arrLong)
{
typedData = arrLong.ToRpcLongArray();
}
else
{
typedData = value.ToRpcDefault();
}
_ => throw new InvalidOperationException($"Unknown RpcDataType: {typedData.DataCase}")
};

return typedData;
}
public static async Task<TypedData> ToRpc(this object value, ILogger logger, GrpcCapabilities capabilities) =>
value switch
{
null => new TypedData(),
byte[] arr => new TypedData() { Bytes = ByteString.CopyFrom(arr) },
JObject jobj => new TypedData() { Json = jobj.ToString(Formatting.None) },
string str => new TypedData() { String = str },
double dbl => new TypedData() { Double = dbl },
HttpRequest request => await request.ToRpcHttp(logger, capabilities),
byte[][] arrBytes when IsTypedDataCollectionSupported(capabilities) => arrBytes.ToRpcByteArray(),
string[] arrStr when IsTypedDataCollectionSupported(capabilities) => arrStr.ToRpcStringArray(),
double[] arrDouble when IsTypedDataCollectionSupported(capabilities) => arrDouble.ToRpcDoubleArray(),
long[] arrLong when IsTypedDataCollectionSupported(capabilities) => arrLong.ToRpcLongArray(),
_ => value.ToRpcDefault(),
};

internal static async Task<TypedData> ToRpcHttp(this HttpRequest request, ILogger logger, GrpcCapabilities capabilities)
{
Expand Down Expand Up @@ -406,4 +348,4 @@ public static BindingInfo ToBindingInfo(this BindingMetadata bindingMetadata)
return bindingInfo;
}
}
}
}
Expand Up @@ -14,6 +14,7 @@ public class InboundGrpcEventExtensionsTests
[Theory]
[InlineData(RpcLogCategory.System)]
[InlineData(RpcLogCategory.User)]
[InlineData(RpcLogCategory.CustomMetric)]
public void TestLogCategories(RpcLogCategory categoryToTest)
{
InboundGrpcEvent inboundEvent = new InboundGrpcEvent(Guid.NewGuid().ToString(), new Grpc.Messages.StreamingMessage
Expand Down

0 comments on commit 3e71d6e

Please sign in to comment.