Skip to content

Commit

Permalink
Revert "feature,fix (device-client) Handle Twin failures using Amqp (#…
Browse files Browse the repository at this point in the history
…1796)"

This reverts commit 2c01037.
  • Loading branch information
abhipsaMisra committed Mar 19, 2021
1 parent 366b8e9 commit 2886502
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 79 deletions.
6 changes: 1 addition & 5 deletions e2e/test/iothub/twin/TwinE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Azure.Devices.Shared;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.Devices.E2ETests.Twins
{
Expand Down Expand Up @@ -366,10 +367,6 @@ public async Task Twin_ClientHandlesRejectionInvalidPropertyName_MqttWs()
.ConfigureAwait(false);
}

// These tests worked earlier for Amqp and AmqpWs since it was catching a wrong exception
// To Do: Fix Update reported properties method behavior (breaking change) to wait for response
// and we should be able to enable these tests then.
[Ignore]
[LoggedTestMethod]
public async Task Twin_ClientHandlesRejectionInvalidPropertyName_Amqp()
{
Expand All @@ -378,7 +375,6 @@ public async Task Twin_ClientHandlesRejectionInvalidPropertyName_Amqp()
.ConfigureAwait(false);
}

[Ignore]
[LoggedTestMethod]
public async Task Twin_ClientHandlesRejectionInvalidPropertyName_AmqpWs()
{
Expand Down
46 changes: 20 additions & 26 deletions iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo
try
{
cancellationToken.ThrowIfCancellationRequested();
string correlationId = AmqpTwinMessageType.Put + Guid.NewGuid().ToString();
await _amqpUnit.SendTwinMessageAsync(AmqpTwinMessageType.Put, correlationId, null, _operationTimeout).ConfigureAwait(false);
await _amqpUnit.SendTwinMessageAsync(AmqpTwinMessageType.Put, Guid.NewGuid().ToString(), null, _operationTimeout).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -356,7 +355,7 @@ public override async Task<Twin> SendTwinGetAsync(CancellationToken cancellation
try
{
await EnableTwinPatchAsync(cancellationToken).ConfigureAwait(false);
Twin twin = await RoundTripTwinMessageAsync(AmqpTwinMessageType.Get, null, cancellationToken).ConfigureAwait(false);
Twin twin = await RoundTripTwinMessage(AmqpTwinMessageType.Get, null, cancellationToken).ConfigureAwait(false);
if (twin == null)
{
throw new InvalidOperationException("Service rejected the message");
Expand All @@ -376,19 +375,19 @@ public override async Task SendTwinPatchAsync(TwinCollection reportedProperties,
try
{
await EnableTwinPatchAsync(cancellationToken).ConfigureAwait(false);
await RoundTripTwinMessageAsync(AmqpTwinMessageType.Patch, reportedProperties, cancellationToken).ConfigureAwait(false);
Twin twin = await RoundTripTwinMessage(AmqpTwinMessageType.Patch, reportedProperties, cancellationToken).ConfigureAwait(false);
}
finally
{
Logging.Exit(this, reportedProperties, cancellationToken, $"{nameof(SendTwinPatchAsync)}");
}
}

private async Task<Twin> RoundTripTwinMessageAsync(AmqpTwinMessageType amqpTwinMessageType, TwinCollection reportedProperties, CancellationToken cancellationToken)
private async Task<Twin> RoundTripTwinMessage(AmqpTwinMessageType amqpTwinMessageType, TwinCollection reportedProperties, CancellationToken cancellationToken)
{
Logging.Enter(this, cancellationToken, $"{nameof(RoundTripTwinMessageAsync)}");
Logging.Enter(this, cancellationToken, $"{nameof(RoundTripTwinMessage)}");

string correlationId = amqpTwinMessageType + Guid.NewGuid().ToString();
string correlationId = Guid.NewGuid().ToString();
Twin response = null;

try
Expand All @@ -406,6 +405,10 @@ private async Task<Twin> RoundTripTwinMessageAsync(AmqpTwinMessageType amqpTwinM
// Consider that the task may have faulted or been canceled.
// We re-await the task so that any exceptions/cancellation is rethrown.
response = await receivingTask.ConfigureAwait(false);
if (response == null)
{
throw new InvalidOperationException("Service response is null");
}
}
else
{
Expand All @@ -416,7 +419,7 @@ private async Task<Twin> RoundTripTwinMessageAsync(AmqpTwinMessageType amqpTwinM
finally
{
_twinResponseCompletions.TryRemove(correlationId, out _);
Logging.Exit(this, cancellationToken, $"{nameof(RoundTripTwinMessageAsync)}");
Logging.Exit(this, cancellationToken, $"{nameof(RoundTripTwinMessage)}");
}

return response;
Expand Down Expand Up @@ -519,28 +522,19 @@ private async Task DisposeMessageAsync(string lockToken, AmqpIoTDisposeActions o

private void TwinMessageListener(Twin twin, string correlationId, TwinCollection twinCollection)
{
if (correlationId == null)
if (correlationId != null)
{
// This is desired property updates, so call the callback with TwinCollection.
_onDesiredStatePatchListener(twinCollection);
// It is a GET, just complete the task.
TaskCompletionSource<Twin> task;
if (_twinResponseCompletions.TryRemove(correlationId, out task))
{
task.SetResult(twin);
}
}
else
{
if (correlationId.StartsWith(AmqpTwinMessageType.Get.ToString(), StringComparison.OrdinalIgnoreCase) ||
correlationId.StartsWith(AmqpTwinMessageType.Patch.ToString(), StringComparison.OrdinalIgnoreCase))
{
// For Get and Patch, complete the task.
TaskCompletionSource<Twin> task;
if (_twinResponseCompletions.TryRemove(correlationId, out task))
{
task.SetResult(twin);
}
else
{
// This can happen if we received a message from service with correlation Id that was not set by SDK or does not exist in dictionary.
Logging.Info("Could not remove correlation id to complete the task awaiter for a twin operation.", nameof(TwinMessageListener));
}
}
// It is a PATCH, just call the callback with the TwinCollection
_onDesiredStatePatchListener(twinCollection);
}
}

Expand Down
87 changes: 39 additions & 48 deletions iothub/device/src/Transport/AmqpIoT/AmqpIoTReceivingLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class AmqpIoTReceivingLink
private Action<Message> _onEventsReceived;
private Action<Message> _onDeviceMessageReceived;
private Action<MethodRequestInternal> _onMethodReceived;
private Action<Twin, string, TwinCollection> _onTwinMessageReceived;
private Action<Twin, string, TwinCollection> _onDesiredPropertyReceived;

public AmqpIoTReceivingLink(ReceivingAmqpLink receivingAmqpLink)
{
Expand Down Expand Up @@ -257,98 +257,89 @@ private void DisposeDelivery(AmqpMessage amqpMessage, bool settled, Accepted acc

internal void RegisterTwinListener(Action<Twin, string, TwinCollection> onDesiredPropertyReceived)
{
_onTwinMessageReceived = onDesiredPropertyReceived;
_receivingAmqpLink.RegisterMessageListener(OnTwinChangesReceived);
_onDesiredPropertyReceived = onDesiredPropertyReceived;
_receivingAmqpLink.RegisterMessageListener(OnDesiredPropertyReceived);
}

private void OnTwinChangesReceived(AmqpMessage amqpMessage)
private void OnDesiredPropertyReceived(AmqpMessage amqpMessage)
{
if (Logging.IsEnabled)
{
Logging.Enter(this, amqpMessage, $"{nameof(OnTwinChangesReceived)}");
Logging.Enter(this, amqpMessage, $"{nameof(OnDesiredPropertyReceived)}");
}

try
{
_receivingAmqpLink.DisposeDelivery(amqpMessage, true, AmqpIoTConstants.AcceptedOutcome);
string correlationId = amqpMessage.Properties?.CorrelationId?.ToString();
int status = GetStatus(amqpMessage);

if (!VerifyResponseMessage(amqpMessage))
{
_onDesiredPropertyReceived.Invoke(null, correlationId, null);
}

Twin twin = null;
TwinCollection twinProperties = null;

if (status >= 400)
if (correlationId != null)
{
// Handle failures
_onTwinMessageReceived.Invoke(null, correlationId, null);
if (correlationId.StartsWith(AmqpTwinMessageType.Get.ToString(), StringComparison.OrdinalIgnoreCase))
if (amqpMessage.BodyStream != null)
{
string error = null;
using (var reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8))
// This a result of a GET TWIN so return (set) the full twin
using (StreamReader reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8))
{
error = reader.ReadToEnd();
};
// Retry for Http status code request timeout, Too many requests and server errors
throw new IotHubException(error, status >= 500 || status == 429 || status == 408);
string body = reader.ReadToEnd();
var properties = JsonConvert.DeserializeObject<TwinProperties>(body);
twin = new Twin(properties);
}
}
else
{
// This is a desired property ack from the service
twin = new Twin();
}
}
else
{
if (correlationId == null)
// No correlationId, this is a PATCH sent by the sevice so return (set) the TwinCollection

using (StreamReader reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8))
{
// Here we are getting desired property update notifications and want to handle it first
using var reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8);
string patch = reader.ReadToEnd();
twinProperties = JsonConvert.DeserializeObject<TwinCollection>(patch);
}
else if (correlationId.StartsWith(AmqpTwinMessageType.Get.ToString(), StringComparison.OrdinalIgnoreCase))
{
// This a response of a GET TWIN so return (set) the full twin
using var reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8);
string body = reader.ReadToEnd();
var properties = JsonConvert.DeserializeObject<TwinProperties>(body);
twin = new Twin(properties);
}
else if (correlationId.StartsWith(AmqpTwinMessageType.Patch.ToString(), StringComparison.OrdinalIgnoreCase))
{
// This can be used to coorelate success response with updating reported properties
// However currently we do not have it as request response style implementation
Logging.Info("Updated twin reported properties successfully", nameof(OnTwinChangesReceived));
}
else if (correlationId.StartsWith(AmqpTwinMessageType.Put.ToString(), StringComparison.OrdinalIgnoreCase))
{
// This is an acknowledgement received from service for subscribing to desired property updates
Logging.Info("Subscribed for twin successfully", nameof(OnTwinChangesReceived));
}
else
{
// This shouldn't happen
Logging.Info("Received a correlation Id for Twin operation that does not match Get, Patch or Put request", nameof(OnTwinChangesReceived));
}
_onTwinMessageReceived.Invoke(twin, correlationId, twinProperties);
}
_onDesiredPropertyReceived.Invoke(twin, correlationId, twinProperties);
}
finally
{
if (Logging.IsEnabled)
{
Logging.Exit(this, amqpMessage, $"{nameof(OnTwinChangesReceived)}");
Logging.Exit(this, amqpMessage, $"{nameof(OnDesiredPropertyReceived)}");
}
}
}

#endregion Twin handling

internal static int GetStatus(AmqpMessage response)
internal static bool VerifyResponseMessage(AmqpMessage response)
{
bool retVal = true;
if (response != null)
{
if (response.MessageAnnotations.Map.TryGetValue(AmqpIoTConstants.ResponseStatusName, out int status))
{
return status;
if (status >= 400)
{
retVal = false;
}
}
}
return -1;
else
{
retVal = false;
}
return retVal;
}
}
}

0 comments on commit 2886502

Please sign in to comment.