From 5491697ebe30b91159d0ffb9a7e804da4e272769 Mon Sep 17 00:00:00 2001 From: brycewang-microsoft Date: Tue, 18 Apr 2023 12:17:49 -0700 Subject: [PATCH 1/2] fix bug where device operations not recovering after reconnect --- e2e/LongHaul/device/IotHub.cs | 6 ++++-- e2e/LongHaul/device/Parameters.cs | 2 +- e2e/LongHaul/device/Program.cs | 2 +- iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs | 7 +++++++ 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/e2e/LongHaul/device/IotHub.cs b/e2e/LongHaul/device/IotHub.cs index 25f42db34f..3a8ebfe4ea 100644 --- a/e2e/LongHaul/device/IotHub.cs +++ b/e2e/LongHaul/device/IotHub.cs @@ -124,7 +124,7 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct { await Task.Delay(s_messageLoopSleepTime, ct).ConfigureAwait(false); } - catch (TaskCanceledException) + catch (OperationCanceledException) { // App is signalled to exit _logger.Trace($"Exit signal encountered. Terminating telemetry message pump.", TraceSeverity.Verbose); @@ -169,12 +169,13 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct { if (pendingMessages.Count > 1) { - logger.Trace($"Sending {pendingMessages.Count} messages in bulk."); + logger.Trace($"Sending {pendingMessages.Count} telemetry messages in bulk.", TraceSeverity.Information); sw.Restart(); await _deviceClient.SendTelemetryAsync(pendingMessages, ct).ConfigureAwait(false); } else { + logger.Trace("Sending a telemetry message.", TraceSeverity.Information); sw.Restart(); await _deviceClient.SendTelemetryAsync(pendingMessages.First(), ct).ConfigureAwait(false); } @@ -206,6 +207,7 @@ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken { "TotalTelemetryMessagesSent", _totalTelemetryMessagesSent }, }; + logger.Trace($"Updating reported properties.", TraceSeverity.Information); sw.Restart(); await _deviceClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false); sw.Stop(); diff --git a/e2e/LongHaul/device/Parameters.cs b/e2e/LongHaul/device/Parameters.cs index df41807c37..a4856e9b8f 100644 --- a/e2e/LongHaul/device/Parameters.cs +++ b/e2e/LongHaul/device/Parameters.cs @@ -48,7 +48,7 @@ internal class Parameters "TransportProtocol", Default = IotHubClientTransportProtocol.Tcp, Required = false, - HelpText = "The protocol over which a transport (i.e., MQTT, AMQP) communicates.")] + HelpText = "The protocol over which a transport communicates (i.e., Tcp, WebSocket).")] public IotHubClientTransportProtocol TransportProtocol { get; set; } [Option( diff --git a/e2e/LongHaul/device/Program.cs b/e2e/LongHaul/device/Program.cs index 3b91eed5c0..8e49fafee8 100644 --- a/e2e/LongHaul/device/Program.cs +++ b/e2e/LongHaul/device/Program.cs @@ -94,7 +94,7 @@ await Task iotHub.UploadFilesAsync(s_logger.Clone(), cancellationTokenSource.Token)) .ConfigureAwait(false); } - catch (TaskCanceledException) { } // user signalled an exit + catch (OperationCanceledException) { } // user signalled an exit catch (Exception ex) { s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error); diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs index 784af72feb..a612d0fce7 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs @@ -130,6 +130,13 @@ private async Task SendAmqpMessageAsync(AmqpMessage amqpMessage, Cancel cancellationToken) .ConfigureAwait(false); } + catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested) + { + // OperationCanceledException may be thrown here when there is networking disconnect + // even if cancellation has not been requested yet. This case is treated as a transient + // network error rather than an OperationCanceledException. + throw new IotHubClientException(oce.Message, IotHubClientErrorCode.NetworkErrors, oce); + } catch (Exception ex) when (!Fx.IsFatal(ex)) { Exception iotEx = AmqpIotExceptionAdapter.ConvertToIotHubException(ex, _sendingAmqpLink); From 14405d63fb633d8bf2fc9bfdeec130cadbfee773 Mon Sep 17 00:00:00 2001 From: brycewang-microsoft Date: Tue, 18 Apr 2023 14:23:55 -0700 Subject: [PATCH 2/2] fixup --- .../src/Transport/AmqpIot/AmqpIotSendingLink.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs index a612d0fce7..bd1c152fde 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs @@ -130,16 +130,18 @@ private async Task SendAmqpMessageAsync(AmqpMessage amqpMessage, Cancel cancellationToken) .ConfigureAwait(false); } - catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested) - { - // OperationCanceledException may be thrown here when there is networking disconnect - // even if cancellation has not been requested yet. This case is treated as a transient - // network error rather than an OperationCanceledException. - throw new IotHubClientException(oce.Message, IotHubClientErrorCode.NetworkErrors, oce); - } catch (Exception ex) when (!Fx.IsFatal(ex)) { Exception iotEx = AmqpIotExceptionAdapter.ConvertToIotHubException(ex, _sendingAmqpLink); + + if (iotEx is OperationCanceledException && !cancellationToken.IsCancellationRequested) + { + // OperationCanceledException may be thrown here when there is networking disconnect + // even if cancellation has not been requested yet. This case is treated as a transient + // network error rather than an OperationCanceledException. + throw new IotHubClientException(iotEx.Message, IotHubClientErrorCode.NetworkErrors, iotEx); + } + if (ReferenceEquals(ex, iotEx)) { throw;