diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Constants.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Constants.cs
index 38230ddbf71..a2750b07677 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Constants.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Constants.cs
@@ -18,6 +18,9 @@ public class Constants
public const string IotEdgeIdentityCapability = "iotEdge";
public const string ServiceIdentityRefreshMethodName = "RefreshDeviceScopeIdentityCache";
+
+ public const long MaxMessageSize = 256 * 1024; // matches IoTHub
+
public static readonly Version ConfigSchemaVersion = new Version("1.0");
}
}
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Microsoft.Azure.Devices.Edge.Hub.Core.csproj b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Microsoft.Azure.Devices.Edge.Hub.Core.csproj
index 58efcbe54ec..029fec4329b 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Microsoft.Azure.Devices.Edge.Hub.Core.csproj
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/Microsoft.Azure.Devices.Edge.Hub.Core.csproj
@@ -30,7 +30,6 @@
-
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs
index 3823e067106..91e2bac4eaf 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs
@@ -5,6 +5,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
+ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
@@ -18,6 +19,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
using Microsoft.Azure.Devices.Routing.Core.Util;
using Microsoft.Extensions.Logging;
using static System.FormattableString;
+ using Constants = Microsoft.Azure.Devices.Edge.Hub.Core.Constants;
using IMessage = Microsoft.Azure.Devices.Edge.Hub.Core.IMessage;
using IRoutingMessage = Microsoft.Azure.Devices.Routing.Core.IMessage;
using ISinkResult = Microsoft.Azure.Devices.Routing.Core.ISinkResult;
@@ -28,16 +30,28 @@ public class CloudEndpoint : Endpoint
{
readonly Func>> cloudProxyGetterFunc;
readonly Core.IMessageConverter messageConverter;
-
- public CloudEndpoint(string id, Func>> cloudProxyGetterFunc, Core.IMessageConverter messageConverter)
+ readonly int maxBatchSize;
+
+ // Creates the cloud endpoint.
+ // id: endpoint id, passed to the base Endpoint.
+ // cloudProxyGetterFunc: looks up the cloud proxy for a client identity; must not be null.
+ // messageConverter: converts routing messages before they are sent; must not be null.
+ // maxBatchSize: upper bound on the number of messages sent in one batch; must be greater than 0.
+ // fanoutFactor: exposed through FanOutFactor; must be greater than 0.
+ public CloudEndpoint(
+ string id,
+ Func>> cloudProxyGetterFunc,
+ Core.IMessageConverter messageConverter,
+ int maxBatchSize = 10,
+ int fanoutFactor = 10)
: base(id)
{
+ Preconditions.CheckArgument(maxBatchSize > 0, "MaxBatchSize should be greater than 0");
+ // The fan-out factor is used as a batch size when grouping clients and as a multiplier
+ // for the executor batch size and the send timeout, so a non-positive value is rejected here.
+ Preconditions.CheckArgument(fanoutFactor > 0, "FanoutFactor should be greater than 0");
this.cloudProxyGetterFunc = Preconditions.CheckNotNull(cloudProxyGetterFunc);
this.messageConverter = Preconditions.CheckNotNull(messageConverter);
+ this.maxBatchSize = maxBatchSize;
+ this.FanOutFactor = fanoutFactor;
+ Events.Created(id, maxBatchSize, fanoutFactor);
}
public override string Type => this.GetType().Name;
+ public override int FanOutFactor { get; }
+
public override IProcessor CreateProcessor() => new CloudMessageProcessor(this);
public override void LogUserMetrics(long messageCount, long latencyInMs)
@@ -45,7 +59,7 @@ public override void LogUserMetrics(long messageCount, long latencyInMs)
// TODO - No-op for now
}
- class CloudMessageProcessor : IProcessor
+ internal class CloudMessageProcessor : IProcessor
{
static readonly ISet RetryableExceptions = new HashSet
{
@@ -70,41 +84,89 @@ public async Task ProcessAsync(IRoutingMessage routingMessage, Canc
{
Preconditions.CheckNotNull(routingMessage, nameof(routingMessage));
- if (token.IsCancellationRequested)
- {
- Events.CancelledProcessingMessage(routingMessage);
- var failed = new List { routingMessage };
- var sendFailureDetails = new SendFailureDetails(FailureKind.Transient, new EdgeHubConnectionException($"Cancelled sending messages to IotHub for device {this.cloudEndpoint.Id}"));
+ string id = this.GetIdentity(routingMessage);
+ ISinkResult result = await this.ProcessClientMessagesBatch(id, new List { routingMessage }, token);
+ Events.DoneProcessing(token);
+ return result;
+ }
- return new SinkResult(ImmutableList.Empty, failed, sendFailureDetails);
- }
+ // Sends a collection of routing messages upstream, grouped by client (see ProcessByClients),
+ // and returns the combined sink result.
+ // The send is awaited before DoneProcessing is logged, so the "finished"/"cancelled" log entry
+ // reflects the state of the token after the work has actually completed.
+ // A null collection is rejected by Preconditions.CheckNotNull; because this method is async,
+ // that failure is reported through the returned task.
+ public async Task ProcessAsync(ICollection routingMessages, CancellationToken token)
+ {
+ Events.ProcessingMessages(Preconditions.CheckNotNull(routingMessages, nameof(routingMessages)));
+ ISinkResult result = await this.ProcessByClients(routingMessages, token);
+ Events.DoneProcessing(token);
+ return result;
+ }
- Util.Option identity = this.GetIdentity(routingMessage);
+ // Closing the processor requires no work; an already-completed task is returned.
+ public Task CloseAsync(CancellationToken token)
+ {
+ return Task.CompletedTask;
+ }
- ISinkResult result = await identity.Match(
- id => this.ProcessAsync(routingMessage, id),
- () => this.ProcessNoIdentity(routingMessage));
+ // Returns how many messages of the given size may be sent in one batch:
+ // the number of messages of messageSize bytes that fit into Constants.MaxMessageSize,
+ // capped at batchSize.
+ // batchSize: the largest batch the caller is willing to send.
+ // messageSize: size in bytes of the largest message in the candidate batch.
+ internal static int GetBatchSize(int batchSize, long messageSize)
+ {
+ // A non-positive size gives no size constraint (and would otherwise divide by zero
+ // or produce a negative count), so only the caller's cap applies.
+ if (messageSize <= 0)
+ {
+ return batchSize;
+ }
+
+ // A message larger than the limit yields 0 here; clamp to 1 so such a message is
+ // sent on its own instead of producing an invalid batch size of zero.
+ int countBySize = Math.Max(1, (int)(Constants.MaxMessageSize / messageSize));
+ return Math.Min(countBySize, batchSize);
+ }
- return result;
+ // True when the exception's runtime type (ex.GetType()) is contained in RetryableExceptions.
+ // A null exception is never retryable.
+ static bool IsRetryable(Exception ex)
+ {
+ if (ex == null)
+ {
+ return false;
+ }
+
+ return RetryableExceptions.Contains(ex.GetType());
+ }
+
+ // Logs that the messages carry no identity and reports all of them as invalid input.
+ static ISinkResult HandleNoIdentity(List routingMessages)
+ {
+ Events.InvalidMessageNoIdentity();
+ var missingIdentity = new InvalidOperationException("Message does not contain device id");
+ return GetSyncResultForInvalidMessages(missingIdentity, routingMessages);
+ }
+
+ // Logs that no IoT Hub connection is available for the identity and reports all messages
+ // as failed (transient), so they are not treated as invalid.
+ static ISinkResult HandleNoConnection(string identity, List routingMessages)
+ {
+ Events.IoTHubNotConnected(identity);
+ var noConnection = new EdgeHubConnectionException($"Could not get connection to IoT Hub for {identity}");
+ return GetSyncResultForFailedMessages(noConnection, routingMessages);
+ }
+
+ // Reports all messages as failed (transient) because sending was cancelled.
+ static ISinkResult HandleCancelled(List routingMessages)
+ {
+ var cancelled = new EdgeHubConnectionException($"Cancelled sending messages to IotHub");
+ return GetSyncResultForFailedMessages(cancelled, routingMessages);
+ }
+
+ // Builds a sink result in which none of the messages succeeded and all of routingMessages
+ // are listed as failed, with failure details of kind FailureKind.Transient wrapping ex.
+ static ISinkResult GetSyncResultForFailedMessages(Exception ex, List routingMessages)
+ {
+ var sendFailureDetails = new SendFailureDetails(FailureKind.Transient, ex);
+ return new SinkResult(ImmutableList.Empty, routingMessages, sendFailureDetails);
+ }
+
+ // Builds a sink result in which no message succeeded or failed and every message in
+ // routingMessages is reported as invalid (FailureKind.InvalidInput), with ex as the cause.
+ static ISinkResult GetSyncResultForInvalidMessages(Exception ex, List routingMessages)
+ {
+ // One InvalidDetails entry per message, all with the same failure kind.
+ List> invalid = routingMessages
+ .Select(m => new InvalidDetails(m, FailureKind.InvalidInput))
+ .ToList();
+ var sendFailureDetails = new SendFailureDetails(FailureKind.InvalidInput, ex);
+ return new SinkResult(ImmutableList.Empty, ImmutableList.Empty, invalid, sendFailureDetails);
}
- public async Task ProcessAsync(ICollection routingMessages, CancellationToken token)
+ async Task ProcessByClients(ICollection routingMessages, CancellationToken token)
{
- Preconditions.CheckNotNull(routingMessages, nameof(routingMessages));
+ var routingMessageGroups = (from r in routingMessages
+ group r by this.GetIdentity(r)
+ into g
+ select new { Id = g.Key, RoutingMessages = g.ToList() })
+ .ToList();
+
var succeeded = new List();
var failed = new List();
var invalid = new List>();
Devices.Routing.Core.Util.Option sendFailureDetails =
Option.None();
- Events.ProcessingMessages(routingMessages);
- foreach (IRoutingMessage routingMessage in routingMessages)
+ Events.ProcessingMessageGroups(routingMessages, routingMessageGroups.Count, this.cloudEndpoint.FanOutFactor);
+
+ foreach (var groupBatch in routingMessageGroups.Batch(this.cloudEndpoint.FanOutFactor))
{
- ISinkResult res = await this.ProcessAsync(routingMessage, token);
- succeeded.AddRange(res.Succeeded);
- failed.AddRange(res.Failed);
- invalid.AddRange(res.InvalidDetailsList);
- sendFailureDetails = res.SendFailureDetails;
+ IEnumerable>> sendTasks = groupBatch
+ .Select(item => this.ProcessClientMessages(item.Id, item.RoutingMessages, token));
+ ISinkResult[] sinkResults = await Task.WhenAll(sendTasks);
+ foreach (ISinkResult res in sinkResults)
+ {
+ succeeded.AddRange(res.Succeeded);
+ failed.AddRange(res.Failed);
+ invalid.AddRange(res.InvalidDetailsList);
+ // Different branches could have different results, but only the first one will be reported
+ if (!sendFailureDetails.HasValue)
+ {
+ sendFailureDetails = res.SendFailureDetails;
+ }
+ }
}
return new SinkResult(
@@ -114,96 +176,109 @@ public async Task ProcessAsync(ICollection routing
sendFailureDetails.GetOrElse(null));
}
- public Task CloseAsync(CancellationToken token)
+ // Process all messages for a particular client.
+ // The messages are split into batches sized by GetBatchSize (bounded by the endpoint's
+ // maxBatchSize and by the largest message in the list), the batches are sent one after
+ // another, and the per-batch results are merged into a single sink result.
+ // routingMessages is expected to be non-empty (Max() below throws on an empty sequence).
+ async Task> ProcessClientMessages(string id, List routingMessages, CancellationToken token)
{
- // TODO - No-op
- return TaskEx.Done;
- }
-
- static bool IsRetryable(Exception ex) => ex != null && RetryableExceptions.Contains(ex.GetType());
+ var succeeded = new List();
+ var failed = new List();
+ var invalid = new List>();
+ Devices.Routing.Core.Util.Option sendFailureDetails =
+ Option.None();
- Task ProcessNoConnection(string identity, IRoutingMessage routingMessage)
- {
- Events.IoTHubNotConnected(identity);
- var failed = new List { routingMessage };
- var sendFailureDetails = new SendFailureDetails(FailureKind.Transient, new EdgeHubConnectionException("IoT Hub is not connected"));
+ // Find the maximum message size, and divide messages into largest batches
+ // not exceeding max allowed IoTHub message size.
+ long maxMessageSize = routingMessages.Select(r => r.Size()).Max();
+ int batchSize = GetBatchSize(Math.Min(this.cloudEndpoint.maxBatchSize, routingMessages.Count), maxMessageSize);
+ foreach (IEnumerable batch in routingMessages.Batch(batchSize))
+ {
+ ISinkResult res = await this.ProcessClientMessagesBatch(id, batch.ToList(), token);
+ succeeded.AddRange(res.Succeeded);
+ failed.AddRange(res.Failed);
+ invalid.AddRange(res.InvalidDetailsList);
+ // Keep the first reported failure details, as ProcessByClients does, so that a
+ // later batch's result cannot replace the details of an earlier failed batch.
+ if (!sendFailureDetails.HasValue)
+ {
+ sendFailureDetails = res.SendFailureDetails;
+ }
+ }
- return Task.FromResult((ISinkResult)new SinkResult(ImmutableList.Empty, failed, sendFailureDetails));
+ return new SinkResult(
+ succeeded,
+ failed,
+ invalid,
+ sendFailureDetails.GetOrElse(null));
}
- async Task> ProcessAsync(IRoutingMessage routingMessage, string identity)
+ async Task> ProcessClientMessagesBatch(string id, List routingMessages, CancellationToken token)
{
- IMessage message = this.cloudEndpoint.messageConverter.ToMessage(routingMessage);
- Util.Option cloudProxy = await this.cloudEndpoint.cloudProxyGetterFunc(identity);
+ if (string.IsNullOrEmpty(id))
+ {
+ return HandleNoIdentity(routingMessages);
+ }
+ if (token.IsCancellationRequested)
+ {
+ return HandleCancelled(routingMessages);
+ }
+
+ Util.Option cloudProxy = await this.cloudEndpoint.cloudProxyGetterFunc(id);
ISinkResult result = await cloudProxy.Match(
async cp =>
{
try
{
- using (Metrics.CloudLatency(identity))
+ List messages = routingMessages
+ .Select(r => this.cloudEndpoint.messageConverter.ToMessage(r))
+ .ToList();
+
+ using (Metrics.CloudLatency(id))
{
- await cp.SendMessageAsync(message);
+ if (messages.Count == 1)
+ {
+ await cp.SendMessageAsync(messages[0]);
+ }
+ else
+ {
+ await cp.SendMessageBatchAsync(messages);
+ }
}
- var succeeded = new List { routingMessage };
- Metrics.MessageCount(identity);
-
- return new SinkResult(succeeded);
+ Metrics.MessageCount(id, messages.Count);
+ return new SinkResult(routingMessages);
}
catch (Exception ex)
{
- return this.HandleException(ex, routingMessage);
+ return this.HandleException(ex, id, routingMessages);
}
},
- () => this.ProcessNoConnection(identity, routingMessage));
+ () => Task.FromResult(HandleNoConnection(id, routingMessages)));
return result;
}
- ISinkResult HandleException(Exception ex, IRoutingMessage routingMessage)
+ ISinkResult HandleException(Exception ex, string id, List routingMessages)
{
if (IsRetryable(ex))
{
- var failed = new List { routingMessage };
-
- Events.RetryingMessage(routingMessage, ex);
- var sendFailureDetails = new SendFailureDetails(FailureKind.Transient, new EdgeHubIOException($"Error sending messages to IotHub for device {this.cloudEndpoint.Id}"));
-
- return new SinkResult(ImmutableList.Empty, failed, sendFailureDetails);
+ Events.RetryingMessage(id, ex);
+ return GetSyncResultForFailedMessages(new EdgeHubIOException($"Error sending messages to IotHub for device {this.cloudEndpoint.Id}"), routingMessages);
}
else
{
- Events.InvalidMessage(ex);
- var invalid = new List> { new InvalidDetails(routingMessage, FailureKind.InvalidInput) };
- var sendFailureDetails = new SendFailureDetails(FailureKind.InvalidInput, ex);
-
- return new SinkResult(ImmutableList.Empty, ImmutableList.Empty, invalid, sendFailureDetails);
+ Events.InvalidMessage(id, ex);
+ return GetSyncResultForInvalidMessages(ex, routingMessages);
}
}
- Task ProcessNoIdentity(IRoutingMessage routingMessage)
- {
- Events.InvalidMessageNoIdentity();
- var invalid = new List> { new InvalidDetails(routingMessage, FailureKind.InvalidInput) };
- var sendFailureDetails = new SendFailureDetails(FailureKind.InvalidInput, new InvalidOperationException("Message does not contain client identity"));
- return Task.FromResult((ISinkResult)new SinkResult(ImmutableList.Empty, ImmutableList.Empty, invalid, sendFailureDetails));
- }
-
bool IsTransientException(Exception ex) => ex is EdgeHubIOException || ex is EdgeHubConnectionException;
- Util.Option GetIdentity(IRoutingMessage routingMessage)
+ string GetIdentity(IRoutingMessage routingMessage)
{
if (routingMessage.SystemProperties.TryGetValue(SystemProperties.ConnectionDeviceId, out string deviceId))
{
- return Util.Option.Some(
- routingMessage.SystemProperties.TryGetValue(SystemProperties.ConnectionModuleId, out string moduleId)
- ? $"{deviceId}/{moduleId}"
- : deviceId);
+ return routingMessage.SystemProperties.TryGetValue(SystemProperties.ConnectionModuleId, out string moduleId)
+ ? $"{deviceId}/{moduleId}"
+ : deviceId;
}
Events.DeviceIdNotFound(routingMessage);
- return Util.Option.None();
+ return string.Empty;
}
}
@@ -220,7 +295,9 @@ enum EventIds
InvalidMessage,
ProcessingMessages,
InvalidMessageNoIdentity,
- CancelledProcessing
+ CancelledProcessing,
+ Created,
+ DoneProcessing
}
public static void DeviceIdNotFound(IRoutingMessage routingMessage)
@@ -236,9 +313,22 @@ public static void ProcessingMessages(ICollection routingMessag
Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) upstream."));
}
+ // Logs (debug) that sending was cancelled; when messages are present the lowest
+ // offset among them is included in the log entry.
+ public static void CancelledProcessingMessages(ICollection messages)
+ {
+ if (messages.Count <= 0)
+ {
+ Log.LogDebug((int)EventIds.CancelledProcessing, "Cancelled sending messages");
+ return;
+ }
+
+ IRoutingMessage earliest = messages.OrderBy(m => m.Offset).First();
+ Log.LogDebug((int)EventIds.CancelledProcessing, $"Cancelled sending messages from offset {earliest.Offset}");
+ }
+
public static void CancelledProcessingMessage(IRoutingMessage message)
{
- Log.LogDebug((int)EventIds.CancelledProcessing, $"Cancelled sending message {message.Offset}");
+ Log.LogDebug((int)EventIds.CancelledProcessing, $"Cancelled sending messages from offset {message.Offset}");
}
public static void InvalidMessageNoIdentity()
@@ -246,33 +336,42 @@ public static void InvalidMessageNoIdentity()
Log.LogWarning((int)EventIds.InvalidMessageNoIdentity, "Cannot process message with no identity, discarding it.");
}
+ // Logs (debug) the number of messages being sent, the number of client groups they were
+ // divided into, and the fan-out factor limiting how many groups are processed in parallel.
+ public static void ProcessingMessageGroups(ICollection routingMessages, int groups, int fanoutFactor) =>
+ Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) upstream, divided into {groups} groups. Processing maximum {fanoutFactor} groups in parallel."));
+
+ // Logs (information) that a cloud endpoint was created, with its batch size and fan-out settings.
+ public static void Created(string id, int maxbatchSize, int fanoutFactor) =>
+ Log.LogInformation((int)EventIds.Created, Invariant($"Created cloud endpoint {id} with max batch size {maxbatchSize} and fan-out factor of {fanoutFactor}."));
+
internal static void IoTHubNotConnected(string id)
{
- Log.LogWarning((int)EventIds.IoTHubNotConnected, Invariant($"Could not get an active Iot Hub connection for device {id}"));
+ Log.LogWarning((int)EventIds.IoTHubNotConnected, Invariant($"Could not get an active Iot Hub connection for client {id}"));
}
- internal static void RetryingMessage(IRoutingMessage message, Exception ex)
+ internal static void RetryingMessage(string id, Exception ex)
{
- if (message.SystemProperties.TryGetValue(SystemProperties.ConnectionDeviceId, out string deviceId))
- {
- string id = message.SystemProperties.TryGetValue(SystemProperties.ConnectionModuleId, out string moduleId)
- ? $"{deviceId}/{moduleId}"
- : deviceId;
+ Log.LogDebug((int)EventIds.RetryingMessages, Invariant($"Retrying sending message from {id} to Iot Hub due to exception {ex.GetType()}:{ex.Message}."));
+ }
+
+ // Logs (warning) a non-retryable send failure for the given client, including the exception.
+ internal static void InvalidMessage(string id, Exception ex) =>
+ Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message for client {id}."));
- // TODO - Add more info to this log message
- Log.LogDebug((int)EventIds.RetryingMessages, Invariant($"Retrying sending message from {id} to Iot Hub due to exception {ex.GetType()}:{ex.Message}."));
+ public static void DoneProcessing(CancellationToken token)
+ {
+ if (token.IsCancellationRequested)
+ {
+ Log.LogInformation((int)EventIds.CancelledProcessing, "Stopped sending messages to upstream as the operation was cancelled");
}
else
{
- Log.LogDebug((int)EventIds.RetryingMessages, Invariant($"Retrying sending message to Iot Hub due to exception {ex.GetType()}:{ex.Message}."));
+ Log.LogDebug((int)EventIds.DoneProcessing, "Finished processing messages to upstream");
}
}
-
- internal static void InvalidMessage(Exception ex)
- {
- // TODO - Add more info to this log message
- Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message."));
- }
}
static class Metrics
@@ -292,9 +391,11 @@ static class Metrics
RateUnit = TimeUnit.Seconds
};
- public static void MessageCount(string identity) => Util.Metrics.CountIncrement(GetTags(identity), EdgeHubToCloudMessageCountOptions, 1);
+ public static void MessageCount(string identity, int count)
+ => Util.Metrics.CountIncrement(GetTags(identity), EdgeHubToCloudMessageCountOptions, count);
- public static IDisposable CloudLatency(string identity) => Util.Metrics.Latency(GetTags(identity), EdgeHubToCloudMessageLatencyOptions);
+ public static IDisposable CloudLatency(string identity)
+ => Util.Metrics.Latency(GetTags(identity), EdgeHubToCloudMessageLatencyOptions);
static MetricTags GetTags(string id)
{
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/EndpointFactory.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/EndpointFactory.cs
index ff4f47dd5df..ae4b29e923f 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/EndpointFactory.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/EndpointFactory.cs
@@ -16,23 +16,29 @@ public class EndpointFactory : IEndpointFactory
readonly Core.IMessageConverter messageConverter;
readonly string edgeDeviceId;
readonly ConcurrentDictionary cache;
+ readonly int maxBatchSize;
+ readonly int upstreamFanOutFactor;
public EndpointFactory(
IConnectionManager connectionManager,
Core.IMessageConverter messageConverter,
- string edgeDeviceId)
+ string edgeDeviceId,
+ int maxBatchSize,
+ int upstreamFanOutFactor)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.cache = new ConcurrentDictionary();
+ this.maxBatchSize = maxBatchSize;
+ this.upstreamFanOutFactor = upstreamFanOutFactor;
}
public Endpoint CreateSystemEndpoint(string endpoint)
{
if (CloudEndpointName.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
{
- return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter));
+ return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter, this.maxBatchSize, this.upstreamFanOutFactor));
}
else
{
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/RoutingEdgeHub.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/RoutingEdgeHub.cs
index 7dbbc33dc33..3ef4a3637ae 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/RoutingEdgeHub.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/RoutingEdgeHub.cs
@@ -23,7 +23,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
public class RoutingEdgeHub : IEdgeHub
{
- const long MaxMessageSize = 256 * 1024; // matches IoTHub
readonly Router router;
readonly Core.IMessageConverter messageConverter;
readonly IConnectionManager connectionManager;
@@ -161,9 +160,9 @@ internal void AddEdgeSystemProperties(IMessage message)
static void ValidateMessageSize(IRoutingMessage messageToBeValidated)
{
long messageSize = messageToBeValidated.Size();
- if (messageSize > MaxMessageSize)
+ if (messageSize > Constants.MaxMessageSize)
{
- throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {MaxMessageSize} bytes allowed");
+ throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {Constants.MaxMessageSize} bytes allowed");
}
}
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs
index 269fcd6a6f4..95fafc6d2c9 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs
@@ -130,6 +130,8 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
bool useV1TwinManager = this.GetConfigurationValueIfExists("TwinManagerVersion")
.Map(v => v.Equals("v1", StringComparison.OrdinalIgnoreCase))
.GetOrElse(true);
+ int maxUpstreamBatchSize = this.configuration.GetValue("MaxUpstreamBatchSize", 10);
+ int upstreamFanOutFactor = this.configuration.GetValue("UpstreamFanOutFactor", 10);
builder.RegisterModule(
new RoutingModule(
@@ -151,7 +153,9 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
cloudOperationTimeout,
minTwinSyncPeriod,
reportedPropertiesSyncFrequency,
- useV1TwinManager));
+ useV1TwinManager,
+ maxUpstreamBatchSize,
+ upstreamFanOutFactor));
}
void RegisterCommonModule(ContainerBuilder builder, bool optimizeForPerformance, (bool isEnabled, bool usePersistentStorage, StoreAndForwardConfiguration config, string storagePath) storeAndForward)
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs
index 0a627509b44..74de9b2d789 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs
@@ -46,6 +46,8 @@ public class RoutingModule : Module
readonly Option minTwinSyncPeriod;
readonly Option reportedPropertiesSyncFrequency;
readonly bool useV1TwinManager;
+ readonly int maxUpstreamBatchSize;
+ readonly int upstreamFanOutFactor;
public RoutingModule(
string iotHubName,
@@ -66,7 +68,9 @@ public RoutingModule(
TimeSpan operationTimeout,
Option minTwinSyncPeriod,
Option reportedPropertiesSyncFrequency,
- bool useV1TwinManager)
+ bool useV1TwinManager,
+ int maxUpstreamBatchSize,
+ int upstreamFanOutFactor)
{
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
@@ -87,6 +91,8 @@ public RoutingModule(
this.minTwinSyncPeriod = minTwinSyncPeriod;
this.reportedPropertiesSyncFrequency = reportedPropertiesSyncFrequency;
this.useV1TwinManager = useV1TwinManager;
+ this.maxUpstreamBatchSize = maxUpstreamBatchSize;
+ this.upstreamFanOutFactor = upstreamFanOutFactor;
}
protected override void Load(ContainerBuilder builder)
@@ -239,7 +245,7 @@ protected override void Load(ContainerBuilder builder)
{
var messageConverter = c.Resolve>();
IConnectionManager connectionManager = await c.Resolve>();
- return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId) as IEndpointFactory;
+ return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId, this.maxUpstreamBatchSize, this.upstreamFanOutFactor) as IEndpointFactory;
})
.As>()
.SingleInstance();
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
index 7e400c94d34..678905d2293 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
@@ -33,6 +33,8 @@ protected Endpoint(string id, string name, string iotHubName)
public abstract void LogUserMetrics(long messageCount, long latencyInMs);
+ public virtual int FanOutFactor => 1;
+
public bool Equals(Endpoint other)
{
if (ReferenceEquals(null, other))
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs
index 9b38572b8f7..2044ac3a0a7 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/StoringAsyncEndpointExecutor.cs
@@ -9,18 +9,20 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
+ using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
- using Microsoft.Azure.Devices.Routing.Core.Util;
using Microsoft.Azure.Devices.Routing.Core.Util.Concurrency;
using Microsoft.Extensions.Logging;
+ using Nito.AsyncEx;
using static System.FormattableString;
+ using AsyncLock = Microsoft.Azure.Devices.Routing.Core.Util.Concurrency.AsyncLock;
public class StoringAsyncEndpointExecutor : IEndpointExecutor
{
readonly AtomicBoolean closed = new AtomicBoolean();
readonly IMessageStore messageStore;
readonly Task sendMessageTask;
- readonly ManualResetEvent hasMessagesInQueue = new ManualResetEvent(true);
+ readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true);
readonly ICheckpointer checkpointer;
readonly AsyncEndpointExecutorOptions options;
readonly EndpointExecutorFsm machine;
@@ -126,19 +128,23 @@ async Task SendMessagesPump()
{
Events.StartSendMessagesPump(this);
IMessageIterator iterator = this.messageStore.GetMessageIterator(this.Endpoint.Id, this.checkpointer.Offset + 1);
+ int batchSize = this.options.BatchSize * this.Endpoint.FanOutFactor;
+ var storeMessagesProvider = new StoreMessagesProvider(iterator, this.options.BatchTimeout, batchSize);
while (!this.cts.IsCancellationRequested)
{
try
{
- this.hasMessagesInQueue.WaitOne(this.options.BatchTimeout);
- IMessage[] messages = (await iterator.GetNext(this.options.BatchSize)).ToArray();
- await this.ProcessMessages(messages);
- Events.SendMessagesSuccess(this, messages);
- Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
-
- // If store has no messages, then reset the hasMessagesInQueue flag.
- if (messages.Length == 0)
+ await this.hasMessagesInQueue.WaitAsync(this.options.BatchTimeout);
+ IMessage[] messages = await storeMessagesProvider.GetMessages();
+ if (messages.Length > 0)
{
+ await this.ProcessMessages(messages);
+ Events.SendMessagesSuccess(this, messages);
+ Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
+ }
+ else
+ {
+ // If store has no messages, then reset the hasMessagesInQueue flag.
this.hasMessagesInQueue.Reset();
}
}
@@ -172,6 +178,73 @@ void Dispose(bool disposing)
}
}
+ // This class is used to prefetch messages from the store before they are needed.
+ // As soon as the previous batch is consumed, the next batch is fetched.
+ // A pump is started as soon as the object is created, and it keeps the messages list populated.
+ // NOTE(review): the pump loops forever and nothing in this class stops it - confirm whether
+ // the owning executor needs a way to shut it down when it is closed.
+ internal class StoreMessagesProvider
+ {
+ readonly IMessageIterator iterator;
+ readonly int batchSize;
+ // Guards replacing and appending to messagesList.
+ readonly AsyncLock messagesLock = new AsyncLock();
+ // Set by GetMessages to request a refill; constructed in the set state so the pump
+ // starts filling right away.
+ readonly AsyncManualResetEvent messagesResetEvent = new AsyncManualResetEvent(true);
+ // Passed to WaitAsync in the pump; presumably bounds how long the pump waits for a signal.
+ readonly TimeSpan timeout;
+ readonly Task populateTask;
+ List messagesList;
+
+ // iterator: source of stored messages.
+ // timeout: wait passed to the refill event in the pump.
+ // batchSize: maximum number of messages buffered at a time.
+ public StoreMessagesProvider(IMessageIterator iterator, TimeSpan timeout, int batchSize)
+ {
+ this.iterator = iterator;
+ this.batchSize = batchSize;
+ this.timeout = timeout;
+ this.messagesList = new List(this.batchSize);
+ this.populateTask = this.PopulatePump();
+ }
+
+ // Returns the messages buffered so far (possibly none) and starts a fresh buffer.
+ // The swap happens under the lock, and the refill event is set so the pump fetches more.
+ public async Task GetMessages()
+ {
+ List currentMessagesList;
+ using (await this.messagesLock.LockAsync())
+ {
+ currentMessagesList = this.messagesList;
+ this.messagesList = new List(this.batchSize);
+ this.messagesResetEvent.Set();
+ }
+
+ return currentMessagesList.ToArray();
+ }
+
+ // Background loop: waits for a refill request (or the timeout), then reads from the
+ // iterator until the buffer holds batchSize messages or the iterator returns nothing.
+ async Task PopulatePump()
+ {
+ while (true)
+ {
+ try
+ {
+ await this.messagesResetEvent.WaitAsync(this.timeout);
+ // NOTE(review): Count is read here without holding messagesLock.
+ while (this.messagesList.Count < this.batchSize)
+ {
+ int curBatchSize = this.batchSize - this.messagesList.Count;
+ IList messages = (await this.iterator.GetNext(curBatchSize)).ToList();
+ if (!messages.Any())
+ {
+ break;
+ }
+
+ using (await this.messagesLock.LockAsync())
+ {
+ this.messagesList.AddRange(messages);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ Events.ErrorInPopulatePump(e);
+ }
+ finally
+ {
+ // Reset on failure as well as on success. If the event stayed set after an
+ // exception, the next wait would presumably complete at once and a persistently
+ // failing store would make this loop spin and flood the log. After the reset,
+ // the pump retries on the next GetMessages call or when the wait times out.
+ this.messagesResetEvent.Reset();
+ }
+ }
+ }
+ }
+
static class Events
{
const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor;
@@ -191,6 +264,7 @@ enum EventIds
Close,
CloseSuccess,
CloseFailure,
+ ErrorInPopulatePump
}
public static void AddMessageSuccess(StoringAsyncEndpointExecutor executor, long offset)
@@ -263,6 +337,11 @@ public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception
{
Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name);
}
+
+ public static void ErrorInPopulatePump(Exception ex)
+ {
+ Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump");
+ }
}
static class Metrics
diff --git a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs
index fcb1a552743..744ed71d60a 100644
--- a/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs
+++ b/edge-hub/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs
@@ -284,7 +284,7 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
TimeSpan retryAfter;
ICollection messages = EmptyMessages;
Stopwatch stopwatch = Stopwatch.StartNew();
-
+ TimeSpan endpointTimeout = TimeSpan.FromMilliseconds(thisPtr.config.Timeout.TotalMilliseconds * thisPtr.Endpoint.FanOutFactor);
try
{
Preconditions.CheckNotNull(thisPtr.currentSendCommand);
@@ -294,7 +294,8 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
{
ISinkResult result;
Events.Send(thisPtr, thisPtr.currentSendCommand.Messages, messages);
- using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
+
+ using (var cts = new CancellationTokenSource(endpointTimeout))
{
result = await thisPtr.processor.ProcessAsync(messages, cts.Token);
}
@@ -890,16 +891,16 @@ static void SetProcessingInternalCounters(EndpointExecutorFsm fsm, string status
Log.LogError((int)EventIds.CounterFailure, "[LogEventsProcessedCounterFailed] {0}", error);
}
- TimeSpan totalTime = messages.Select(m => m.DequeuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
- long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
+ double totalTimeMSecs = messages.Select(m => m.DequeuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
+ long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);
if (!Routing.PerfCounter.LogEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageLatencyInMs, out error))
{
Log.LogError((int)EventIds.CounterFailure, "[LogEventProcessingLatencyCounterFailed] {0}", error);
}
- TimeSpan messageE2EProcessingLatencyTotal = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
- long averageE2ELatencyInMs = messageE2EProcessingLatencyTotal < TimeSpan.Zero ? 0L : (long)(messageE2EProcessingLatencyTotal.TotalMilliseconds / messages.Count);
+ double messageE2EProcessingLatencyTotalMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
+ long averageE2ELatencyInMs = messageE2EProcessingLatencyTotalMSecs < 0 ? 0L : (long)(messageE2EProcessingLatencyTotalMSecs / messages.Count);
if (!Routing.PerfCounter.LogE2EEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageE2ELatencyInMs, out error))
{
@@ -921,8 +922,8 @@ static void SetSuccessfulEgressUserMetricCounter(EndpointExecutorFsm fsm, IColle
}
// calculate average latency
- TimeSpan totalTime = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
- long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
+ double totalTimeMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
+ long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);
fsm.Endpoint.LogUserMetrics(messages.Count, averageLatencyInMs);
}
diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudEndpointTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudEndpointTest.cs
index 255d908b9da..eb538a72672 100644
--- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudEndpointTest.cs
+++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudEndpointTest.cs
@@ -62,7 +62,7 @@ public void CloudMessageProcessor_CloseAsyncTest()
IProcessor moduleMessageProcessor = cloudEndpoint.CreateProcessor();
Task result = moduleMessageProcessor.CloseAsync(CancellationToken.None);
- Assert.Equal(TaskEx.Done, result);
+ Assert.Equal(Task.CompletedTask, result);
}
[Fact]
diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs
index b8d1d543761..9c4cc52ac55 100644
--- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs
+++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/routing/CloudMessageProcessorTests.cs
@@ -3,6 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Routing
{
using System;
using System.Collections.Generic;
+ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -95,7 +96,7 @@ Task