Skip to content

Commit

Permalink
Merge pull request Azure#63 from Azure/ravokkar/tpmamqpws-tracing
Browse files Browse the repository at this point in the history
Add client side tracing to the TPM over AMQP/WS scenario
  • Loading branch information
rajeevmv authored and Cristian Pop committed Oct 8, 2018
1 parent cd6a99b commit 059dfec
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion provisioning/transport/amqp/src/AmqpClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ internal AmqpClientConnection(Uri uri, AmqpSettings amqpSettings)

public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate2 clientCert, IWebProxy proxy)
{
if (Logging.IsEnabled) Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}");
var hostName = _uri.Host;

var tcpSettings = new TcpTransportSettings { Host = hostName, Port = _uri.Port != -1 ? _uri.Port : AmqpConstants.DefaultSecurePort };
Expand All @@ -67,10 +68,11 @@ public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate
SaslTransportProvider provider = _amqpSettings.GetTransportProvider<SaslTransportProvider>();
if (provider != null)
{
if (Logging.IsEnabled) Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}: Using SaslTransport");
_sentHeader = new ProtocolHeader(provider.ProtocolId, provider.DefaultVersion);
ByteBuffer buffer = new ByteBuffer(new byte[AmqpConstants.ProtocolHeaderSize]);
_sentHeader.Encode(buffer);

var args = new TransportAsyncCallbackArgs();
args.SetBuffer(buffer.Buffer, buffer.Offset, buffer.Length);
args.CompletedCallback = OnWriteHeaderComplete;
Expand Down Expand Up @@ -190,6 +192,8 @@ async Task<ClientWebSocket> CreateClientWebSocketAsync(Uri websocketUri, TimeSpa

private void OnWriteHeaderComplete(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled) Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(OnWriteHeaderComplete)}");

if (args.Exception != null)
{
CompleteOnException(args);
Expand All @@ -204,6 +208,8 @@ private void OnWriteHeaderComplete(TransportAsyncCallbackArgs args)

private void OnReadHeaderComplete(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled) Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}");

if (args.Exception != null)
{
CompleteOnException(args);
Expand All @@ -214,13 +220,17 @@ private void OnReadHeaderComplete(TransportAsyncCallbackArgs args)
{
ProtocolHeader receivedHeader = new ProtocolHeader();
receivedHeader.Decode(new ByteBuffer(args.Buffer, args.Offset, args.Count));

if (Logging.IsEnabled) Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}: Received Protocol Header: {receivedHeader.ToString()}");

if (!receivedHeader.Equals(_sentHeader))
{
throw new AmqpException(AmqpErrorCode.NotImplemented, $"The requested protocol version {_sentHeader} is not supported. The supported version is {receivedHeader}");
}

SaslTransportProvider provider = _amqpSettings.GetTransportProvider<SaslTransportProvider>();
var transport = provider.CreateTransport(args.Transport, true);
if (Logging.IsEnabled) Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}: Created SaslTransportHandler ");
_tcs.TrySetResult(transport);
}
catch (Exception ex)
Expand All @@ -232,8 +242,12 @@ private void OnReadHeaderComplete(TransportAsyncCallbackArgs args)

private void CompleteOnException(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled) Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(CompleteOnException)}");

if (args.Exception != null && args.Transport != null)
{
if (Logging.IsEnabled) Logging.Error(this, $"{nameof(AmqpClientConnection)}.{nameof(CompleteOnException)}: Exception thrown {args.Exception.Message}");

args.Transport.SafeClose(args.Exception);
args.Transport = null;
_tcs.TrySetException(args.Exception);
Expand Down

0 comments on commit 059dfec

Please sign in to comment.