Skip to content

Commit

Permalink
Aggregate metrics before upload (#3482)
Browse files Browse the repository at this point in the history
This will aggregate metrics before they are uploaded. This is because many metrics have a high cardinality, since they have a new entry for each module and/or each route input/output.

Since we hash those values anyway, pre-aggregating can save a significant amount of bandwidth.

It also updates netstandard to 2.1 to use HashCode.Combine.
Microsoft.Azure.WebJobs.Extensions.EdgeHub.csproj is not updated because it relies on older libraries.
  • Loading branch information
lfitchett committed Sep 10, 2020
1 parent fc1dbb9 commit bdd5cc8
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,12 @@ public bool Equals(Metric other)
/// <returns>Hash of name and tags.</returns>
int GetMetricKey()
{
return CombineHash(this.Name.GetHashCode(), GetOrderIndependentHash(this.Tags));
return HashCode.Combine(this.Name.GetHashCode(), GetOrderIndependentHash(this.Tags));
}

static int GetOrderIndependentHash<T1, T2>(IEnumerable<KeyValuePair<T1, T2>> dictionary)
{
return CombineHash(dictionary.Select(o => CombineHash(o.Key.GetHashCode(), o.Value.GetHashCode())).OrderBy(h => h).ToArray());
}

// TODO: replace with "return HashCode.Combine();"
// when upgraded to .net standard 2.1: https://docs.microsoft.com/en-us/dotnet/api/system.hashcode.combine?view=netstandard-2.1
static int CombineHash(params int[] hashes)
{
int hash = 17;
foreach (int h in hashes)
{
hash = hash * 31 + h;
}

return hash;
return dictionary.Select(o => HashCode.Combine(o.Key.GetHashCode(), o.Value.GetHashCode())).OrderBy(h => h).Aggregate(0, HashCode.Combine);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Publisher;
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Storage;
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util;
using Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.Metrics;
Expand All @@ -29,6 +30,7 @@ public class MetricsWorker : IDisposable
readonly AsyncLock scrapeUploadLock = new AsyncLock();
static readonly ILogger Log = Logger.Factory.CreateLogger<MetricsScraper>();
readonly MetricTransformer metricFilter;
readonly MetricAggregator metricAggregator;

PeriodicTask scrape;
PeriodicTask upload;
Expand All @@ -53,6 +55,59 @@ public MetricsWorker(IMetricsScraper scraper, IMetricsStorage storage, IMetricsP
("from", name => name.CreateSha256()),
("to_route_input", name => name.CreateSha256()),
("from_route_output", name => name.CreateSha256()));

#pragma warning disable SA1111 // Closing parenthesis should be on line of last parameter
this.metricAggregator = new MetricAggregator(
new AggregationTemplate("edgehub_gettwin_total", "id", new Summer()),
new AggregationTemplate(
"edgehub_messages_received_total",
("route_output", new Summer()),
("id", new Summer())
),
new AggregationTemplate(
"edgehub_messages_sent_total",
("from", new Summer()),
("to", new Summer()),
("from_route_output", new Summer()),
("to_route_input", new Summer())
),
new AggregationTemplate(
new string[]
{
"edgehub_message_size_bytes",
"edgehub_message_size_bytes_sum",
"edgehub_message_size_bytes_count"
},
"id",
new Averager()),
new AggregationTemplate(
new string[]
{
"edgehub_message_process_duration_seconds",
"edgehub_message_process_duration_seconds_sum",
"edgehub_message_process_duration_seconds_count",
},
("from", new Averager()),
("to", new Averager())
),
new AggregationTemplate(
"edgehub_direct_methods_total",
("from", new Summer()),
("to", new Summer())
),
new AggregationTemplate("edgehub_queue_length", "endpoint", new Summer()),
new AggregationTemplate(
new string[]
{
"edgehub_messages_dropped_total",
"edgehub_messages_unack_total",
},
("from", new Summer()),
("from_route_output", new Summer())
),
new AggregationTemplate("edgehub_client_connect_failed_total", "id", new Summer())
);
#pragma warning restore SA1111 // Closing parenthesis should be on line of last parameter
}

public void Start(TimeSpan scrapingInterval, TimeSpan uploadInterval)
Expand All @@ -68,7 +123,10 @@ internal async Task Scrape(CancellationToken cancellationToken)
{
Log.LogInformation("Scraping Metrics");
IEnumerable<Metric> scrapedMetrics = await this.scraper.ScrapeEndpointsAsync(cancellationToken);

scrapedMetrics = this.metricFilter.TransformMetrics(scrapedMetrics);
scrapedMetrics = this.metricAggregator.AggregateMetrics(scrapedMetrics);

Log.LogInformation("Storing Metrics");
await this.storage.StoreMetricsAsync(scrapedMetrics);
Log.LogInformation("Scraped and Stored Metrics");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// This class is used to temporarily compare similar metrics. It doesn't include values, and
// the aggregate tag is removed.
/// <summary>
/// Value-less identity of a metric (name, timestamp and remaining tags) with one tag
/// stripped out. Instances are used as dictionary keys, so equality is defined over
/// the name, the timestamp and the full tag set.
/// </summary>
public class AggregateMetric
{
    public DateTime TimeGeneratedUtc { get; }
    public string Name { get; }
    public IReadOnlyDictionary<string, string> Tags { get; }

    // The hash is computed at most once, on first use.
    readonly Lazy<int> hash;

    /// <summary>
    /// Copies the name, timestamp and tags of <paramref name="metric"/>, dropping the tag
    /// whose key equals <paramref name="aggregateTag"/>.
    /// </summary>
    /// <param name="metric">Metric to take the identity from.</param>
    /// <param name="aggregateTag">Key of the tag that is being aggregated away.</param>
    public AggregateMetric(Metric metric, string aggregateTag)
    {
        this.Name = metric.Name;
        this.TimeGeneratedUtc = metric.TimeGeneratedUtc;
        this.Tags = new Dictionary<string, string>(metric.Tags.Where(t => t.Key != aggregateTag));

        this.hash = new Lazy<int>(this.Hash);
    }

    /// <summary>
    /// Builds a full metric from this identity and the supplied value.
    /// </summary>
    /// <param name="value">Value to attach to the resulting metric.</param>
    /// <returns>A metric with this instance's timestamp, name and tags.</returns>
    public Metric ToMetric(double value)
    {
        return new Metric(this.TimeGeneratedUtc, this.Name, value, this.Tags);
    }

    /// <summary>
    /// Structural equality: same name, same timestamp and the same set of tag key/value pairs.
    /// </summary>
    /// <param name="other">Object to compare with; null or a different type is never equal.</param>
    /// <returns>True when both instances describe the same metric identity.</returns>
    public override bool Equals(object other)
    {
        if (ReferenceEquals(this, other))
        {
            return true;
        }

        if (!(other is AggregateMetric that))
        {
            return false;
        }

        // Cheap rejection: different hashes can never be equal. Equal hashes are NOT
        // sufficient (collisions), so the fields are compared below.
        if (this.GetHashCode() != that.GetHashCode())
        {
            return false;
        }

        if (this.Name != that.Name
            || this.TimeGeneratedUtc != that.TimeGeneratedUtc
            || this.Tags.Count != that.Tags.Count)
        {
            return false;
        }

        // Counts match, so it is enough to check that every tag here exists there with the same value.
        foreach (KeyValuePair<string, string> tag in this.Tags)
        {
            if (!that.Tags.TryGetValue(tag.Key, out string otherValue) || otherValue != tag.Value)
            {
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// Returns the cached hash of name, timestamp and tags.
    /// </summary>
    /// <returns>Hash code consistent with <see cref="Equals(object)"/>.</returns>
    public override int GetHashCode()
    {
        return this.hash.Value;
    }

    // Combines name, timestamp and tags. Per-tag hashes are sorted before being folded so
    // the result does not depend on the enumeration order of the tag dictionary.
    int Hash()
    {
        return HashCode.Combine(
            this.Name.GetHashCode(),
            this.TimeGeneratedUtc.GetHashCode(),
            this.Tags.Select(o => HashCode.Combine(
                o.Key.GetHashCode(),
                o.Value.GetHashCode()))
            .OrderBy(h => h)
            .Aggregate(0, HashCode.Combine));
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// Pairs a set of metric names with the tags to aggregate on those metrics and the
/// aggregator to use for each tag.
/// </summary>
public class AggregationTemplate
{
    /// <summary>Names of the metrics this template applies to.</summary>
    public IEnumerable<string> TargetMetricNames { get; }

    /// <summary>Tags to aggregate, each with the aggregator used for it, in the order supplied.</summary>
    public (string targetTag, IAggregator aggregator)[] TagsToAggregate { get; }

    /// <summary>Template for one metric name and one tag.</summary>
    public AggregationTemplate(string targetName, string targetTag, IAggregator aggregator)
        : this(new[] { targetName }, (targetTag, aggregator))
    {
    }

    /// <summary>Template for one metric name and any number of tags.</summary>
    public AggregationTemplate(string targetName, params (string targetTag, IAggregator aggregator)[] tagsToAggregate)
        : this(new[] { targetName }, tagsToAggregate)
    {
    }

    /// <summary>Template for several metric names sharing one tag.</summary>
    public AggregationTemplate(IEnumerable<string> targeMetricNames, string targetTag, IAggregator aggregator)
        : this(targeMetricNames, (targetTag, aggregator))
    {
    }

    /// <summary>
    /// Template for several metric names and any number of tags. Both arguments are stored
    /// as given; all other constructors forward to this one.
    /// </summary>
    public AggregationTemplate(IEnumerable<string> targeMetricNames, params (string targetTag, IAggregator aggregator)[] tagsToAggregate)
    {
        this.TagsToAggregate = tagsToAggregate;
        this.TargetMetricNames = targeMetricNames;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// Aggregator that adds together every value it receives. Starts at 0.
/// </summary>
public class Summer : IAggregator
{
    double total;

    /// <summary>Creates a new, independent <see cref="Summer"/>.</summary>
    public IAggregator New() => new Summer();

    /// <summary>Adds <paramref name="value"/> to the running total.</summary>
    public void PutValue(double value) => this.total += value;

    /// <summary>Returns the sum of all values added so far.</summary>
    public double GetAggregate() => this.total;
}

/// <summary>
/// Aggregator that multiplies together every value it receives. Starts at 1.
/// </summary>
public class Multiplier : IAggregator
{
    double runningProduct = 1.0;

    /// <summary>Creates a new, independent <see cref="Multiplier"/>.</summary>
    public IAggregator New() => new Multiplier();

    /// <summary>Multiplies the running product by <paramref name="value"/>.</summary>
    public void PutValue(double value) => this.runningProduct *= value;

    /// <summary>Returns the product of all values added so far.</summary>
    public double GetAggregate() => this.runningProduct;
}

/// <summary>
/// Aggregator that reports the arithmetic mean of the values it receives.
/// </summary>
public class Averager : IAggregator
{
    double total;
    double samples;

    /// <summary>Creates a new, independent <see cref="Averager"/>.</summary>
    public IAggregator New() => new Averager();

    /// <summary>Records <paramref name="value"/> as one more sample.</summary>
    public void PutValue(double value)
    {
        this.samples += 1;
        this.total += value;
    }

    /// <summary>
    /// Returns the sum of the samples divided by their count. With no samples this is
    /// 0.0 / 0.0, which evaluates to NaN.
    /// </summary>
    public double GetAggregate() => this.total / this.samples;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// Accumulates a series of values into a single result.
/// </summary>
public interface IAggregator
{
    /// <summary>Creates another aggregator; used as a factory so each aggregation group gets its own state.</summary>
    /// <returns>An aggregator instance.</returns>
    IAggregator New();

    /// <summary>Feeds one value into the aggregation.</summary>
    /// <param name="value">Value to include.</param>
    void PutValue(double value);

    /// <summary>Gets the result computed from the values supplied so far.</summary>
    /// <returns>The aggregated value.</returns>
    double GetAggregate();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Agent.Diagnostics.Util.Aggregation
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Akka.Event;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

/// <summary>
/// Pass-through for a group of metrics: metrics matched by a template are merged
/// with the others that share everything except the aggregated tag; all remaining
/// metrics are returned unchanged.
/// </summary>
public class MetricAggregator
{
    readonly AggregationTemplate[] metricsToAggregate;

    public MetricAggregator(params AggregationTemplate[] metricsToAggregate)
    {
        this.metricsToAggregate = metricsToAggregate;
    }

    // Applies every aggregation template, in order, to the given metrics.
    public IEnumerable<Metric> AggregateMetrics(IEnumerable<Metric> metrics)
    {
        // Note: this is LINQ's Aggregate (a fold), not one of the aggregators in this namespace.
        return this.metricsToAggregate.Aggregate(
            metrics,
            (current, template) => this.AggregateMetric(current, template));
    }

    // Applies one template: each of its tags is aggregated in turn over the output of the previous one.
    IEnumerable<Metric> AggregateMetric(IEnumerable<Metric> metrics, AggregationTemplate aggregation)
    {
        return aggregation.TagsToAggregate.Aggregate(
            metrics,
            (current, tagEntry) => this.AggregateTag(
                current,
                aggregation.TargetMetricNames,
                tagEntry.targetTag,
                tagEntry.aggregator));
    }

    // Aggregates a single tag of a single template. Metrics that are not targeted are yielded
    // as they are encountered; the aggregated metrics are yielded after the input is exhausted.
    IEnumerable<Metric> AggregateTag(IEnumerable<Metric> metrics, IEnumerable<string> targetMetricNames, string targetTag, IAggregator aggregator)
    {
        // One aggregator per distinct metric identity (name, timestamp, tags minus targetTag).
        var buckets = new DefaultDictionary<AggregateMetric, IAggregator>(_ => aggregator.New());

        foreach (Metric metric in metrics)
        {
            // Not an aggregation target, or it lacks the tag being aggregated: pass it through.
            if (!targetMetricNames.Contains(metric.Name) || !metric.Tags.ContainsKey(targetTag))
            {
                yield return metric;
                continue;
            }

            var key = new AggregateMetric(metric, targetTag);
            buckets[key].PutValue(metric.Value);
        }

        // Turn each bucket back into a metric carrying the aggregated value.
        foreach (var bucket in buckets)
        {
            yield return bucket.Key.ToMetric(bucket.Value.GetAggregate());
        }
    }
}
}
Loading

0 comments on commit bdd5cc8

Please sign in to comment.