Skip to content

Commit

Permalink
IoTHub Exception for Get and Patch Twin failures (#1815)
Browse files Browse the repository at this point in the history
  • Loading branch information
bikamani authored and abhipsaMisra committed Mar 19, 2021
1 parent 9d9cc77 commit d96ef56
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 23 deletions.
10 changes: 3 additions & 7 deletions e2e/test/iothub/twin/TwinE2ETests.cs
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Devices.Shared;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -366,10 +367,6 @@ public async Task Twin_ClientHandlesRejectionInvalidPropertyName_MqttWs()
.ConfigureAwait(false);
}

// These tests worked earlier for Amqp and AmqpWs since it was catching a wrong exception
// To Do: Fix Update reported properties method behavior (breaking change) to wait for response
// and we should be able to enable these tests then.
[Ignore]
[LoggedTestMethod]
public async Task Twin_ClientHandlesRejectionInvalidPropertyName_Amqp()
{
Expand All @@ -378,7 +375,6 @@ public async Task Twin_ClientHandlesRejectionInvalidPropertyName_Amqp()
.ConfigureAwait(false);
}

[Ignore]
[LoggedTestMethod]
public async Task Twin_ClientHandlesRejectionInvalidPropertyName_AmqpWs()
{
Expand Down Expand Up @@ -713,12 +709,12 @@ await deviceClient
})
.ConfigureAwait(false);
}
catch (Exception)
catch (IotHubException)
{
exceptionThrown = true;
}

Assert.IsTrue(exceptionThrown, "Exception was expected, but not thrown.");
Assert.IsTrue(exceptionThrown, "IotHubException was expected for updating reported property with an invalid property name, but was not thrown.");

Twin serviceTwin = await registryManager.GetTwinAsync(testDevice.Id).ConfigureAwait(false);
Assert.IsFalse(serviceTwin.Properties.Reported.Contains(propName1));
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Expand Up @@ -36,7 +36,7 @@ public AmqpConnectionHolder(DeviceIdentity deviceIdentity)
public AmqpUnit CreateAmqpUnit(
DeviceIdentity deviceIdentity,
Func<MethodRequestInternal, Task> onMethodCallback,
Action<Twin, string, TwinCollection> twinMessageListener,
Action<Twin, string, TwinCollection, IotHubException> twinMessageListener,
Func<string, Message, Task> onModuleMessageReceivedCallback,
Func<Message, Task> onDeviceMessageReceivedCallback,
Action onUnitDisconnected)
Expand Down
3 changes: 2 additions & 1 deletion iothub/device/src/Transport/Amqp/AmqpConnectionPool.cs
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.Devices.Client.Transport.AmqpIoT;
using Microsoft.Azure.Devices.Client.Exceptions;

namespace Microsoft.Azure.Devices.Client.Transport.Amqp
{
Expand All @@ -18,7 +19,7 @@ internal class AmqpConnectionPool : IAmqpUnitManager
public AmqpUnit CreateAmqpUnit(
DeviceIdentity deviceIdentity,
Func<MethodRequestInternal, Task> onMethodCallback,
Action<Twin, string, TwinCollection> twinMessageListener,
Action<Twin, string, TwinCollection, IotHubException> twinMessageListener,
Func<string, Message, Task> onModuleMessageReceivedCallback,
Func<Message, Task> onDeviceMessageReceivedCallback,
Action onUnitDisconnected)
Expand Down
17 changes: 15 additions & 2 deletions iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Client.Transport.AmqpIoT;
using Microsoft.Azure.Devices.Shared;

Expand Down Expand Up @@ -400,8 +401,13 @@ private async Task<Twin> RoundTripTwinMessageAsync(AmqpTwinMessageType amqpTwinM
await _amqpUnit.SendTwinMessageAsync(amqpTwinMessageType, correlationId, reportedProperties, _operationTimeout).ConfigureAwait(false);

var receivingTask = taskCompletionSource.Task;

if (await Task.WhenAny(receivingTask, Task.Delay(TimeSpan.FromSeconds(ResponseTimeoutInSeconds), cancellationToken)).ConfigureAwait(false) == receivingTask)
{
if ((receivingTask.Exception != null) && (receivingTask.Exception.InnerException != null))
{
throw receivingTask.Exception.InnerException;
}
// Task completed within timeout.
// Consider that the task may have faulted or been canceled.
// We re-await the task so that any exceptions/cancellation is rethrown.
Expand Down Expand Up @@ -517,7 +523,7 @@ private async Task DisposeMessageAsync(string lockToken, AmqpIoTDisposeActions o

#region Helpers

private void TwinMessageListener(Twin twin, string correlationId, TwinCollection twinCollection)
private void TwinMessageListener(Twin twin, string correlationId, TwinCollection twinCollection, IotHubException ex = default)
{
if (correlationId == null)
{
Expand All @@ -533,7 +539,14 @@ private void TwinMessageListener(Twin twin, string correlationId, TwinCollection
TaskCompletionSource<Twin> task;
if (_twinResponseCompletions.TryRemove(correlationId, out task))
{
task.SetResult(twin);
if(ex == default)
{
task.SetResult(twin);
}
else
{
task.SetException(ex);
}
}
else
{
Expand Down
8 changes: 4 additions & 4 deletions iothub/device/src/Transport/Amqp/AmqpUnit.cs
Expand Up @@ -19,7 +19,7 @@ internal class AmqpUnit : IDisposable
private readonly DeviceIdentity _deviceIdentity;

private readonly Func<MethodRequestInternal, Task> _onMethodCallback;
private readonly Action<Twin, string, TwinCollection> _twinMessageListener;
private readonly Action<Twin, string, TwinCollection, IotHubException> _twinMessageListener;
private readonly Func<string, Message, Task> _onModuleMessageReceivedCallback;
private readonly Func<Message, Task> _onDeviceMessageReceivedCallback;
private readonly IAmqpConnectionHolder _amqpConnectionHolder;
Expand Down Expand Up @@ -54,7 +54,7 @@ internal class AmqpUnit : IDisposable
DeviceIdentity deviceIdentity,
IAmqpConnectionHolder amqpConnectionHolder,
Func<MethodRequestInternal, Task> onMethodCallback,
Action<Twin, string, TwinCollection> twinMessageListener,
Action<Twin, string, TwinCollection, IotHubException> twinMessageListener,
Func<string, Message, Task> onModuleMessageReceivedCallback,
Func<Message, Task> onDeviceMessageReceivedCallback,
Action onUnitDisconnected)
Expand Down Expand Up @@ -735,13 +735,13 @@ private async Task OpenTwinSenderLinkAsync(AmqpIoTSession amqpIoTSession, string
}
}

private void OnDesiredPropertyReceived(Twin twin, string correlationId, TwinCollection twinCollection)
private void OnDesiredPropertyReceived(Twin twin, string correlationId, TwinCollection twinCollection, IotHubException ex = default)
{
Logging.Enter(this, twin, nameof(OnDesiredPropertyReceived));

try
{
_twinMessageListener?.Invoke(twin, correlationId, twinCollection);
_twinMessageListener?.Invoke(twin, correlationId, twinCollection, ex);
}
finally
{
Expand Down
3 changes: 2 additions & 1 deletion iothub/device/src/Transport/Amqp/AmqpUnitManager.cs
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.Devices.Client.Transport.AmqpIoT;
using Microsoft.Azure.Devices.Client.Exceptions;

namespace Microsoft.Azure.Devices.Client.Transport.Amqp
{
Expand All @@ -29,7 +30,7 @@ internal static AmqpUnitManager GetInstance()
public AmqpUnit CreateAmqpUnit(
DeviceIdentity deviceIdentity,
Func<MethodRequestInternal, Task> onMethodCallback,
Action<Twin, string, TwinCollection> twinMessageListener,
Action<Twin, string, TwinCollection, IotHubException> twinMessageListener,
Func<string, Message, Task> onModuleMessageReceivedCallback,
Func<Message, Task> onDeviceMessageReceivedCallback,
Action onUnitDisconnected)
Expand Down
3 changes: 2 additions & 1 deletion iothub/device/src/Transport/Amqp/IAmqpUnitManager.cs
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Client.Transport.AmqpIoT;
using Microsoft.Azure.Devices.Shared;

Expand All @@ -13,7 +14,7 @@ internal interface IAmqpUnitManager
AmqpUnit CreateAmqpUnit(
DeviceIdentity deviceIdentity,
Func<MethodRequestInternal, Task> onMethodCallback,
Action<Twin, string, TwinCollection> twinMessageListener,
Action<Twin, string, TwinCollection, IotHubException> twinMessageListener,
Func<string, Message, Task> onModuleMessageReceivedCallback,
Func<Message, Task> onDeviceMessageReceivedCallback,
Action onUnitDisconnected);
Expand Down
14 changes: 8 additions & 6 deletions iothub/device/src/Transport/AmqpIoT/AmqpIoTReceivingLink.cs
Expand Up @@ -23,7 +23,7 @@ internal class AmqpIoTReceivingLink
private Action<Message> _onEventsReceived;
private Action<Message> _onDeviceMessageReceived;
private Action<MethodRequestInternal> _onMethodReceived;
private Action<Twin, string, TwinCollection> _onTwinMessageReceived;
private Action<Twin, string, TwinCollection, IotHubException> _onTwinMessageReceived;

public AmqpIoTReceivingLink(ReceivingAmqpLink receivingAmqpLink)
{
Expand Down Expand Up @@ -255,7 +255,7 @@ private void DisposeDelivery(AmqpMessage amqpMessage, bool settled, Accepted acc

#region Twin handling

internal void RegisterTwinListener(Action<Twin, string, TwinCollection> onDesiredPropertyReceived)
internal void RegisterTwinListener(Action<Twin, string, TwinCollection, IotHubException> onDesiredPropertyReceived)
{
_onTwinMessageReceived = onDesiredPropertyReceived;
_receivingAmqpLink.RegisterMessageListener(OnTwinChangesReceived);
Expand All @@ -280,16 +280,18 @@ private void OnTwinChangesReceived(AmqpMessage amqpMessage)
if (status >= 400)
{
// Handle failures
_onTwinMessageReceived.Invoke(null, correlationId, null);
if (correlationId.StartsWith(AmqpTwinMessageType.Get.ToString(), StringComparison.OrdinalIgnoreCase))
if (correlationId.StartsWith(AmqpTwinMessageType.Get.ToString(), StringComparison.OrdinalIgnoreCase)
|| correlationId.StartsWith(AmqpTwinMessageType.Patch.ToString(), StringComparison.OrdinalIgnoreCase))
{
string error = null;
using (var reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8))
{
error = reader.ReadToEnd();
};

// Retry for Http status code request timeout, Too many requests and server errors
throw new IotHubException(error, status >= 500 || status == 429 || status == 408);
var exception = new IotHubException(error, status >= 500 || status == 429 || status == 408);
_onTwinMessageReceived.Invoke(null, correlationId, null, exception);
}
}
else
Expand Down Expand Up @@ -325,7 +327,7 @@ private void OnTwinChangesReceived(AmqpMessage amqpMessage)
// This shouldn't happen
Logging.Info("Received a correlation Id for Twin operation that does not match Get, Patch or Put request", nameof(OnTwinChangesReceived));
}
_onTwinMessageReceived.Invoke(twin, correlationId, twinProperties);
_onTwinMessageReceived.Invoke(twin, correlationId, twinProperties, null);
}
}
finally
Expand Down

0 comments on commit d96ef56

Please sign in to comment.