-
Notifications
You must be signed in to change notification settings - Fork 494
Description
Context
- OS: Windows Server 2022 Datacenter 21H2
- .NET framework version: .NET 6.0
- Device: Azure VM
- SDK: Microsoft.Azure.Devices.Client 1.40 originally and patched to 1.41 and still exhibiting the problem.
Description of the issue
We have a service whose only job is to read messages from files in a directory and write the contents of those files to IoT hub messages. It has been in service at dozens of sites for several years transferring gigabytes a day without seeing this problem. However, starting a day ago, it seems two IoT hubs have entered a state which seems to cause a message sent with SendEventAsync to block / loop, and consume all available memory. In seconds the process goes from using 70M of memory to 25G and either has to be killed off or the machine rebooted. This only seems to occur under particular circumstances:
- There must be multiple threads calling SendEventAsync at once - when only one thread calls it, the problem doesn't occur. (Our default installation has 8 threads calling SendEventAsync.)
- DotNetty trace logging cannot be enabled. If I run a trace with all except DotNetty from https://github.com/Azure/azure-iot-sdk-csharp/blob/main/tools/CaptureLogs/iot_providers.txt, the problem recurs. If I run a trace with only DotNetty, or with DotNetty and any others, it does not.
At these two sites it takes five minutes or less for the service to enter the "consume all memory" state. The sites appear to be having other probably related problems. The service starts up, sends for about a minute, then logs that it's been disconnected from IoT hub and is reconnecting. A minute later it starts sending again, and a minute after that gets disconnected again. We are still working on getting access to IoT hub logs.
Code sample exhibiting the issue
It's impossible to isolate this to code since the issue is only happening at a couple of production sites and we don't have access to experiment with those IoT hubs. Relevant code included below. The service involves pulling from a BlockingCollection of IQueueMessages and awaiting a call to SendToHub.
private const int MaxMessageSize = 262144;
public bool IsSending { get; private set; }
private DeviceClient _IOTHubClient;
private ILogger<IotHubClient> _log;
private SemaphoreSlim _ioHubLock;
private DateTime firstDisabled;
public event EventHandler UnrecoverableError;
private int messagesSent;
private int bytesSent;
public IotHubClient(ILogger<IotHubClient> log)
{
_log = log;
_ioHubLock = new SemaphoreSlim(1, 1);
IsSending = true; //so that we at least try once to process some files on startup
}
public string ConnectionString { get; set; }
public string ContentType { get; set; }
public string ContentEncoding { get; set; }
public IDictionary<string, string> Properties { get; set; }
private async Task ConnectToHub()
{
if (_IOTHubClient == null)
{
try
{
await _ioHubLock.WaitAsync();
if (_IOTHubClient == null)
{
(string host, string device) = GetHostAndDevice(ConnectionString);
_log.LogDebug("IOTHubConsumer Store And Forward created for host {host}, device {device}", host, device);
DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
await client.OpenAsync();
IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
client.SetRetryPolicy(retryPolicy);
client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
_IOTHubClient = client;
_log.LogDebug("Successfully opened IOT hub");
firstDisabled = DateTime.MaxValue;
IsSending = true;
}
}
catch (Exception ex)
{
_log.LogError(ex, "Exception opening IOT hub");
_IOTHubClient = null;
throw;
}
finally
{
_ioHubLock.Release();
}
}
}
private async Task DisconnectClient()
{
IsSending = false;
DeviceClient local = _IOTHubClient;
_IOTHubClient = null;
if (local != null)
{
await local.CloseAsync();
local.Dispose();
}
}
private Task Bail()
{
IsSending = false;
UnrecoverableError?.Invoke(this, new EventArgs());
return Task.CompletedTask;
}
private async void ConnectionStatusChangedHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
switch (status)
{
case ConnectionStatus.Connected:
_log.LogDebug("Successfully connected to IOT hub");
IsSending = true;
break;
case ConnectionStatus.Disconnected_Retrying:
_log.LogDebug("IOT hub disconnected - retrying.");
IsSending = false;
break;
case ConnectionStatus.Disabled:
_log.LogInformation("IOT hub disabled - reconnecting manually.");
await DisconnectClient();
break;
case ConnectionStatus.Disconnected:
switch (reason)
{
case ConnectionStatusChangeReason.Bad_Credential:
_log.LogInformation("IOT hub credentials rejected. Retrying.");
try
{
DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
await client.OpenAsync();
client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
client.SetRetryPolicy(retryPolicy);
await _ioHubLock.WaitAsync();
_IOTHubClient = client;
_log.LogInformation("Retry successful.");
_ioHubLock.Release();
}
catch (Exception ex)
{
_log.LogCritical(ex, "Exception re-establishing connection.");
await Bail();
}
break;
case ConnectionStatusChangeReason.Client_Close:
_log.LogDebug("IOT hub closed gracefully.");
break;
case ConnectionStatusChangeReason.Communication_Error:
_log.LogInformation("IOT hub disconnected because of non-retryable exception. Restarting.");
await DisconnectClient();
break;
case ConnectionStatusChangeReason.Device_Disabled:
// This is gonna lose data, but at this point that's unavoidable.
if (firstDisabled - DateTime.Now > TimeSpan.FromMinutes(5))
{
_log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted for more than 5 minutes. Aborting.");
await Bail();
}
else
{
// Hopefully the device has just been temporarily disabled and it will be enabled before the data piling up kills us off.
if (firstDisabled == DateTime.MaxValue)
{
firstDisabled = DateTime.Now;
}
_log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted. Retrying, but it's not looking good.");
await DisconnectClient();
}
break;
case ConnectionStatusChangeReason.Retry_Expired:
_log.LogInformation("IOT hub disconnected because retry expired. Retrying more forcibly.");
await DisconnectClient();
break;
// No_Network is not used.
case ConnectionStatusChangeReason.No_Network:
// Expired_SAS_Token is not used
case ConnectionStatusChangeReason.Expired_SAS_Token:
default:
_log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
break;
}
break;
default:
_log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
break;
}
}
public async Task SendToHub(IQueuedMessage message)
{
try
{
await ConnectToHub();
byte[] messageBytes = await message.GetBytes();
if (messageBytes.Length == 0)
{
await message.MarkProcessed();
}
else if (messageBytes.Length > 0 && messageBytes.Length < MaxMessageSize)
{
using Message msg = new(messageBytes);
if (!string.IsNullOrEmpty(ContentEncoding))
{
msg.ContentEncoding = ContentEncoding;
}
if (!string.IsNullOrEmpty(ContentType))
{
msg.ContentType = ContentType;
}
if (Properties != null)
{
foreach(KeyValuePair<string,string> pair in Properties)
{
msg.Properties[pair.Key] = pair.Value;
}
}
await _IOTHubClient.SendEventAsync(msg);
_log.LogDebug($"Sent {messageBytes.Length} byte message to IoTHub");
Interlocked.Increment(ref messagesSent);
Interlocked.Add(ref bytesSent, messageBytes.Length);
await message.MarkProcessed();
}
else
{
await message.MarkPoisoned();
}
}
catch (Exception ex)
{
_log.LogWarning(ex, "Error encountered while sending data to IoTHub");
await message.Requeue();
}
}
Console log of the issue
The service's log at Debug level ends on a line with "Sent # byte message to IoTHub" several seconds before the process had to be killed off. Before that, that message was occurring multiple times a second.
adc_000001.zip