diff --git a/smoke/LeafDevice/LeafDevice.cs b/smoke/LeafDevice/LeafDevice.cs index 2b7217a5cb1..9bc29878809 100644 --- a/smoke/LeafDevice/LeafDevice.cs +++ b/smoke/LeafDevice/LeafDevice.cs @@ -13,8 +13,9 @@ public class LeafDevice : Details.Details string eventhubCompatibleEndpointWithEntityPath, string deviceId, string certificateFileName, - string edgeHostName) : - base(iothubConnectionString, eventhubCompatibleEndpointWithEntityPath, deviceId, certificateFileName, edgeHostName) + string edgeHostName, + bool useWebSockets) : + base(iothubConnectionString, eventhubCompatibleEndpointWithEntityPath, deviceId, certificateFileName, edgeHostName, useWebSockets) { } diff --git a/smoke/LeafDevice/LeafDevice.csproj b/smoke/LeafDevice/LeafDevice.csproj index a806ffc929a..78ff8054839 100644 --- a/smoke/LeafDevice/LeafDevice.csproj +++ b/smoke/LeafDevice/LeafDevice.csproj @@ -24,7 +24,7 @@ - + diff --git a/smoke/LeafDevice/Program.cs b/smoke/LeafDevice/Program.cs index 1b895a1dd4d..1c1fab79da7 100644 --- a/smoke/LeafDevice/Program.cs +++ b/smoke/LeafDevice/Program.cs @@ -19,7 +19,7 @@ namespace LeafDevice Option Environment variable --connection-string iothubConnectionString --eventhub-endpoint eventhubCompatibleEndpointWithEntityPath - + Defaults: All options to this command have defaults. If an option is not specified and its corresponding environment variable is not defined, then the default will @@ -54,6 +54,9 @@ class Program [Option("-ed|--edge-hostname", Description = "Leaf device identifier to be registered with IoT Hub")] public string EdgeHostName { get; } = ""; + [Option("--use-web-sockets", CommandOptionType.NoValue, Description = "Use websockets for IoT Hub connections.")] + public bool UseWebSockets { get; } = false; + // ReSharper disable once UnusedMember.Local async Task OnExecuteAsync() { @@ -70,7 +73,8 @@ async Task OnExecuteAsync() endpoint, this.DeviceId, this.CertificateFileName, - this.EdgeHostName); + this.EdgeHostName, + this.UseWebSockets); await test.RunAsync(); } catch (Exception ex) diff --git a/smoke/LeafDevice/details/Details.cs b/smoke/LeafDevice/details/Details.cs index ec032276f1c..85e75c77fa4 100644 --- a/smoke/LeafDevice/details/Details.cs +++ b/smoke/LeafDevice/details/Details.cs @@ -14,6 +14,9 @@ namespace LeafDevice.Details using Microsoft.Azure.EventHubs; using System.Net; using Microsoft.Azure.Devices.Edge.Util; + using DeviceClientTransportType = Microsoft.Azure.Devices.Client.TransportType; + using EventHubClientTransportType = Microsoft.Azure.EventHubs.TransportType; + using ServiceClientTransportType = Microsoft.Azure.Devices.TransportType; public class Details { @@ -22,6 +25,9 @@ public class Details readonly string deviceId; readonly string certificateFileName; readonly string edgeHostName; + readonly ServiceClientTransportType serviceClientTransportType; + readonly EventHubClientTransportType eventHubClientTransportType; + readonly DeviceClientTransportType deviceClientTransportType; DeviceContext context; @@ -30,7 +36,8 @@ public class Details string eventhubCompatibleEndpointWithEntityPath, string deviceId, string certificateFileName, - string edgeHostName + string edgeHostName, + bool useWebSockets ) { this.iothubConnectionString = iothubConnectionString; @@ -38,6 +45,19 @@ string edgeHostName this.deviceId = deviceId; this.certificateFileName = certificateFileName; this.edgeHostName = edgeHostName; + + if (useWebSockets) + { + this.serviceClientTransportType = ServiceClientTransportType.Amqp_WebSocket_Only; + this.eventHubClientTransportType = EventHubClientTransportType.AmqpWebSockets; + this.deviceClientTransportType = DeviceClientTransportType.Mqtt_WebSocket_Only; + } + else + { + this.serviceClientTransportType = ServiceClientTransportType.Amqp; + this.eventHubClientTransportType = EventHubClientTransportType.Amqp; + this.deviceClientTransportType = DeviceClientTransportType.Mqtt; + } } protected Task InstallCaCertificate() @@ -54,7 +74,7 @@ protected async Task ConnectToEdgeAndSendData() Microsoft.Azure.Devices.IotHubConnectionStringBuilder builder = Microsoft.Azure.Devices.IotHubConnectionStringBuilder.Create(this.iothubConnectionString); string leafDeviceConnectionString = $"HostName={builder.HostName};DeviceId={this.deviceId};SharedAccessKey={this.context.Device.Authentication.SymmetricKey.PrimaryKey};GatewayHostName={this.edgeHostName}"; - this.context.DeviceClientInstance = Option.Some(DeviceClient.CreateFromConnectionString(leafDeviceConnectionString, Microsoft.Azure.Devices.Client.TransportType.Mqtt)); + this.context.DeviceClientInstance = Option.Some(DeviceClient.CreateFromConnectionString(leafDeviceConnectionString, this.deviceClientTransportType)); Console.WriteLine("Leaf Device client created."); var message = new Microsoft.Azure.Devices.Client.Message(Encoding.ASCII.GetBytes($"Message from Leaf Device. MsgGUID: {this.context.MessageGuid}")); @@ -122,6 +142,7 @@ async Task CreateDeviceIdentity(RegistryManager rm) protected async Task VerifyDataOnIoTHub() { var builder = new EventHubsConnectionStringBuilder(this.eventhubCompatibleEndpointWithEntityPath); + builder.TransportType = this.eventHubClientTransportType; Console.WriteLine($"Receiving events from device '{this.context.Device.Id}' on Event Hub '{builder.EntityPath}'"); @@ -133,7 +154,7 @@ protected async Task VerifyDataOnIoTHub() EventHubPartitionKeyResolver.ResolveToPartition( this.context.Device.Id, (await eventHubClient.GetRuntimeInformationAsync()).PartitionCount), - DateTime.Now.AddMinutes(-5)); + EventPosition.FromEnqueuedTime(DateTime.Now.AddMinutes(-5))); var result = new TaskCompletionSource(); using (var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3))) @@ -144,7 +165,7 @@ protected async Task VerifyDataOnIoTHub() new PartitionReceiveHandler( eventData => { - eventData.Properties.TryGetValue("iothub-connection-device-id", out object devId); + eventData.SystemProperties.TryGetValue("iothub-connection-device-id", out object devId); if (devId != null && devId.ToString().Equals(this.context.Device.Id) && Encoding.UTF8.GetString(eventData.Body).Contains(this.context.MessageGuid)) @@ -174,7 +195,7 @@ protected async Task VerifyDirectMethod() { //User Service SDK to invoke Direct Method on the device. ServiceClient serviceClient = - ServiceClient.CreateFromConnectionString(this.context.IotHubConnectionString); + ServiceClient.CreateFromConnectionString(this.context.IotHubConnectionString, this.serviceClientTransportType); //Call a direct method using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(300))) diff --git a/smoke/LeafDevice/details/PartitionReceiveHandler.cs b/smoke/LeafDevice/details/PartitionReceiveHandler.cs index 73e6276fe7d..bed7197e0bc 100644 --- a/smoke/LeafDevice/details/PartitionReceiveHandler.cs +++ b/smoke/LeafDevice/details/PartitionReceiveHandler.cs @@ -26,6 +26,6 @@ public Task ProcessEventsAsync(IEnumerable events) return Task.CompletedTask; } public Task ProcessErrorAsync(Exception error) => throw error; - public int MaxBatchSize { get; } = 10; + public int MaxBatchSize { get; set; } } }