Skip to content

Commit

Permalink
Transport now has connected and started events. (#479)
Browse files Browse the repository at this point in the history
These changes are designed so that transports can be stacked on top of each other
and each middleware transport can implement their own handshake.

BREAKING CHANGE: Add Connected event to Transport API
BREAKING CHANGE: Add Started event to Transport API
BREAKING CHANGE: ListenAsync returns a task that completes when the transport stops
BREAKING CHANGE: Remove AcceptAsync from transports
  • Loading branch information
paulpach committed Nov 6, 2020
1 parent 8365894 commit 3e7f688
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 241 deletions.
52 changes: 26 additions & 26 deletions Assets/Mirror/Runtime/NetworkServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,14 @@ public async UniTask ListenAsync()
{
Initialize();

// only start server if we want to listen
if (Listening)
{
await transport.ListenAsync();
logger.Log("Server started listening");
}

Active = true;

// (useful for loading & spawning stuff from database etc.)
Started.Invoke();

AcceptAsync().Forget();
}

// accept connections from clients
async UniTaskVoid AcceptAsync()
{
try
{
IConnection connection;

while ((connection = await transport.AcceptAsync()) != null)
// only start server if we want to listen
if (Listening)
{
INetworkConnection networkConnectionToClient = GetNewConnection(connection);

ConnectionAcceptedAsync(networkConnectionToClient).Forget();
transport.Started.AddListener(TransportStarted);
transport.Connected.AddListener(TransportConnected);
await transport.ListenAsync();
}
}
catch (Exception ex)
Expand All @@ -212,21 +193,39 @@ async UniTaskVoid AcceptAsync()
}
finally
{
transport.Connected.RemoveListener(TransportConnected);
transport.Started.RemoveListener(TransportStarted);
Cleanup();
}
}

private void TransportStarted()
{
logger.Log("Server started listening");
Active = true;
// (useful for loading & spawning stuff from database etc.)
Started.Invoke();
}

private void TransportConnected(IConnection connection)
{
INetworkConnection networkConnectionToClient = GetNewConnection(connection);
ConnectionAcceptedAsync(networkConnectionToClient).Forget();
}

/// <summary>
/// This starts a network "host" - a server and client in the same application.
/// <para>The client returned from StartHost() is a special "local" client that communicates to the in-process server using a message queue instead of the real network. But in almost all other cases, it can be treated as a normal client.</para>
/// </summary>
public async UniTask StartHost(NetworkClient client)
public UniTask StartHost(NetworkClient client)
{
if (!client)
throw new InvalidOperationException("NetworkClient not assigned. Unable to StartHost()");

// start listening to network connections
await ListenAsync();
UniTask task = ListenAsync();

Active = true;

client.ConnectHost(this);

Expand All @@ -236,6 +235,7 @@ public async UniTask StartHost(NetworkClient client)
OnStartHost.Invoke();

logger.Log("NetworkServer StartHost");
return task;
}

/// <summary>
Expand Down
51 changes: 21 additions & 30 deletions Assets/Mirror/Runtime/Transport/Kcp/KcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ public class KcpTransport : Transport

public KcpDelayMode delayMode = KcpDelayMode.Normal;
internal readonly Dictionary<IPEndPoint, KcpServerConnection> connectedClients = new Dictionary<IPEndPoint, KcpServerConnection>(new IPEndpointComparer());
Channel<KcpServerConnection> acceptedConnections;

public override IEnumerable<string> Scheme => new[] { "kcp" };

readonly byte[] buffer = new byte[1500];

public long ReceivedMessageCount { get; private set; }

private AutoResetUniTaskCompletionSource ListenCompletionSource;

/// <summary>
/// Open up the port and listen for connections
/// Use in servers.
Expand All @@ -36,13 +37,24 @@ public class KcpTransport : Transport
/// <returns></returns>
public override UniTask ListenAsync()
{
socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp)
try
{
DualMode = true
};
socket.Bind(new IPEndPoint(IPAddress.IPv6Any, Port));
acceptedConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<KcpServerConnection>();
return UniTask.CompletedTask;
socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp)
{
DualMode = true
};
socket.Bind(new IPEndPoint(IPAddress.IPv6Any, Port));

// transport started
Started.Invoke();

ListenCompletionSource = AutoResetUniTaskCompletionSource.Create();
return ListenCompletionSource.Task;
}
catch (Exception ex)
{
return UniTask.FromException(ex);
}
}

EndPoint newClientEP = new IPEndPoint(IPAddress.IPv6Any, 0);
Expand Down Expand Up @@ -98,7 +110,7 @@ private async UniTaskVoid ServerHandshake(EndPoint endpoint, byte[] data, int ms
await connection.HandshakeAsync();

// once handshake is completed, then the connection has been accepted
acceptedConnections.Writer.TryWrite(connection);
Connected.Invoke(connection);
}

private readonly HashSet<HashCash> used = new HashSet<HashCash>();
Expand Down Expand Up @@ -153,28 +165,7 @@ public override void Disconnect()
{
socket?.Close();
socket = null;
acceptedConnections.Writer.TryComplete();
}

/// <summary>
/// Accepts a connection from a client.
/// After ListenAsync completes, clients will queue up until you call AcceptAsync
/// then you get the connection to the client
/// </summary>
/// <returns>The connection to a client</returns>
public override async UniTask<IConnection> AcceptAsync()
{
if (socket == null)
return null;

try
{
return await acceptedConnections.Reader.ReadAsync();
}
catch (ChannelClosedException)
{
return null;
}
ListenCompletionSource?.TrySetResult();
}

/// <summary>
Expand Down
83 changes: 11 additions & 72 deletions Assets/Mirror/Runtime/Transport/MultiplexTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using Cysharp.Threading.Tasks;
using UnityEngine;

namespace Mirror
{
Expand All @@ -11,11 +10,6 @@ public class MultiplexTransport : Transport

public Transport[] transports;

AutoResetUniTaskCompletionSource completionSource;

Queue<IConnection> acceptedConnections;
Queue<Transport> acceptedTransport;

public override IEnumerable<string> Scheme =>
transports
.Where(transport => transport.Supported)
Expand All @@ -33,67 +27,6 @@ internal Transport GetTransport()

public override bool Supported => GetTransport() != null;

public override async UniTask<IConnection> AcceptAsync()
{
if (acceptedTransport == null)
{
acceptedTransport = new Queue<Transport>();
acceptedConnections = new Queue<IConnection>();

foreach (Transport transport in transports)
{
acceptedTransport.Enqueue(transport);
}
}

while (true)
{
if (acceptedConnections.Count > 0)
return acceptedConnections.Dequeue();

// all transports already closed
if (acceptedTransport.Count == 0)
return null;

completionSource = AutoResetUniTaskCompletionSource.Create();

// no pending connections, accept from any transport again
while (acceptedTransport.Count > 0 && acceptedConnections.Count == 0)
{
Transport transport = acceptedTransport.Dequeue();
AcceptConnection(transport).Forget();
}

await completionSource.Task;

completionSource = null;

}

}

private async UniTaskVoid AcceptConnection(Transport transport)
{
try
{
IConnection connection = await transport.AcceptAsync();

if (connection != null)
{
acceptedConnections.Enqueue(connection);
acceptedTransport.Enqueue(transport);
}
}
catch (Exception ex)
{
Debug.LogException(ex);
}
finally
{
completionSource?.TrySetResult();
}
}

public override UniTask<IConnection> ConnectAsync(Uri uri)
{
foreach (Transport transport in transports)
Expand All @@ -110,12 +43,18 @@ public override void Disconnect()
transport.Disconnect();
}

public override async UniTask ListenAsync()
public void Start()
{
foreach (Transport t in transports)
{
t.Connected.AddListener(c => Connected.Invoke(c));
t.Started.AddListener(() => Started.Invoke());
}
}

public override UniTask ListenAsync()
{
IEnumerable<UniTask> tasks = from t in transports select t.ListenAsync();
await UniTask.WhenAll(tasks);
acceptedTransport = null;
acceptedConnections = null;
return UniTask.WhenAll(transports.Select(t => t.ListenAsync()));
}

public override IEnumerable<Uri> ServerUri() =>
Expand Down
22 changes: 14 additions & 8 deletions Assets/Mirror/Runtime/Transport/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using UnityEngine;
using Cysharp.Threading.Tasks;
using UnityEngine.Events;

namespace Mirror
{
Expand All @@ -11,11 +12,24 @@ namespace Mirror
/// </summary>
public abstract class Transport : MonoBehaviour
{
public class ConnectEvent : UnityEvent<IConnection> { }

public abstract IEnumerable<string> Scheme { get; }

/// <summary>
/// Event that gets fired when a client is accepted by the transport
/// </summary>
public ConnectEvent Connected = new ConnectEvent();

/// <summary>
/// Raised when the transport starts
/// </summary>
public UnityEvent Started = new UnityEvent();

/// <summary>
/// Open up the port and listen for connections
/// Use in servers.
/// Note the task ends when we stop listening
/// </summary>
/// <exception>If we cannot start the transport</exception>
/// <returns></returns>
Expand All @@ -40,14 +54,6 @@ public abstract class Transport : MonoBehaviour
/// <exception>If connection cannot be established</exception>
public abstract UniTask<IConnection> ConnectAsync(Uri uri);

/// <summary>
/// Accepts a connection from a client.
/// After ListenAsync completes, clients will queue up until you call AcceptAsync
/// then you get the connection to the client
/// </summary>
/// <returns>The connection to a client</returns>
public abstract UniTask<IConnection> AcceptAsync();

/// <summary>
/// Retrieves the address of this server.
/// Useful for network discovery
Expand Down
16 changes: 7 additions & 9 deletions Assets/Tests/Common/LoopbackTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,29 @@ public class LoopbackTransport : Transport
{
public readonly Channel<IConnection> AcceptConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<IConnection>();

public override async UniTask<IConnection> AcceptAsync()
{
return await AcceptConnections.Reader.ReadAsync();
}

public override IEnumerable<string> Scheme => new [] { "local" };

public override bool Supported => true;

public override UniTask<IConnection> ConnectAsync(Uri uri)
{
(IConnection c1, IConnection c2) = PipeConnection.CreatePipe();
AcceptConnections.Writer.TryWrite(c2);

Connected.Invoke(c2);
return UniTask.FromResult(c1);
}

UniTaskCompletionSource listenCompletionSource;

public override void Disconnect()
{
AcceptConnections.Writer.TryWrite(null);
listenCompletionSource?.TrySetResult();
}

public override UniTask ListenAsync()
{
return UniTask.CompletedTask;
Started.Invoke();
listenCompletionSource = new UniTaskCompletionSource();
return listenCompletionSource.Task;
}

public override IEnumerable<Uri> ServerUri()
Expand Down

0 comments on commit 3e7f688

Please sign in to comment.