Skip to content

Commit

Permalink
Metrics Support for EdgeHub (take #2) (#1347)
Browse files Browse the repository at this point in the history
* Add new Metrics types

* Cleanup

* remove current metrics

* Fix usings

* Fix build

* Cleanup

* Add Metrics

* Fix bugs

* Cleanup

* Cleanup

* Cleanup fxcop

* Update edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs

Co-Authored-By: Mike Yagley <myagley@gmail.com>

* Update edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs

Co-Authored-By: Mike Yagley <myagley@gmail.com>

* Update edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/IEdgeMetrics.cs

Co-Authored-By: Mike Yagley <myagley@gmail.com>

* Update edge-util/src/Microsoft.Azure.Devices.Edge.Util/metrics/MetricsConstants.cs

Co-Authored-By: Mike Yagley <myagley@gmail.com>

* Convert metrics duration to seconds
  • Loading branch information
varunpuranik committed Jul 16, 2019
1 parent 3adf3f5 commit f93c6f3
Show file tree
Hide file tree
Showing 56 changed files with 1,266 additions and 450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Amqp.LinkHandlers
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Metrics;
using Microsoft.Extensions.Logging;

/// <summary>
Expand Down Expand Up @@ -61,6 +62,7 @@ protected override async Task OnMessageReceived(AmqpMessage amqpMessage)
List<IMessage> outgoingMessages = messages.Select(m => this.MessageConverter.ToMessage(m)).ToList();
outgoingMessages.ForEach(this.AddMessageSystemProperties);
await this.DeviceListener.ProcessDeviceMessageBatchAsync(outgoingMessages);
Metrics.AddMessages(this.Identity, messages.Count);
Events.ProcessedMessages(messages, this);
}
catch (Exception e) when (!e.IsFatal())
Expand Down Expand Up @@ -145,5 +147,16 @@ internal static void ProcessedMessages(IList<AmqpMessage> messages, EventsLinkHa
Log.LogDebug((int)EventIds.ProcessedMessages, $"EventsLinkHandler processed {messages.Count} messages for {handler.ClientId}");
}
}

static class Metrics
{
static readonly IMetricsCounter MessagesMeter = Util.Metrics.Metrics.Instance.CreateCounter(
"messages_received",
"Messages received from clients",
new List<string> { "protocol", "id" });

public static void AddMessages(IIdentity identity, long count)
=> MessagesMeter.Increment(count, new[] { "amqp", identity.Id });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Amqp.LinkHandlers
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util.Metrics;

/// <summary>
/// This handler is used to send messages to modules
Expand Down Expand Up @@ -36,5 +37,22 @@ protected override async Task OnOpenAsync(TimeSpan timeout)
await base.OnOpenAsync(timeout);
await this.DeviceListener.AddSubscription(DeviceSubscription.ModuleMessages);
}

protected override void OnMessageSent(IMessage message) => Metrics.AddMessage(this.Identity, message);

static class Metrics
{
static readonly IMetricsCounter MessagesMeter = Util.Metrics.Metrics.Instance.CreateCounter(
"messages_sent",
"Messages sent to module",
new List<string> { "protocol", "from", "to" });

public static void AddMessage(IIdentity identity, IMessage message)
{
string from = message.GetSenderId();
string to = identity.Id;
MessagesMeter.Increment(1, new[] { "amqp", from, to });
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public Task SendMessage(IMessage message)
}

Events.MessageSent(this, message);
this.OnMessageSent(message);
}
catch (Exception ex)
{
Expand All @@ -68,6 +69,10 @@ public Task SendMessage(IMessage message)
return Task.CompletedTask;
}

protected virtual void OnMessageSent(IMessage message)
{
}

protected override Task OnOpenAsync(TimeSpan timeout)
{
switch (this.QualityOfService)
Expand Down
117 changes: 100 additions & 17 deletions edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.CloudProxy/CloudProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Metrics;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
Expand Down Expand Up @@ -109,10 +110,14 @@ public async Task<IMessage> GetTwinAsync()
this.timer.Reset();
try
{
Twin twin = await this.client.GetTwinAsync();
Events.GetTwin(this);
IMessageConverter<Twin> converter = this.messageConverterProvider.Get<Twin>();
return converter.ToMessage(twin);
using (Metrics.TimeGetTwin(this.clientId))
{
Twin twin = await this.client.GetTwinAsync();
Events.GetTwin(this);
Metrics.AddGetTwin(this.clientId);
IMessageConverter<Twin> converter = this.messageConverterProvider.Get<Twin>();
return converter.ToMessage(twin);
}
}
catch (Exception ex)
{
Expand All @@ -130,8 +135,13 @@ public async Task SendMessageAsync(IMessage inputMessage)
this.timer.Reset();
try
{
await this.client.SendEventAsync(message);
Events.SendMessage(this);
using (Metrics.TimeMessageSend(this.clientId))
{
Metrics.MessageProcessingLatency(this.clientId, inputMessage);
await this.client.SendEventAsync(message);
Events.SendMessage(this);
Metrics.AddSentMessages(this.clientId, 1);
}
}
catch (Exception ex)
{
Expand All @@ -144,13 +154,22 @@ public async Task SendMessageAsync(IMessage inputMessage)
public async Task SendMessageBatchAsync(IEnumerable<IMessage> inputMessages)
{
IMessageConverter<Message> converter = this.messageConverterProvider.Get<Message>();
IEnumerable<Message> messages = Preconditions.CheckNotNull(inputMessages, nameof(inputMessages))
.Select(inputMessage => converter.FromMessage(inputMessage));
IList<Message> messages = Preconditions.CheckNotNull(inputMessages, nameof(inputMessages))
.Select(inputMessage =>
{
Metrics.MessageProcessingLatency(this.clientId, inputMessage);
return converter.FromMessage(inputMessage);
})
.ToList();
this.timer.Reset();
try
{
await this.client.SendEventBatchAsync(messages);
Events.SendMessage(this);
using (Metrics.TimeMessageSend(this.clientId))
{
await this.client.SendEventBatchAsync(messages);
Events.SendMessage(this);
Metrics.AddSentMessages(this.clientId, messages.Count);
}
}
catch (Exception ex)
{
Expand All @@ -167,8 +186,12 @@ public async Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessa
this.timer.Reset();
try
{
await this.client.UpdateReportedPropertiesAsync(reported);
Events.UpdateReportedProperties(this);
using (Metrics.TimeReportedPropertiesUpdate(this.clientId))
{
await this.client.UpdateReportedPropertiesAsync(reported);
Metrics.AddUpdateReportedProperties(this.clientId);
Events.UpdateReportedProperties(this);
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -301,7 +324,7 @@ public CloudReceiver(CloudProxy cloudProxy, ICloudListener cloudListener)
this.cloudProxy = Preconditions.CheckNotNull(cloudProxy, nameof(cloudProxy));
this.cloudListener = Preconditions.CheckNotNull(cloudListener, nameof(cloudListener));
IMessageConverter<TwinCollection> converter = cloudProxy.messageConverterProvider.Get<TwinCollection>();
this.desiredUpdateHandler = new DesiredPropertyUpdateHandler(cloudListener, converter, cloudProxy);
this.desiredUpdateHandler = new DesiredPropertyUpdateHandler(cloudListener, converter);
}

public void StartListening()
Expand Down Expand Up @@ -406,16 +429,13 @@ class DesiredPropertyUpdateHandler
{
readonly ICloudListener listener;
readonly IMessageConverter<TwinCollection> converter;
readonly CloudProxy cloudProxy;

public DesiredPropertyUpdateHandler(
ICloudListener listener,
IMessageConverter<TwinCollection> converter,
CloudProxy cloudProxy)
IMessageConverter<TwinCollection> converter)
{
this.listener = listener;
this.converter = converter;
this.cloudProxy = cloudProxy;
}

public Task OnDesiredPropertyUpdates(TwinCollection desiredProperties)
Expand Down Expand Up @@ -584,5 +604,68 @@ internal static void CloudReceiverNull(string clientId, string operation)
Log.LogWarning((int)EventIds.CloudReceiverNull, Invariant($"Cannot complete operation {operation} for device {clientId} because cloud receiver is null"));
}
}

static class Metrics
{
static readonly IMetricsTimer MessagesTimer = Util.Metrics.Metrics.Instance.CreateTimer(
"message_send_duration_seconds",
"Time taken to send a message",
new List<string> { "from", "to" });

static readonly IMetricsCounter SentMessagesCounter = Util.Metrics.Metrics.Instance.CreateCounter(
"messages_sent",
"Messages sent from edge hub",
new List<string> { "from", "to" });

static readonly IMetricsTimer GetTwinTimer = Util.Metrics.Metrics.Instance.CreateTimer(
"gettwin_duration_seconds",
"Time taken to get twin",
new List<string> { "source", "id" });

static readonly IMetricsCounter GetTwinCounter = Util.Metrics.Metrics.Instance.CreateCounter(
"gettwin",
"Get twin calls",
new List<string> { "source", "id" });

static readonly IMetricsTimer ReportedPropertiesTimer = Util.Metrics.Metrics.Instance.CreateTimer(
"reported_properties_update_duration_seconds",
"Time taken to update reported properties",
new List<string> { "target", "id" });

static readonly IMetricsCounter ReportedPropertiesCounter = Util.Metrics.Metrics.Instance.CreateCounter(
"reported_properties",
"Reported properties update calls",
new List<string> { "target", "id" });

static readonly IMetricsDuration MessagesProcessLatency = Util.Metrics.Metrics.Instance.CreateDuration(
"message_process_duration",
"Time taken to process message in EdgeHub",
new List<string> { "from", "to" });

public static IDisposable TimeMessageSend(string id) => MessagesTimer.GetTimer(new[] { id, "upstream" });

public static void AddSentMessages(string id, int count) => SentMessagesCounter.Increment(count, new[] { id, "upstream" });

public static IDisposable TimeGetTwin(string id) => GetTwinTimer.GetTimer(new[] { "upstream", id });

public static void AddGetTwin(string id) => GetTwinCounter.Increment(1, new[] { "upstream", id });

public static IDisposable TimeReportedPropertiesUpdate(string id) => ReportedPropertiesTimer.GetTimer(new[] { "upstream", id });

public static void AddUpdateReportedProperties(string id) => ReportedPropertiesCounter.Increment(1, new[] { "upstream", id });

public static void MessageProcessingLatency(string id, IMessage message)
{
if (message.SystemProperties != null
&& message.SystemProperties.TryGetValue(SystemProperties.EnqueuedTime, out string enqueuedTimeString)
&& DateTime.TryParse(enqueuedTimeString, out DateTime enqueuedTime))
{
TimeSpan duration = DateTime.UtcNow - enqueuedTime.ToUniversalTime();
MessagesProcessLatency.Set(
duration.TotalSeconds,
new[] { id, "upstream" });
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Gauge;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand Down Expand Up @@ -37,7 +35,6 @@ public class ConnectionManager : IConnectionManager
this.maxClients = Preconditions.CheckRange(maxClients, 1, nameof(maxClients));
this.credentialsCache = Preconditions.CheckNotNull(credentialsCache, nameof(credentialsCache));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
Util.Metrics.RegisterGaugeCallback(() => Metrics.SetConnectedClientCountGauge(this));
}

public event EventHandler<IIdentity> CloudConnectionEstablished;
Expand Down Expand Up @@ -502,21 +499,5 @@ internal static void GetCloudConnection(IIdentity identity, Try<ICloudConnection
}
}
}

static class Metrics
{
static readonly GaugeOptions ConnectedClientGaugeOptions = new GaugeOptions
{
Name = "EdgeHubConnectedClientGauge",
MeasurementUnit = Unit.Events
};

public static void SetConnectedClientCountGauge(ConnectionManager connectionManager)
{
// Subtract EdgeHub from the list of connected clients
int connectedClients = connectionManager.GetConnectedClients().Count() - 1;
Util.Metrics.SetGauge(ConnectedClientGaugeOptions, connectedClients);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
public static class MessageHelper
{
public static string GetSenderId(this IMessage message)
{
if (message.SystemProperties.TryGetValue(SystemProperties.ConnectionDeviceId, out string deviceId))
{
return message.SystemProperties.TryGetValue(SystemProperties.ConnectionModuleId, out string moduleId)
? $"{deviceId}/{moduleId}"
: deviceId;
}

return string.Empty;
}
}
}
Loading

0 comments on commit f93c6f3

Please sign in to comment.