Skip to content

Commit

Permalink
Add Test cases for EndpointUtilty, LifetimeManager and ServiceConnect…
Browse files Browse the repository at this point in the history
…ion (#133)

* Add EndpointUtilty Test

* Add tests for lifetimeManager

* Add LifetimeManager IT

* Format the codes

* Format codes

* Remove mock and use TestClass instead.

* Add a few reconnection test

* Add message fieds verification

* Add reconnect test

* Update reconnection test

* Update reconnection when connection throws exception

* Update reconnection logic

* Format codes

* Add keepalive reconnect

* Fix typo

* fix merge conflict

* clean up test infrastructure

* PR comments

* fix typo
  • Loading branch information
zackliu committed Jul 9, 2018
1 parent 9ab17f4 commit 16fc928
Show file tree
Hide file tree
Showing 11 changed files with 742 additions and 88 deletions.
6 changes: 4 additions & 2 deletions src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ namespace Microsoft.Azure.SignalR
{
internal class ServiceLifetimeManager<THub> : HubLifetimeManager<THub> where THub : Hub
{
private const string MarkerNotConfiguredError =
"'AddAzureSignalR(...)' was called without a matching call to 'IApplicationBuilder.UseAzureSignalR(...)'.";

private readonly ILogger<ServiceLifetimeManager<THub>> _logger;
private readonly IReadOnlyList<IHubProtocol> _allProtocols;

Expand All @@ -28,8 +31,7 @@ internal class ServiceLifetimeManager<THub> : HubLifetimeManager<THub> where THu
{
if (!marker.IsConfigured)
{
throw new InvalidOperationException("'UseAzureSignalR(...)' not called after calling 'AddAzureSignalR(...)'." +
" Please always use 'AddAzureSignalR(...)' and 'UseAzureSignalR(...)' altogether.");
throw new InvalidOperationException(MarkerNotConfiguredError);
}

_serviceConnectionManager = serviceConnectionManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder.Internal;
Expand Down Expand Up @@ -96,10 +95,6 @@ public void UseAzureSignalRWithAddSignalR()
}
}

class TestHub : Hub
{
}

public class EmptyApplicationLifetime : IApplicationLifetime
{
public CancellationToken ApplicationStarted => CancellationToken.None;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Tests
{
public static class HandshakeUtils
{
private static readonly TimeSpan DefaultHandshakeTimeout = TimeSpan.FromSeconds(5);
private static readonly IServiceProtocol ServiceProtocol = new ServiceProtocol();

public static async Task ReceiveHandshakeRequestAsync(PipeReader input)
{
using (var cts = new CancellationTokenSource(DefaultHandshakeTimeout))
{
while (true)
{
var result = await input.ReadAsync(cts.Token);

var buffer = result.Buffer;
var consumed = buffer.Start;
var examined = buffer.End;

try
{
if (!buffer.IsEmpty)
{
if (ServiceProtocol.TryParseMessage(ref buffer, out var message))
{
consumed = buffer.Start;
examined = consumed;

if (!(message is HandshakeRequestMessage handshakeRequest))
{
throw new InvalidDataException(
$"{message.GetType().Name} received when waiting for handshake request.");
}

if (handshakeRequest.Version != ServiceProtocol.Version)
{
throw new InvalidDataException("Protocol version not supported.");
}

break;
}
}

if (result.IsCompleted)
{
// Not enough data, and we won't be getting any more data.
throw new InvalidOperationException(
"Service connectioned disconnected before sending a handshake request");
}
}
finally
{
input.AdvanceTo(consumed, examined);
}
}
}
}

public static Task SendHandshakeResponseAsync(PipeWriter output, HandshakeResponseMessage response = null)
{
ServiceProtocol.WriteMessage(response ?? new HandshakeResponseMessage(), output);
return output.FlushAsync().AsTask();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Azure.SignalR.Protocol;
Expand All @@ -15,32 +13,32 @@ namespace Microsoft.Azure.SignalR.Tests
{
internal class ServiceConnectionProxy : IClientConnectionManager, IClientConnectionFactory
{
private static readonly TimeSpan DefaultHandshakeTimeout = TimeSpan.FromSeconds(5);
private static readonly IServiceProtocol _serviceProtocol = new ServiceProtocol();
private static readonly IServiceProtocol ServiceProtocol = new ServiceProtocol();
private readonly PipeOptions _clientPipeOptions;

private IConnectionFactory ConnectionFactory { get; }
public TestConnectionFactory ConnectionFactory { get; }

public IClientConnectionManager ClientConnectionManager { get; }

public TestConnection ConnectionContext { get; }
public TestConnection ConnectionContext => ConnectionFactory.CurrentConnectionContext;

private ServiceConnection ServiceConnection { get; }
public ServiceConnection ServiceConnection { get; }

public ConcurrentDictionary<string, ServiceConnectionContext> ClientConnections => ClientConnectionManager.ClientConnections;

private readonly ConcurrentDictionary<string, TaskCompletionSource<ConnectionContext>> _waitForConnectionOpen = new ConcurrentDictionary<string, TaskCompletionSource<ConnectionContext>>();
private readonly ConcurrentDictionary<string, TaskCompletionSource<object>> _waitForConnectionClose = new ConcurrentDictionary<string, TaskCompletionSource<object>>();
private readonly ConcurrentDictionary<Type, TaskCompletionSource<ServiceMessage>> _waitForApplicationMessage = new ConcurrentDictionary<Type, TaskCompletionSource<ServiceMessage>>();

public ServiceConnectionProxy(ConnectionDelegate callback = null, PipeOptions clientPipeOptions = null)
public ServiceConnectionProxy(ConnectionDelegate callback = null, PipeOptions clientPipeOptions = null,
TestConnectionFactory connectionFactory = null)
{
ConnectionContext = new TestConnection();
ConnectionFactory = new TestConnectionFactory(ConnectionContext);
ConnectionFactory = connectionFactory ?? new TestConnectionFactory();
ClientConnectionManager = new ClientConnectionManager();
_clientPipeOptions = clientPipeOptions;

ServiceConnection = new ServiceConnection(
_serviceProtocol,
ServiceProtocol,
this,
ConnectionFactory,
NullLoggerFactory.Instance,
Expand All @@ -51,8 +49,54 @@ public ServiceConnectionProxy(ConnectionDelegate callback = null, PipeOptions cl

public Task StartAsync()
{
_ = ServiceConnection.StartAsync();
return HandshakeAsync();
return ServiceConnection.StartAsync();
}

public async Task ProcessApplicationMessagesAsync()
{
try
{
while (true)
{
var result = await ConnectionContext.Application.Input.ReadAsync();
var buffer = result.Buffer;

var consumed = buffer.Start;
var examined = buffer.End;

try
{
if (result.IsCanceled)
{
break;
}

if (!buffer.IsEmpty)
{
if (ServiceProtocol.TryParseMessage(ref buffer, out var message))
{
consumed = buffer.Start;
examined = consumed;

AddApplicationMessage(message.GetType(), message);
}
}

if (result.IsCompleted)
{
break;
}
}
finally
{
ConnectionContext.Application.Input.AdvanceTo(consumed, examined);
}
}
}
catch
{
// Ignored.
}
}

public void Stop()
Expand All @@ -62,7 +106,7 @@ public void Stop()

public async Task WriteMessageAsync(ServiceMessage message)
{
_serviceProtocol.WriteMessage(message, ConnectionContext.Application.Output);
ServiceProtocol.WriteMessage(message, ConnectionContext.Application.Output);
await ConnectionContext.Application.Output.FlushAsync();
}

Expand All @@ -76,6 +120,16 @@ public Task WaitForConnectionCloseAsync(string connectionId)
return _waitForConnectionClose.GetOrAdd(connectionId, key => new TaskCompletionSource<object>()).Task;
}

public Task<ServiceMessage> WaitForApplicationMessageAsync(Type type)
{
return _waitForApplicationMessage.GetOrAdd(type, key => new TaskCompletionSource<ServiceMessage>()).Task;
}

public Task<ConnectionContext> WaitForServerConnectionAsync(int count)
{
return ConnectionFactory.WaitForConnectionAsync(count);
}

private Task OnConnectionAsync(ConnectionContext connection)
{
var tcs = new TaskCompletionSource<object>();
Expand Down Expand Up @@ -110,67 +164,17 @@ public void RemoveClientConnection(string connectionId)
}
}

private async Task HandshakeAsync()
public ServiceConnectionContext CreateConnection(OpenConnectionMessage message)
{
using (var handshakeCts = new CancellationTokenSource(DefaultHandshakeTimeout))
{
await ReceiveHandshakeRequestAsync(ConnectionContext.Application.Input, handshakeCts.Token);
}

await WriteMessageAsync(new HandshakeResponseMessage());
return new ServiceConnectionContext(message, _clientPipeOptions, _clientPipeOptions);
}

private async Task ReceiveHandshakeRequestAsync(PipeReader input, CancellationToken cancellationToken)
private void AddApplicationMessage(Type type, ServiceMessage message)
{
while (true)
if (_waitForApplicationMessage.TryGetValue(type, out var tcs))
{
var result = await input.ReadAsync(cancellationToken);

var buffer = result.Buffer;
var consumed = buffer.Start;
var examined = buffer.End;

try
{
if (!buffer.IsEmpty)
{
if (_serviceProtocol.TryParseMessage(ref buffer, out var message))
{
consumed = buffer.Start;
examined = consumed;

if (!(message is HandshakeRequestMessage handshakeRequest))
{
throw new InvalidDataException(
$"{message.GetType().Name} received when waiting for handshake request.");
}

if (handshakeRequest.Version != _serviceProtocol.Version)
{
throw new InvalidDataException("Protocol version not supported.");
}

break;
}
}

if (result.IsCompleted)
{
// Not enough data, and we won't be getting any more data.
throw new InvalidOperationException(
"Service connectioned disconnected before sending a handshake request");
}
}
finally
{
input.AdvanceTo(consumed, examined);
}
tcs.TrySetResult(message);
}
}

public ServiceConnectionContext CreateConnection(OpenConnectionMessage message)
{
return new ServiceConnectionContext(message, _clientPipeOptions, _clientPipeOptions);
}
}
}

0 comments on commit 16fc928

Please sign in to comment.