Skip to content

Commit

Permalink
Eliminating 30 sec delay when M2M acknowledgment is interrupted by di…
Browse files Browse the repository at this point in the history
…sconnection (#6836) (#6857)

This fix is related to IcM 333616549:

- when sending higher amounts of M2M messages via MQTT to a module, it can happen (within a few hours) that reauthentication (following a token expiration) happens when a M2M message got forwarded to the module, but the ACK was not sent back.
- when a M2M message is being sent, there is a timeout of 30 sec (configurable) for the ACK
- when a connection is closed the ACK cannot be sent back
- when edge is waiting for an ACK, its message pump (to that given route/module) is blocked

The problem of the customer is that time-to-time the M2M messages get delayed for 30seconds (then it catches up with the messages accumulated during that 30seconds period). They drive some sort of dashboard and this behavior is not acceptable.

The root of the problem was that they use MQTT protocol. Around token expiration/reauthentication their message pump got stuck by the reason listed above - waiting for ACK for 30 sec when the connection was closed (the reopened in a very short time)

This fix adds a new Task to the exit-condition when the code is waiting for the ACK. So far, the two conditions were:
- the ACK is received
- a timeout occurs

Now the third condition is that when the device handler object gets closed (for any reason, e.g. because of the reauthentication)

Tested by:
- temporarily modified edge to execute token expiration every 15 second
- modified edge to fail every token check, causing the module to be disconnected every 15 seconds

Used two clients doing M2M, 1 msg/sec, the receiver client holds back the ACK for 800ms (to increase the odds to run into a disconnection)

Kept the code running for ~30 mins, the delay was not experienced anymore. Also, double-checked by temporarily added logs that the device handler was interrupted by the newly added task.

(cherry picked from commit 7a0051a)

## Azure IoT Edge PR checklist:
  • Loading branch information
vipeller committed Jan 19, 2023
1 parent 596e515 commit e32cfce
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Device
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand All @@ -27,6 +28,7 @@ class DeviceMessageHandler : IDeviceListener, IDeviceProxy
readonly IConnectionManager connectionManager;
readonly TimeSpan messageAckTimeout;
readonly AsyncLock serializeMessagesLock = new AsyncLock();
readonly CancellationTokenSource handlerClosed = new CancellationTokenSource();
readonly Option<string> modelId;
IDeviceProxy underlyingProxy;

Expand Down Expand Up @@ -485,7 +487,7 @@ public async Task SendMessageAsync(IMessage message, string input)
Metrics.MessageProcessingLatency(this.Identity, message);
await this.underlyingProxy.SendMessageAsync(message, input);

Task completedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(this.messageAckTimeout));
Task completedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(this.messageAckTimeout, this.handlerClosed.Token));
if (completedTask != taskCompletionSource.Task)
{
Events.MessageFeedbackTimedout(this.Identity, lockToken);
Expand All @@ -511,7 +513,7 @@ public async Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest re
await this.underlyingProxy.InvokeMethodAsync(request);
Events.MethodCallSentToClient(this.Identity, request.Id, request.CorrelationId);

Task completedTask = await Task.WhenAny(taskCompletion.Task, Task.Delay(request.ResponseTimeout));
Task completedTask = await Task.WhenAny(taskCompletion.Task, Task.Delay(request.ResponseTimeout, this.handlerClosed.Token));
if (completedTask != taskCompletion.Task)
{
Events.MethodResponseTimedout(this.Identity, request.Id, request.CorrelationId);
Expand All @@ -526,7 +528,11 @@ public async Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest re

public Task SendTwinUpdate(IMessage twin) => this.underlyingProxy.SendTwinUpdate(twin);

public Task CloseAsync(Exception ex) => this.underlyingProxy.CloseAsync(ex);
public Task CloseAsync(Exception ex)
{
this.handlerClosed.Cancel();
return this.underlyingProxy.CloseAsync(ex);
}

public void SetInactive() => this.underlyingProxy.SetInactive();

Expand Down

0 comments on commit e32cfce

Please sign in to comment.