Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EdgeHub: Upstream perf improvements #1006

Merged
merged 27 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
77eb78b
Fix StoringAsyncEndpointExecutor to read in parallel to processing
varunpuranik Mar 24, 2019
d52b34b
Add parallel processing to CloudEndpoint
varunpuranik Mar 24, 2019
bd33ea2
Cleanup
varunpuranik Mar 24, 2019
7c34b9c
Fix metrics calculation
varunpuranik Mar 24, 2019
1d288ad
Add StoreMessageProvider tests
varunpuranik Mar 25, 2019
214939f
Fix names
varunpuranik Mar 25, 2019
c5e5ff0
Cleanup
varunpuranik Mar 26, 2019
8a7d9dd
Add more tests
varunpuranik Mar 26, 2019
4ac8da1
Fix build
varunpuranik Mar 26, 2019
0bf6daa
Merge branch 'master' into batch3
varunpuranik Mar 26, 2019
a6daed3
Merge branch 'master' into batch3
varunpuranik Mar 26, 2019
44e4779
Fix casing
varunpuranik Mar 26, 2019
f838229
Fix tests
varunpuranik Mar 26, 2019
a4abc34
Add tests for batch
varunpuranik Mar 26, 2019
013162c
Merge branch 'batch3' of https://github.com/varunpuranik/iotedge into…
varunpuranik Mar 26, 2019
149facf
Fix stylecop issues
varunpuranik Mar 26, 2019
72e0e74
Merge branch 'master' into batch3
varunpuranik Mar 26, 2019
a2a8b8c
Fix routing tests
varunpuranik Mar 27, 2019
4ebc65b
Merge branch 'master' into batch3
varunpuranik Mar 27, 2019
7342f28
Merge branch 'master' into batch3
varunpuranik Apr 1, 2019
e228076
Increase the endpoint timeout based on Fanout factor
varunpuranik Apr 1, 2019
524518a
Fix build
varunpuranik Apr 1, 2019
09f35c9
Fix getBatchSize calculation
varunpuranik Apr 1, 2019
21e4214
Add comment to StoreMessageProvider
varunpuranik Apr 1, 2019
300172f
Merge branch 'master' into batch3
varunpuranik Apr 3, 2019
1dcef77
Merge branch 'master' into batch3
varunpuranik Apr 3, 2019
e1f143b
Merge branch 'master' into batch3
varunpuranik Apr 3, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public class Constants

public const string IotEdgeIdentityCapability = "iotEdge";
public const string ServiceIdentityRefreshMethodName = "RefreshDeviceScopeIdentityCache";

public const long MaxMessageSize = 256 * 1024; // matches IoTHub

public static readonly Version ConfigSchemaVersion = new Version("1.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
<PackageReference Include="App.Metrics" Version="3.0.0" />
<PackageReference Include="JetBrains.Annotations" Version="2018.3.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.18.1" />
<PackageReference Include="Nito.AsyncEx" Version="5.0.0-pre-05" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
</ItemGroup>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@ public class EndpointFactory : IEndpointFactory
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly string edgeDeviceId;
readonly ConcurrentDictionary<string, Endpoint> cache;
readonly int maxBatchSize;
readonly int upstreamFanOutFactor;

public EndpointFactory(
IConnectionManager connectionManager,
Core.IMessageConverter<IRoutingMessage> messageConverter,
string edgeDeviceId)
string edgeDeviceId,
int maxBatchSize,
int upstreamFanOutFactor)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.cache = new ConcurrentDictionary<string, Endpoint>();
this.maxBatchSize = maxBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
}

public Endpoint CreateSystemEndpoint(string endpoint)
{
if (CloudEndpointName.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
{
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter));
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter, this.maxBatchSize, this.upstreamFanOutFactor));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing

public class RoutingEdgeHub : IEdgeHub
{
const long MaxMessageSize = 256 * 1024; // matches IoTHub
readonly Router router;
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly IConnectionManager connectionManager;
Expand Down Expand Up @@ -161,9 +160,9 @@ internal void AddEdgeSystemProperties(IMessage message)
static void ValidateMessageSize(IRoutingMessage messageToBeValidated)
{
long messageSize = messageToBeValidated.Size();
if (messageSize > MaxMessageSize)
if (messageSize > Constants.MaxMessageSize)
{
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {MaxMessageSize} bytes allowed");
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {Constants.MaxMessageSize} bytes allowed");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
bool useV1TwinManager = this.GetConfigurationValueIfExists<string>("TwinManagerVersion")
.Map(v => v.Equals("v1", StringComparison.OrdinalIgnoreCase))
.GetOrElse(true);
int maxUpstreamBatchSize = this.configuration.GetValue("MaxUpstreamBatchSize", 10);
int upstreamFanOutFactor = this.configuration.GetValue("UpstreamFanOutFactor", 10);

builder.RegisterModule(
new RoutingModule(
Expand All @@ -151,7 +153,9 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
cloudOperationTimeout,
minTwinSyncPeriod,
reportedPropertiesSyncFrequency,
useV1TwinManager));
useV1TwinManager,
maxUpstreamBatchSize,
upstreamFanOutFactor));
}

void RegisterCommonModule(ContainerBuilder builder, bool optimizeForPerformance, (bool isEnabled, bool usePersistentStorage, StoreAndForwardConfiguration config, string storagePath) storeAndForward)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class RoutingModule : Module
readonly Option<TimeSpan> minTwinSyncPeriod;
readonly Option<TimeSpan> reportedPropertiesSyncFrequency;
readonly bool useV1TwinManager;
readonly int maxUpstreamBatchSize;
readonly int upstreamFanOutFactor;

public RoutingModule(
string iotHubName,
Expand All @@ -66,7 +68,9 @@ public class RoutingModule : Module
TimeSpan operationTimeout,
Option<TimeSpan> minTwinSyncPeriod,
Option<TimeSpan> reportedPropertiesSyncFrequency,
bool useV1TwinManager)
bool useV1TwinManager,
int maxUpstreamBatchSize,
int upstreamFanOutFactor)
{
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
Expand All @@ -87,6 +91,8 @@ public class RoutingModule : Module
this.minTwinSyncPeriod = minTwinSyncPeriod;
this.reportedPropertiesSyncFrequency = reportedPropertiesSyncFrequency;
this.useV1TwinManager = useV1TwinManager;
this.maxUpstreamBatchSize = maxUpstreamBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
}

protected override void Load(ContainerBuilder builder)
Expand Down Expand Up @@ -239,7 +245,7 @@ protected override void Load(ContainerBuilder builder)
{
var messageConverter = c.Resolve<Core.IMessageConverter<IRoutingMessage>>();
IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId) as IEndpointFactory;
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId, this.maxUpstreamBatchSize, this.upstreamFanOutFactor) as IEndpointFactory;
})
.As<Task<IEndpointFactory>>()
.SingleInstance();
Expand Down
2 changes: 2 additions & 0 deletions edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ protected Endpoint(string id, string name, string iotHubName)

public abstract void LogUserMetrics(long messageCount, long latencyInMs);

public virtual int FanOutFactor => 1;

public bool Equals(Endpoint other)
{
if (ReferenceEquals(null, other))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
using Microsoft.Azure.Devices.Routing.Core.Util;
using Microsoft.Azure.Devices.Routing.Core.Util.Concurrency;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using static System.FormattableString;
using AsyncLock = Microsoft.Azure.Devices.Routing.Core.Util.Concurrency.AsyncLock;

public class StoringAsyncEndpointExecutor : IEndpointExecutor
{
readonly AtomicBoolean closed = new AtomicBoolean();
readonly IMessageStore messageStore;
readonly Task sendMessageTask;
readonly ManualResetEvent hasMessagesInQueue = new ManualResetEvent(true);
readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true);
readonly ICheckpointer checkpointer;
readonly AsyncEndpointExecutorOptions options;
readonly EndpointExecutorFsm machine;
Expand Down Expand Up @@ -126,19 +128,23 @@ async Task SendMessagesPump()
{
Events.StartSendMessagesPump(this);
IMessageIterator iterator = this.messageStore.GetMessageIterator(this.Endpoint.Id, this.checkpointer.Offset + 1);
int batchSize = this.options.BatchSize * this.Endpoint.FanOutFactor;
var storeMessagesProvider = new StoreMessagesProvider(iterator, this.options.BatchTimeout, batchSize);
while (!this.cts.IsCancellationRequested)
{
try
{
this.hasMessagesInQueue.WaitOne(this.options.BatchTimeout);
IMessage[] messages = (await iterator.GetNext(this.options.BatchSize)).ToArray();
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);

// If store has no messages, then reset the hasMessagesInQueue flag.
if (messages.Length == 0)
await this.hasMessagesInQueue.WaitAsync(this.options.BatchTimeout);
IMessage[] messages = await storeMessagesProvider.GetMessages();
if (messages.Length > 0)
{
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
}
else
{
// If store has no messages, then reset the hasMessagesInQueue flag.
this.hasMessagesInQueue.Reset();
}
}
Expand Down Expand Up @@ -172,6 +178,73 @@ void Dispose(bool disposing)
}
}

// This class is used to prefetch messages from the store before they are needed.
// As soon as the previous batch is consumed, the next batch is fetched.
// A pump is started as soon as the object is created, and it keeps the messages list populated.
internal class StoreMessagesProvider
{
readonly IMessageIterator iterator;
readonly int batchSize;
readonly AsyncLock messagesLock = new AsyncLock();
readonly AsyncManualResetEvent messagesResetEvent = new AsyncManualResetEvent(true);
readonly TimeSpan timeout;
readonly Task populateTask;
List<IMessage> messagesList;

public StoreMessagesProvider(IMessageIterator iterator, TimeSpan timeout, int batchSize)
{
this.iterator = iterator;
this.batchSize = batchSize;
this.timeout = timeout;
this.messagesList = new List<IMessage>(this.batchSize);
this.populateTask = this.PopulatePump();
}

public async Task<IMessage[]> GetMessages()
{
List<IMessage> currentMessagesList;
using (await this.messagesLock.LockAsync())
{
currentMessagesList = this.messagesList;
this.messagesList = new List<IMessage>(this.batchSize);
this.messagesResetEvent.Set();
}

return currentMessagesList.ToArray();
}

async Task PopulatePump()
{
while (true)
{
try
{
await this.messagesResetEvent.WaitAsync(this.timeout);
while (this.messagesList.Count < this.batchSize)
{
int curBatchSize = this.batchSize - this.messagesList.Count;
IList<IMessage> messages = (await this.iterator.GetNext(curBatchSize)).ToList();
if (!messages.Any())
{
break;
}

using (await this.messagesLock.LockAsync())
{
this.messagesList.AddRange(messages);
}
}

this.messagesResetEvent.Reset();
}
catch (Exception e)
{
Events.ErrorInPopulatePump(e);
}
}
}
}

static class Events
{
const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor;
Expand All @@ -191,6 +264,7 @@ enum EventIds
Close,
CloseSuccess,
CloseFailure,
ErrorInPopulatePump
}

public static void AddMessageSuccess(StoringAsyncEndpointExecutor executor, long offset)
Expand Down Expand Up @@ -263,6 +337,11 @@ public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception
{
Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name);
}

public static void ErrorInPopulatePump(Exception ex)
{
Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump");
}
}

static class Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
TimeSpan retryAfter;
ICollection<IMessage> messages = EmptyMessages;
Stopwatch stopwatch = Stopwatch.StartNew();

TimeSpan endpointTimeout = TimeSpan.FromMilliseconds(thisPtr.config.Timeout.TotalMilliseconds * thisPtr.Endpoint.FanOutFactor);
try
{
Preconditions.CheckNotNull(thisPtr.currentSendCommand);
Expand All @@ -294,7 +294,8 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
{
ISinkResult<IMessage> result;
Events.Send(thisPtr, thisPtr.currentSendCommand.Messages, messages);
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))

using (var cts = new CancellationTokenSource(endpointTimeout))
{
result = await thisPtr.processor.ProcessAsync(messages, cts.Token);
}
Expand Down Expand Up @@ -890,16 +891,16 @@ static void SetProcessingInternalCounters(EndpointExecutorFsm fsm, string status
Log.LogError((int)EventIds.CounterFailure, "[LogEventsProcessedCounterFailed] {0}", error);
}

TimeSpan totalTime = messages.Select(m => m.DequeuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
double totalTimeMSecs = messages.Select(m => m.DequeuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);
myagley marked this conversation as resolved.
Show resolved Hide resolved

if (!Routing.PerfCounter.LogEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageLatencyInMs, out error))
{
Log.LogError((int)EventIds.CounterFailure, "[LogEventProcessingLatencyCounterFailed] {0}", error);
}

TimeSpan messageE2EProcessingLatencyTotal = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotal < TimeSpan.Zero ? 0L : (long)(messageE2EProcessingLatencyTotal.TotalMilliseconds / messages.Count);
double messageE2EProcessingLatencyTotalMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotalMSecs < 0 ? 0L : (long)(messageE2EProcessingLatencyTotalMSecs / messages.Count);

if (!Routing.PerfCounter.LogE2EEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageE2ELatencyInMs, out error))
{
Expand All @@ -921,8 +922,8 @@ static void SetSuccessfulEgressUserMetricCounter(EndpointExecutorFsm fsm, IColle
}

// calculate average latency
TimeSpan totalTime = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
double totalTimeMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);

fsm.Endpoint.LogUserMetrics(messages.Count, averageLatencyInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void CloudMessageProcessor_CloseAsyncTest()

IProcessor moduleMessageProcessor = cloudEndpoint.CreateProcessor();
Task result = moduleMessageProcessor.CloseAsync(CancellationToken.None);
Assert.Equal(TaskEx.Done, result);
Assert.Equal(Task.CompletedTask, result);
}

[Fact]
Expand Down
Loading