Skip to content

Commit

Permalink
Add support for AMQP over WebSockets (#304)
Browse files Browse the repository at this point in the history
* Add support for AMQP over WebSockets
  • Loading branch information
ancaantochi committed Sep 28, 2018
1 parent 07fa957 commit 87372c8
Show file tree
Hide file tree
Showing 22 changed files with 848 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ public static class AmqpEventIds
public const int SendingLinkHandler = EventIdStart + 600;
public const int DeviceBoundLinkHandler = EventIdStart + 650;
public const int LinkHandler = EventIdStart + 700;
public const int AmqpWebSocketListener = EventIdStart + 800;
public const int ServerWebSocketTransport = EventIdStart + 900;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class AmqpProtocolHead : IProtocolHead
readonly ITransportSettings transportSettings;
readonly AmqpSettings amqpSettings;
readonly ITransportListenerProvider transportListenerProvider;
readonly IWebSocketListenerRegistry webSocketListenerRegistry;
readonly ConcurrentDictionary<uint, AmqpConnection> incomingConnectionMap;
readonly AsyncLock syncLock;

Expand All @@ -34,12 +35,14 @@ public class AmqpProtocolHead : IProtocolHead
public AmqpProtocolHead(
ITransportSettings transportSettings,
AmqpSettings amqpSettings,
ITransportListenerProvider transportListenerProvider)
ITransportListenerProvider transportListenerProvider,
IWebSocketListenerRegistry webSocketListenerRegistry)
{
this.syncLock = new AsyncLock();
this.transportSettings = Preconditions.CheckNotNull(transportSettings, nameof(transportSettings));
this.amqpSettings = Preconditions.CheckNotNull(amqpSettings, nameof(amqpSettings));
this.transportListenerProvider = Preconditions.CheckNotNull(transportListenerProvider);
this.webSocketListenerRegistry = Preconditions.CheckNotNull(webSocketListenerRegistry);

this.connectionSettings = new AmqpConnectionSettings
{
Expand All @@ -62,8 +65,9 @@ public async Task StartAsync()
{
Events.Starting();

// This transport settings object sets up a listener for TLS over TCP right now.
TransportListener[] listeners = { this.transportSettings.Settings.CreateListener() };
var amqpWebSocketListener = new AmqpWebSocketListener();
// This transport settings object sets up a listener for TLS over TCP and a listener for WebSockets.
TransportListener[] listeners = { this.transportSettings.Settings.CreateListener(), amqpWebSocketListener };

using (await this.syncLock.LockAsync())
{
Expand All @@ -72,6 +76,15 @@ public async Task StartAsync()
this.amqpTransportListener.Listen(this.OnAcceptTransport);
}

if (this.webSocketListenerRegistry.TryRegister(amqpWebSocketListener))
{
Events.RegisteredWebSocketListener();
}
else
{
Events.RegisterWebSocketListenerFailed();
}

// Preallocate buffers for AMQP transport
ByteBuffer.InitBufferManagers();

Expand Down Expand Up @@ -222,6 +235,8 @@ enum EventIds
ConnectionContextAddFailed = IdStart + 3,
Starting = IdStart + 4,
Started = IdStart + 5,
WebSocketsRegistered,
WebSocketsRegisterFail
}

internal static void AcceptTransportInputError(Exception ex) => Log.LogError((int)EventIds.AcceptTransportInputError, ex, $"Received a new transport connection with an error.");
Expand All @@ -237,6 +252,10 @@ enum EventIds
internal static void Starting() => Log.LogInformation((int)EventIds.Starting, $"Starting AMQP head");

internal static void Started() => Log.LogInformation((int)EventIds.Started, $"Started AMQP head");

internal static void RegisteredWebSocketListener() => Log.LogDebug((int)EventIds.WebSocketsRegistered, "WebSockets listener registered.");

internal static void RegisterWebSocketListenerFailed() => Log.LogDebug((int)EventIds.WebSocketsRegisterFail, "WebSockets listener failed to register.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Hub.Amqp
{
using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.Devices.Edge.Hub.Core;

class AmqpWebSocketListener : TransportListener, IWebSocketListener
{
public string SubProtocol => Constants.WebSocketSubProtocol;

public AmqpWebSocketListener()
: base(Constants.WebSocketListenerName)
{
}

public async Task ProcessWebSocketRequestAsync(WebSocket webSocket, Option<EndPoint> localEndPoint, EndPoint remoteEndPoint, string correlationId)
{
try
{
var taskCompletion = new TaskCompletionSource<bool>();

string localEndpointValue = localEndPoint.Expect(() => new ArgumentNullException(nameof(localEndPoint))).ToString();
var transport = new ServerWebSocketTransport(webSocket, localEndpointValue, remoteEndPoint.ToString(), correlationId);
transport.Open();

var args = new TransportAsyncCallbackArgs { Transport = transport, CompletedSynchronously = false };
this.OnTransportAccepted(args);

Events.EstablishedConnection(correlationId);

transport.Closed += (sender, eventArgs) =>
{
taskCompletion.SetResult(true);
};

//wait until websocket is closed
await taskCompletion.Task;
}
catch (Exception ex) when(!ex.IsFatal())
{
Events.FailedAcceptWebSocket(correlationId, ex);
throw;
}
}

protected override void OnListen()
{

}

static class Events
{
static readonly ILogger Log = Logger.Factory.CreateLogger<AmqpWebSocketListener>();
const int IdStart = AmqpEventIds.AmqpWebSocketListener;

enum EventIds
{
Established = IdStart,
Exception
}

public static void EstablishedConnection(string correlationId)
{
Log.LogInformation((int)EventIds.Established, $"Connection established CorrelationId {correlationId}");
}

public static void FailedAcceptWebSocket(string correlationId, Exception ex)
{
Log.LogWarning((int)EventIds.Exception, ex, $"Connection failed CorrelationId {correlationId}");
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ public static class Constants
public const string MessageAnnotationsInputNameKey = "x-opt-input-name";
public const string MessageAnnotationsConnectionDeviceId = "iothub-connection-device-id";
public const string MessageAnnotationsConnectionModuleId = "iothub-connection-module-id";
public const string WebSocketSubProtocol = "AMQPWSB10";
public const string WebSocketListenerName = WebSocketSubProtocol +"-listener";
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
Expand Down
Loading

0 comments on commit 87372c8

Please sign in to comment.