Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IotHubClient API revision #143

Merged
merged 6 commits into from Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Expand Up @@ -121,7 +121,6 @@ The protocol gateway provides a number of configuration settings to help fine tu
- **RetainProperty**. Protocol gateway will set a property named RetainProperty and a value of “True” on a message sent to IoT hub if the PUBLISH packet was marked with RETAIN=1. Default value: mqtt-retain
- **DupProperty**. Protocol gateway will set a property named DupProperty and a value of “True” on a message sent to IoT hub if the PUBLISH packet was marked with DUP=1. Default value: mqtt-dup
- **QoSProperty**. Indicates the name of the property on cloud-to-device messages that might be used to override the default QoS level for the device-bound message processing. For device to cloud messages, this is the name of the property that indicates the QoS level of a message when received from the device. Default value: mqtt-qos
- **DeviceReceiveAckTimeout**. If specified, indicates timeout for acknowledgement. If an acknowledgement times out for any of the acknowledgement queues (PUBACK, PUBREC, PUBCOMP), the device connection will be put in retransmission mode until all outstanding acknowledgements are received. If not specified or is less than or equal to 00:00:00, acknowledgement is awaited without a timeout. Default value: 00:00:00
- **MaxInboundMessageSize**. REQUIRED Maximum message size allowed for publishing from a device to the gateway. If a device publishes a bigger message, the protocol gateway will close the connection. The max supported value is 262144 (256 KB). Default value: 262144
- **MaxPendingInboundAcknowledgements**. Maximum allowed number of ACK messages pending processing. Protocol gateway will stop reading data from the device connection once it reaches MaxPendingInboundAcknowledgements and will restart only after the number of pending acknowledgements becomes lower than MaxPendingInboundAcknowledgements. Default value: 16
- **IotHubClient.ConnectionString**. REQUIRED Connection string to IoT hub. Defines the connection parameters when connecting to IoT hub on behalf of devices. By default, the protocol gateway will override the credentials from the connection string with the device credentials provided when a device connects to the gateway. No default value.
Expand Down
Expand Up @@ -185,10 +185,6 @@
<Content Include="GCSettingsManagement.ps1">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<None Include="mqttTopicConversion.config.user">
<SubType>Designer</SubType>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<Content Include="Startup.cmd">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
Expand Down
4 changes: 0 additions & 4 deletions host/ProtocolGateway.Host.Cloud.Service/app.config
@@ -1,9 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="mqttTopicNameConversion" type="Microsoft.Azure.Devices.ProtocolGateway.IotHubClient.Addressing.MessageAddressConversionConfigurationHandler, Microsoft.Azure.Devices.ProtocolGateway.IotHubClient" />
</configSections>
<mqttTopicNameConversion configSource="mqttTopicConversion.config.user" />
<system.diagnostics>
<sources>
<source name="mySource" switchName="sourceSwitch" switchType="System.Diagnostics.SourceSwitch">
Expand Down

This file was deleted.

67 changes: 48 additions & 19 deletions host/ProtocolGateway.Host.Common/Bootstrapper.cs
Expand Up @@ -4,10 +4,10 @@
namespace ProtocolGateway.Host.Common
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
Expand All @@ -18,14 +18,14 @@ namespace ProtocolGateway.Host.Common
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.ProtocolGateway;
using Microsoft.Azure.Devices.ProtocolGateway.Identity;
using Microsoft.Azure.Devices.ProtocolGateway.Instrumentation;
using Microsoft.Azure.Devices.ProtocolGateway.IotHubClient;
using Microsoft.Azure.Devices.ProtocolGateway.IotHubClient.Addressing;
using Microsoft.Azure.Devices.ProtocolGateway.Messaging;
using Microsoft.Azure.Devices.ProtocolGateway.Mqtt;
using Microsoft.Azure.Devices.ProtocolGateway.Mqtt.Persistence;
using Message = Microsoft.Azure.Devices.ProtocolGateway.Messaging.Message;

public class Bootstrapper
{
Expand All @@ -46,19 +46,8 @@ public class Bootstrapper
IEventLoopGroup eventLoopGroup;
IChannel serverChannel;
readonly IotHubClientSettings iotHubClientSettings;
readonly IMessageAddressConverter topicNameConverter;

public Bootstrapper(ISettingsProvider settingsProvider, ISessionStatePersistenceProvider sessionStateManager, IQos2StatePersistenceProvider qos2StateProvider) :
this(settingsProvider, sessionStateManager, qos2StateProvider, new ConfigurableMessageAddressConverter())
{
}

public Bootstrapper(ISettingsProvider settingsProvider, ISessionStatePersistenceProvider sessionStateManager, IQos2StatePersistenceProvider qos2StateProvider, List<string> inboundTemplates, List<string> outboundTemplates) :
this(settingsProvider, sessionStateManager, qos2StateProvider, new ConfigurableMessageAddressConverter(inboundTemplates ?? new List<string>(), outboundTemplates ?? new List<string>()))
{
}

Bootstrapper(ISettingsProvider settingsProvider, ISessionStatePersistenceProvider sessionStateManager, IQos2StatePersistenceProvider qos2StateProvider, IMessageAddressConverter addressConverter)
public Bootstrapper(ISettingsProvider settingsProvider, ISessionStatePersistenceProvider sessionStateManager, IQos2StatePersistenceProvider qos2StateProvider)
{
Contract.Requires(settingsProvider != null);
Contract.Requires(sessionStateManager != null);
Expand All @@ -71,7 +60,6 @@ public class Bootstrapper
this.sessionStateManager = sessionStateManager;
this.qos2StateProvider = qos2StateProvider;
this.authProvider = new SasTokenDeviceIdentityProvider();
this.topicNameConverter = addressConverter;
}

public Task CloseCompletion => this.closeCompletionSource.Task;
Expand Down Expand Up @@ -137,14 +125,22 @@ async void CloseAsync()

ServerBootstrap SetupBootstrap()
{
// pull/customize configuration
int maxInboundMessageSize = this.settingsProvider.GetIntegerSetting("MaxInboundMessageSize", 256 * 1024);
int connectionPoolSize = this.settingsProvider.GetIntegerSetting("IotHubClient.ConnectionPoolSize", DefaultConnectionPoolSize);
TimeSpan connectionIdleTimeout = this.settingsProvider.GetTimeSpanSetting("IotHubClient.ConnectionIdleTimeout", DefaultConnectionIdleTimeout);
string connectionString = this.iotHubClientSettings.IotHubConnectionString;

Func<IDeviceIdentity, Task<IMessagingServiceClient>> deviceClientFactory = IotHubClient.PreparePoolFactory(connectionString, connectionPoolSize,
connectionIdleTimeout, this.iotHubClientSettings, PooledByteBufferAllocator.Default, this.topicNameConverter);
MessagingBridgeFactoryFunc bridgeFactory = async deviceIdentity => new SingleClientMessagingBridge(deviceIdentity, await deviceClientFactory(deviceIdentity));
// setup message processing logic
var telemetryProcessing = TopicHandling.CompileParserFromUriTemplates(new[] { "devices/{deviceId}/messages/events" });
var commandProcessing = TopicHandling.CompileFormatterFromUriTemplate("devices/{deviceId}/messages/devicebound");
MessagingBridgeFactoryFunc bridgeFactory = IotHubBridge.PrepareFactory(connectionString, connectionPoolSize,
connectionIdleTimeout, this.iotHubClientSettings, bridge =>
{
bridge.RegisterRoute(topic => true, new TelemetrySender(bridge, telemetryProcessing)); // handle all incoming messages with TelemetrySender
bridge.RegisterSource(new CommandReceiver(bridge, PooledByteBufferAllocator.Default, commandProcessing)); // handle device command queue
bridge.RegisterSource(new MethodHandler("SendMessageToDevice", bridge, (request, dispatcher) => DispatchCommands(bridge.DeviceId, request, dispatcher))); // register
});

var acceptLimiter = new AcceptLimiter(MaxConcurrentAccepts);

Expand All @@ -171,5 +167,38 @@ ServerBootstrap SetupBootstrap()
bridgeFactory));
}));
}

static async Task<MethodResponse> DispatchCommands(string deviceId, MethodRequest request, IMessageDispatcher dispatcher)
{
try
{
// deserialize request payload and further process it before sending or
// just pass it through to message.Payload using Unpooled.WrappedBuffer(request.Data)
var message = new Message
{
Id = Guid.NewGuid().ToString(),
CreatedTimeUtc = DateTime.UtcNow,
Address = "devices/" + deviceId + "/messages/commands",
Properties = { { "extra", "property" } },
Payload = Unpooled.WrappedBuffer(request.Data)
};

var outcome = await dispatcher.SendAsync(message);
switch (outcome)
{
case SendMessageOutcome.Completed:
return new MethodResponse(200);
case SendMessageOutcome.Rejected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Reject / Abandoned make sense in methods context ?

return new MethodResponse(Encoding.UTF8.GetBytes("{\"message\":\"Could not dispatch the call. Device is not subscribed.\"}"), 404);
default:
return new MethodResponse(Encoding.UTF8.GetBytes("{\"message\":\"Unexpected outcome.\"}"), 500);
}
}
catch (Exception ex)
{
CommonEventSource.Log.Error("Received malformed method request: " + request.DataAsJson, ex, null, deviceId);
return new MethodResponse(Encoding.UTF8.GetBytes($"{{\"message\":\"error sending message: {ex.ToString()}\"}}"), 500);
}
}
}
}
Expand Up @@ -49,7 +49,11 @@ static IByteBuffer ApplyCompression(IByteBuffer buffer, CompressionMode compress

Contract.Assert(outputStream.Length <= int.MaxValue);

#if NETSTANDARD1_3
return Unpooled.WrappedBuffer(outputStream.ToArray());
#else
return Unpooled.WrappedBuffer(outputStream.GetBuffer(), 0, (int)outputStream.Length);
#endif
}
}
finally
Expand Down
4 changes: 0 additions & 4 deletions host/ProtocolGateway.Host.Console/App.config
@@ -1,10 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="mqttTopicNameConversion" type="Microsoft.Azure.Devices.ProtocolGateway.IotHubClient.Addressing.MessageAddressConversionConfigurationHandler, Microsoft.Azure.Devices.ProtocolGateway.IotHubClient" />
</configSections>
<appSettings configSource="appSettings.config.user" />
<mqttTopicNameConversion configSource="mqttTopicConversion.config.user" />
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.1" />
</startup>
Expand Down
Expand Up @@ -147,10 +147,6 @@
<SubType>Designer</SubType>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="mqttTopicConversion.config.user">
<SubType>Designer</SubType>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="packages.config">
<SubType>Designer</SubType>
</None>
Expand Down

This file was deleted.