Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge master to previews/mockclients #1882

Merged
merged 4 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions common/src/device/transport/mqtt/ClientWebSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using Microsoft.Azure.Devices.Shared;

namespace Microsoft.Azure.Devices.Client.Transport.Mqtt
{
Expand Down Expand Up @@ -242,38 +243,74 @@ protected override async void DoBeginRead()

protected override async void DoWrite(ChannelOutboundBuffer channelOutboundBuffer)
{
if (channelOutboundBuffer == null)
{
throw new ArgumentNullException(nameof(channelOutboundBuffer), "The channel outbound buffer cannot be null.");
}

try
{
// The ChannelOutboundBuffer might have more than one message per MQTT packet that needs to be written to the websocket as a single frame.
// One example of this is a PUBLISH packet, which encodes the payload and topic information as separate messages.
// In order to reduce the number of frames sent to the websocket, we will consolidate the individual messages per MQTT packet into a single byte buffer.
IByteBufferAllocator allocator = Configuration.Allocator;

// The parameter "direct" is used to indicate if operations carried out in the CompositeByteBuffer should be treated as "unsafe".
var compositeByteBuffer = new CompositeByteBuffer(allocator, direct: false, maxNumComponents: int.MaxValue);

var bytesToBeWritten = new ArraySegment<byte>();
_isWriteInProgress = true;

while (true)
{
object currentMessage = channelOutboundBuffer.Current;

// Once there are no more messages pending in ChannelOutboundBuffer, the "Current" property is returned as "null".
// This indicates that all pending messages have been dequeued from the ChannelOutboundBuffer and are ready to be written to the websocket.
if (currentMessage == null)
{
// Wrote all messages.
// This indicates that the ChannelOutboundBuffer had readable bytes and they have been added to the CompositeByteBuffer.
if (compositeByteBuffer.NumComponents > 0)
{
// All messages have been added to the CompositeByteBuffer and are now ready to be written to the socket.
bytesToBeWritten = compositeByteBuffer.GetIoBuffer();
}
break;
}

var byteBuffer = currentMessage as IByteBuffer;
Debug.Assert(byteBuffer != null, "channelOutBoundBuffer contents must be of type IByteBuffer");

if (byteBuffer.ReadableBytes == 0)
// If the byte buffer has readable bytes then add them to the CompositeByteBuffer.
if (byteBuffer.ReadableBytes != 0)
{
channelOutboundBuffer.Remove();
continue;
// There are two operations carried out while adding a byte buffer component to a CompositeByteBuffer:
// - Increase WriterIndex of the CompositeByteBuffer
// - increases the count of readable bytes added to the CompositeByteBuffer.
// - Call the method Retain() on the byte buffer being added
// - The property ReferenceCount of a byte buffer implementation maintains a counter of the no of messages available for dequeuing.
// A ReferenceCount of 0 indicates that all messages have been flushed and the buffer can be deallocated.
// By calling the method Retain() on each byte buffer component added to the CompositeByteBuffer,
// we increase the ReferenceCount by 1 and mark them as ready for dequeuing.
compositeByteBuffer
.AddComponent(
increaseWriterIndex: true,
buffer: (IByteBuffer)byteBuffer.Retain());
}

await _webSocket
.SendAsync(
byteBuffer.GetIoBuffer(),
WebSocketMessageType.Binary,
true,
_writeCancellationTokenSource.Token)
.ConfigureAwait(false);

// Once the readable bytes are added to the CompositeByteBuffer they can be removed from the ChannelOutboundBuffer
// and the next message, if any, can be processed.
channelOutboundBuffer.Remove();
}

if (bytesToBeWritten.Count > 0)
{
if (Logging.IsEnabled)
Logging.Info(compositeByteBuffer, $"Writing bytes of size {bytesToBeWritten.Count} to the websocket", nameof(DoWrite));

await _webSocket.SendAsync(bytesToBeWritten, WebSocketMessageType.Binary, true, _writeCancellationTokenSource.Token).ConfigureAwait(false);
}

_isWriteInProgress = false;
}
catch (Exception e)
Expand Down
78 changes: 47 additions & 31 deletions common/src/service/HttpClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,53 +40,31 @@ internal sealed class HttpClientHelper : IHttpClientHelper
// IDisposables

private readonly HttpClient _httpClientWithDefaultTimeout;

private readonly HttpClient _httpClientWithNoTimeout;
private readonly HttpClientHandler _httpClientHandler;

public HttpClientHelper(
Uri baseAddress,
IAuthorizationHeaderProvider authenticationHeaderProvider,
IDictionary<HttpStatusCode, Func<HttpResponseMessage, Task<Exception>>> defaultErrorMapping,
TimeSpan timeout,
IWebProxy customHttpProxy)
IWebProxy customHttpProxy,
int connectionLeaseTimeoutMilliseconds)
{
_baseAddress = baseAddress;
_authenticationHeaderProvider = authenticationHeaderProvider;
_defaultErrorMapping = new ReadOnlyDictionary<HttpStatusCode, Func<HttpResponseMessage, Task<Exception>>>(defaultErrorMapping);
_defaultOperationTimeout = timeout;

// HttpClientHandler is IDisposable, so save onto it for disposing.
_httpClientHandler = new HttpClientHandler
{
#if !NET451
SslProtocols = TlsVersions.Instance.Preferred,
CheckCertificateRevocationList = TlsVersions.Instance.CertificateRevocationCheck,
#endif
};

if (customHttpProxy != DefaultWebProxySettings.Instance)
{
_httpClientHandler.UseProxy = customHttpProxy != null;
_httpClientHandler.Proxy = customHttpProxy;
}

// We need two types of HttpClients, one with our default operation timeout, and one without. The one without will rely on
// a cancellation token.

_httpClientWithDefaultTimeout = new HttpClient(_httpClientHandler, false)
{
BaseAddress = _baseAddress,
Timeout = _defaultOperationTimeout,
};
_httpClientWithDefaultTimeout = CreateDefaultClient(customHttpProxy, _baseAddress, connectionLeaseTimeoutMilliseconds);
_httpClientWithDefaultTimeout.Timeout = _defaultOperationTimeout;
_httpClientWithDefaultTimeout.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(CommonConstants.MediaTypeForDeviceManagementApis));
_httpClientWithDefaultTimeout.DefaultRequestHeaders.ExpectContinue = false;

_httpClientWithNoTimeout = new HttpClient(_httpClientHandler, false)
{
BaseAddress = _baseAddress,
Timeout = Timeout.InfiniteTimeSpan,
};
_httpClientWithNoTimeout = CreateDefaultClient(customHttpProxy, _baseAddress, connectionLeaseTimeoutMilliseconds);
_httpClientWithNoTimeout.Timeout = Timeout.InfiniteTimeSpan;
_httpClientWithNoTimeout.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(CommonConstants.MediaTypeForDeviceManagementApis));
_httpClientWithNoTimeout.DefaultRequestHeaders.ExpectContinue = false;

Expand Down Expand Up @@ -788,9 +766,6 @@ public void Dispose()
{
_httpClientWithDefaultTimeout.Dispose();
_httpClientWithNoTimeout.Dispose();

// Since HttpClientHandler was passed to the 2 HttpClients above, but told them not to dispose it, we want to dispose this after
_httpClientHandler.Dispose();
}

private async Task ExecuteAsync(
Expand Down Expand Up @@ -919,5 +894,46 @@ public void Dispose()
Task<Exception> exception = mapToExceptionFunc(response);
return await exception.ConfigureAwait(false);
}

internal static HttpClient CreateDefaultClient(IWebProxy webProxy, Uri baseUri, int connectionLeaseTimeoutMilliseconds)
{
// This http client will dispose of this httpMessageHandler when the http client is disposed, so no need to save a local copy
#pragma warning disable CA2000 // Dispose objects before losing scope (httpClient and messageHandler are disposed outside of this scope)
return new HttpClient(CreateDefaultHttpMessageHandler(webProxy, baseUri, connectionLeaseTimeoutMilliseconds))
#pragma warning restore CA2000 // Dispose objects before losing scope
{
// Timeouts are handled by the pipeline
Timeout = Timeout.InfiniteTimeSpan,
BaseAddress = baseUri
};
}

internal static HttpMessageHandler CreateDefaultHttpMessageHandler(IWebProxy webProxy, Uri baseUri, int connectionLeaseTimeoutMilliseconds)
{
#pragma warning disable CA2000 // Dispose objects before losing scope (object is returned by this method, so the caller is responsible for disposing it)
#if NETCOREAPP && !NETCOREAPP2_0 && !NETCOREAPP1_0 && !NETCOREAPP1_1
// SocketsHttpHandler is only available in netcoreapp2.1 and onwards
SocketsHttpHandler httpMessageHandler = new SocketsHttpHandler();
httpMessageHandler.SslOptions.EnabledSslProtocols = TlsVersions.Instance.Preferred;
#else
HttpClientHandler httpMessageHandler = new HttpClientHandler();
#if !NET451
httpMessageHandler.SslProtocols = TlsVersions.Instance.Preferred;
httpMessageHandler.CheckCertificateRevocationList = TlsVersions.Instance.CertificateRevocationCheck;
#endif
#endif
#pragma warning restore CA2000 // Dispose objects before losing scope


if (webProxy != DefaultWebProxySettings.Instance)
{
httpMessageHandler.UseProxy = webProxy != null;
httpMessageHandler.Proxy = webProxy;
}

ServicePointHelpers.SetLimits(httpMessageHandler, baseUri, connectionLeaseTimeoutMilliseconds);

return httpMessageHandler;
}
}
}
1 change: 1 addition & 0 deletions iothub/device/src/AuthenticationWithTokenRefresh.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public async Task<string> GetTokenAsync(string iotHub)
return _token;
}

Debug.Assert(_lock != null);
await _lock.WaitAsync().ConfigureAwait(false);

try
Expand Down
45 changes: 39 additions & 6 deletions iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,32 +268,65 @@ protected override async void DoWrite(ChannelOutboundBuffer channelOutboundBuffe

try
{
// The ChannelOutboundBuffer might have more than one message per MQTT packet that needs to be written to the websocket as a single frame.
// One example of this is a PUBLISH packet, which encodes the payload and topic information as separate messages.
// In order to reduce the number of frames sent to the websocket, we will consolidate the individual messages per MQTT packet into a single byte buffer.
IByteBufferAllocator allocator = Configuration.Allocator;

// The parameter "direct" is used to indicate if operations carried out in the CompositeByteBuffer should be treated as "unsafe".
var compositeByteBuffer = new CompositeByteBuffer(allocator, direct: false, maxNumComponents: int.MaxValue);

var bytesToBeWritten = new ArraySegment<byte>();
_isWriteInProgress = true;

while (true)
{
object currentMessage = channelOutboundBuffer.Current;

// Once there are no more messages pending in ChannelOutboundBuffer, the "Current" property is returned as "null".
// This indicates that all pending messages have been dequeued from the ChannelOutboundBuffer and are ready to be written to the websocket.
if (currentMessage == null)
{
// Wrote all messages.
// This indicates that the ChannelOutboundBuffer had readable bytes and they have been added to the CompositeByteBuffer.
if (compositeByteBuffer.NumComponents > 0)
{
// All messages have been added to the CompositeByteBuffer and are now ready to be written to the socket.
bytesToBeWritten = compositeByteBuffer.GetIoBuffer();
}
break;
}

var byteBuffer = currentMessage as IByteBuffer;
Fx.AssertAndThrow(byteBuffer != null, "channelOutBoundBuffer contents must be of type IByteBuffer");

if (byteBuffer.ReadableBytes == 0)
// If the byte buffer has readable bytes then add them to the CompositeByteBuffer.
if (byteBuffer.ReadableBytes != 0)
{
channelOutboundBuffer.Remove();
continue;
// There are two operations carried out while adding a byte buffer component to a CompositeByteBuffer:
// - Increase WriterIndex of the CompositeByteBuffer
// - increases the count of readable bytes added to the CompositeByteBuffer.
// - Call the method Retain() on the byte buffer being added
// - The property ReferenceCount of a byte buffer implementation maintains a counter of the no of messages available for dequeuing.
// A ReferenceCount of 0 indicates that all messages have been flushed and the buffer can be deallocated.
// By calling the method Retain() on each byte buffer component added to the CompositeByteBuffer,
// we increase the ReferenceCount by 1 and mark them as ready for dequeuing.
compositeByteBuffer
.AddComponent(
increaseWriterIndex: true,
buffer: (IByteBuffer)byteBuffer.Retain());
}

ArraySegment<byte> bytesToBeWritten = byteBuffer.GetIoBuffer();
// Once the readable bytes are added to the CompositeByteBuffer they can be removed from the ChannelOutboundBuffer
// and the next message, if any, can be processed.
channelOutboundBuffer.Remove();
}

if (bytesToBeWritten.Count > 0)
{
if (Logging.IsEnabled)
Logging.Info(this, $"Writing bytes of size {bytesToBeWritten.Count} to the websocket", nameof(DoWrite));

await _webSocket.SendAsync(bytesToBeWritten, WebSocketMessageType.Binary, true, _writeCancellationTokenSource.Token).ConfigureAwait(false);
channelOutboundBuffer.Remove();
}

_isWriteInProgress = false;
Expand Down
4 changes: 1 addition & 3 deletions iothub/service/src/ClientApiVersionHelper.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Microsoft.Azure.Devices
{
/// <summary>
Expand All @@ -11,7 +9,7 @@ namespace Microsoft.Azure.Devices
internal class ClientApiVersionHelper
{
private const string ApiVersionQueryPrefix = "api-version=";
private const string ApiVersionDefault = "2020-09-30";
private const string ApiVersionDefault = "2021-04-12";

/// <summary>
/// The default API version to use for all data-plane service calls
Expand Down
Loading