Skip to content

Commit

Permalink
Transport api can now send messages in channels (#419)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Transports now receive and return channels
  • Loading branch information
paulpach committed Oct 21, 2020
1 parent ad81a9c commit 9a2690e
Show file tree
Hide file tree
Showing 24 changed files with 88 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Assets/Mirror/Components/LobbyReady.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void SetAllClientsNotReady()
}
}

public void SendToReady<T>(NetworkIdentity identity, T msg, bool includeOwner = true, int channelId = Channels.DefaultReliable)
public void SendToReady<T>(NetworkIdentity identity, T msg, bool includeOwner = true, int channelId = Channel.Reliable)
{
if (logger.LogEnabled()) logger.Log("Server.SendToReady msgType:" + typeof(T));

Expand Down
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/CustomAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SyncVarAttribute : PropertyAttribute
public class ServerRpcAttribute : Attribute
{
// this is zero
public int channel = Channels.DefaultReliable;
public int channel = Channel.Reliable;
public bool requireAuthority = true;
}

Expand All @@ -37,7 +37,7 @@ public enum Client { Owner, Observers, Connection }
public class ClientRpcAttribute : Attribute
{
// this is zero
public int channel = Channels.DefaultReliable;
public int channel = Channel.Reliable;
public Client target = Client.Observers;
public bool excludeOwner;
}
Expand Down
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/INetworkClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public interface INetworkClient
{
void Disconnect();

void Send<T>(T message, int channelId = Channels.DefaultReliable);
void Send<T>(T message, int channelId = Channel.Reliable);

UniTask SendAsync<T>(T message, int channelId = Channels.DefaultReliable);
UniTask SendAsync<T>(T message, int channelId = Channel.Reliable);
}
}
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/INetworkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ public interface IMessageHandler

void ClearHandlers();

void Send<T>(T msg, int channelId = Channels.DefaultReliable);
void Send<T>(T msg, int channelId = Channel.Reliable);

UniTask SendAsync<T>(T msg, int channelId = Channels.DefaultReliable);
UniTask SendAsync<T>(T msg, int channelId = Channel.Reliable);

UniTask ProcessMessagesAsync();

Expand Down
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/INetworkServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Mirror
{
public interface IServerObjectManager
{
void SendToClientOfPlayer<T>(NetworkIdentity identity, T msg, int channelId = Channels.DefaultReliable);
void SendToClientOfPlayer<T>(NetworkIdentity identity, T msg, int channelId = Channel.Reliable);

bool AddPlayerForConnection(INetworkConnection conn, GameObject player);

Expand Down Expand Up @@ -36,6 +36,6 @@ public interface INetworkServer

void RemoveConnection(INetworkConnection conn);

void SendToAll<T>(T msg, int channelId = Channels.DefaultReliable);
void SendToAll<T>(T msg, int channelId = Channel.Reliable);
}
}
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/NetworkClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ public void Disconnect()
/// <param name="message"></param>
/// <param name="channelId"></param>
/// <returns>True if message was sent.</returns>
public UniTask SendAsync<T>(T message, int channelId = Channels.DefaultReliable)
public UniTask SendAsync<T>(T message, int channelId = Channel.Reliable)
{
return Connection.SendAsync(message, channelId);
}

public void Send<T>(T message, int channelId = Channels.DefaultReliable)
public void Send<T>(T message, int channelId = Channel.Reliable)
{
Connection.SendAsync(message, channelId).Forget();
}
Expand Down
19 changes: 11 additions & 8 deletions Assets/Mirror/Runtime/NetworkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void ClearHandlers()
/// <param name="msg">The message to send</param>
/// <param name="channelId">The transport layer channel to send on.</param>
/// <returns></returns>
public virtual void Send<T>(T msg, int channelId = Channels.DefaultReliable)
public virtual void Send<T>(T msg, int channelId = Channel.Reliable)
{
SendAsync(msg, channelId).Forget();
}
Expand All @@ -189,7 +189,7 @@ public virtual void Send<T>(T msg, int channelId = Channels.DefaultReliable)
/// <param name="msg">The message to send.</param>
/// <param name="channelId">The transport layer channel to send on.</param>
/// <returns></returns>
public virtual UniTask SendAsync<T>(T msg, int channelId = Channels.DefaultReliable)
public virtual UniTask SendAsync<T>(T msg, int channelId = Channel.Reliable)
{
using (PooledNetworkWriter writer = NetworkWriterPool.GetWriter())
{
Expand All @@ -200,7 +200,7 @@ public virtual UniTask SendAsync<T>(T msg, int channelId = Channels.DefaultRelia
}
}

public static void Send<T>(IEnumerable<INetworkConnection> connections, T msg, int channelId = Channels.DefaultReliable)
public static void Send<T>(IEnumerable<INetworkConnection> connections, T msg, int channelId = Channel.Reliable)
{
using (PooledNetworkWriter writer = NetworkWriterPool.GetWriter())
{
Expand Down Expand Up @@ -229,9 +229,9 @@ public static void Send<T>(IEnumerable<INetworkConnection> connections, T msg, i

// internal because no one except Mirror should send bytes directly to
// the client. they would be detected as a message. send messages instead.
internal virtual UniTask SendAsync(ArraySegment<byte> segment, int channelId = Channels.DefaultReliable)
internal virtual UniTask SendAsync(ArraySegment<byte> segment, int channelId = Channel.Reliable)
{
return connection.SendAsync(segment);
return connection.SendAsync(segment, channelId);
}

public override string ToString()
Expand Down Expand Up @@ -347,10 +347,13 @@ public async UniTask ProcessMessagesAsync()
{
var buffer = new MemoryStream();

while (await connection.ReceiveAsync(buffer))
{
(bool next, int channel) = await connection.ReceiveAsync(buffer);

while (next) {
buffer.TryGetBuffer(out ArraySegment<byte> data);
TransportReceive(data, Channels.DefaultReliable);
TransportReceive(data, channel);

(next, channel) = await connection.ReceiveAsync(buffer);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Assets/Mirror/Runtime/NetworkServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ internal void ActivateHostScene()
/// <param name="identity"></param>
/// <param name="msg"></param>
/// <param name="channelId"></param>
internal void SendToObservers<T>(NetworkIdentity identity, T msg, bool includeOwner = true, int channelId = Channels.DefaultReliable)
internal void SendToObservers<T>(NetworkIdentity identity, T msg, bool includeOwner = true, int channelId = Channel.Reliable)
{
if (logger.LogEnabled()) logger.Log("Server.SendToObservers id:" + typeof(T));

Expand Down Expand Up @@ -397,7 +397,7 @@ internal void SendToObservers<T>(NetworkIdentity identity, T msg, bool includeOw
/// <typeparam name="T">Message type</typeparam>
/// <param name="msg">Message</param>
/// <param name="channelId">Transport channel to use</param>
public void SendToAll<T>(T msg, int channelId = Channels.DefaultReliable)
public void SendToAll<T>(T msg, int channelId = Channel.Reliable)
{
if (logger.LogEnabled()) logger.Log("Server.SendToAll id:" + typeof(T));
NetworkConnection.Send(connections, msg, channelId);
Expand Down Expand Up @@ -507,7 +507,7 @@ internal void OnAuthenticated(INetworkConnection conn)
/// <typeparam name="T">Message type</typeparam>
/// <param name="identity"></param>
/// <param name="msg"></param>
public void SendToClientOfPlayer<T>(NetworkIdentity identity, T msg, int channelId = Channels.DefaultReliable)
public void SendToClientOfPlayer<T>(NetworkIdentity identity, T msg, int channelId = Channel.Reliable)
{
if (identity != null)
{
Expand Down
19 changes: 12 additions & 7 deletions Assets/Mirror/Runtime/Transport/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,26 @@

namespace Mirror
{

public static class Channel
{
// 2 well known channels
// transports can implement other channels
// to expose their features
public const int Reliable = 0;
public const int Unreliable = 1;
}

public interface IConnection
{
UniTask SendAsync(ArraySegment<byte> data);
UniTask SendAsync(ArraySegment<byte> data, int channel = Channel.Reliable);

/// <summary>
/// reads a message from connection
/// </summary>
/// <param name="buffer">buffer where the message will be written</param>
/// <returns>true if we got a message, false if we got disconnected</returns>
UniTask<bool> ReceiveAsync(MemoryStream buffer);
UniTask<(bool next, int channel)> ReceiveAsync(MemoryStream buffer);

/// <summary>
/// Disconnect this connection
Expand All @@ -29,9 +39,4 @@ public interface IConnection
/// <returns></returns>
EndPoint GetEndPointAddress();
}

public interface IChannelConnection : IConnection
{
UniTask SendAsync(ArraySegment<byte> data, int channel);
}
}
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Kcp/KcpClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected async UniTask HandshakeAsync(int bits)
await SendAsync(data);

var stream = new MemoryStream();
if (!await ReceiveAsync(stream))
if (!(await ReceiveAsync(stream)).next)
{
throw new OperationCanceledException("Unable to establish connection, no Handshake message received.");
}
Expand Down
10 changes: 5 additions & 5 deletions Assets/Mirror/Runtime/Transport/Kcp/KcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void SendWithChecksum(byte [] data, int length)
}
}

public UniTask SendAsync(ArraySegment<byte> data)
public UniTask SendAsync(ArraySegment<byte> data, int channel = Channel.Reliable)
{
kcp.Send(data.Array, data.Offset, data.Count);
return UniTask.CompletedTask;
Expand All @@ -149,7 +149,7 @@ public UniTask SendAsync(ArraySegment<byte> data)
/// </summary>
/// <param name="buffer">buffer where the message will be written</param>
/// <returns>true if we got a message, false if we got disconnected</returns>
public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
public async UniTask<(bool next, int channel)> ReceiveAsync(MemoryStream buffer)
{
int msgSize = kcp.PeekSize();

Expand All @@ -164,7 +164,7 @@ public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
if (!open)
{
Disconnected?.Invoke();
return false;
return (false, 0);
}

// we have some data, return it
Expand All @@ -180,10 +180,10 @@ public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
{
open = false;
Disconnected?.Invoke();
return false;
return (false, 0);
}

return true;
return (true, 0);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Kcp/KcpServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal async UniTask HandshakeAsync()
// this first message is the one that contains the Hashcash,
// but we don't care, we already validated it before creating
// the connection
if (!await ReceiveAsync(stream))
if (!(await ReceiveAsync(stream)).next)
{
throw new OperationCanceledException("Unable to establish connection, no Handshake message received.");
}
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Kcp/KcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class KcpTransport : Transport

public KcpDelayMode delayMode = KcpDelayMode.Normal;
internal readonly Dictionary<IPEndPoint, KcpServerConnection> connectedClients = new Dictionary<IPEndPoint, KcpServerConnection>(new IPEndpointComparer());
readonly Channel<KcpServerConnection> acceptedConnections = Channel.CreateSingleConsumerUnbounded<KcpServerConnection>();
readonly Channel<KcpServerConnection> acceptedConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<KcpServerConnection>();

public override IEnumerable<string> Scheme => new[] { "kcp" };

Expand Down
10 changes: 5 additions & 5 deletions Assets/Mirror/Runtime/Transport/PipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void Disconnect()

// technically not an IPEndpoint, will fix later
public EndPoint GetEndPointAddress() => new IPEndPoint(IPAddress.Loopback, 0);

public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
public async UniTask<(bool next, int channel)> ReceiveAsync(MemoryStream buffer)
{
// wait for a message
await MessageCount.WaitAsync();
Expand All @@ -64,7 +64,7 @@ public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
ArraySegment<byte> data = reader.ReadBytesAndSizeSegment();

if (data.Count == 0)
return false;
return (false, 0);

buffer.SetLength(0);
buffer.Write(data.Array, data.Offset, data.Count);
Expand All @@ -76,10 +76,10 @@ public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
reader.Position = 0;
}

return true;
return (true, 0);
}

public UniTask SendAsync(ArraySegment<byte> data)
public UniTask SendAsync(ArraySegment<byte> data, int channel = Channel.Reliable)
{
// add some data to the writer in the connected connection
// and increase the message count
Expand Down
11 changes: 6 additions & 5 deletions Assets/Mirror/Runtime/Transport/Tcp/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ public TcpConnection(TcpClient client)
}

#region Receiving
public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
public async UniTask<(bool next, int channel)> ReceiveAsync(MemoryStream buffer)
{
buffer.SetLength(0);
long position = buffer.Position;
try
{
// read message size
if (!await ReadExactlyAsync(stream, buffer, 4))
return false;
return (false, 0);

// rewind so that we read it
buffer.Position = position;
Expand All @@ -36,11 +36,12 @@ public async UniTask<bool> ReceiveAsync(MemoryStream buffer)
// now read the message
buffer.Position = position;

return await ReadExactlyAsync(stream, buffer, length);
bool next = await ReadExactlyAsync(stream, buffer, length);
return (next, 0);
}
catch (ObjectDisposedException)
{
return false;
return (false, 0);
}
}

Expand Down Expand Up @@ -96,7 +97,7 @@ private static int ReadInt(Stream buffer)
#endregion

#region Sending
public async UniTask SendAsync(ArraySegment<byte> data)
public async UniTask SendAsync(ArraySegment<byte> data, int channel = Channel.Reliable)
{
try
{
Expand Down
6 changes: 0 additions & 6 deletions Assets/Mirror/Runtime/UNetwork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ public enum Version
Current = 1
}

public static class Channels
{
public const int DefaultReliable = 0;
public const int DefaultUnreliable = 1;
}

// -- helpers for float conversion without allocations --
[StructLayout(LayoutKind.Explicit)]
internal struct UIntFloat
Expand Down
2 changes: 1 addition & 1 deletion Assets/Tests/Common/LoopbackTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Mirror.Tests

public class LoopbackTransport : Transport
{
public readonly Channel<IConnection> AcceptConnections = Channel.CreateSingleConsumerUnbounded<IConnection>();
public readonly Channel<IConnection> AcceptConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<IConnection>();

public override async UniTask<IConnection> AcceptAsync()
{
Expand Down
4 changes: 2 additions & 2 deletions Assets/Tests/Common/MockTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ namespace Mirror.Tests

public class MockTransport : Transport
{
public readonly Channel<IConnection> AcceptConnections = Channel.CreateSingleConsumerUnbounded<IConnection>();
public readonly Channel<IConnection> AcceptConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<IConnection>();

public override UniTask<IConnection> AcceptAsync()
{
return AcceptConnections.Reader.ReadAsync();
}

public readonly Channel<IConnection> ConnectConnections = Channel.CreateSingleConsumerUnbounded<IConnection>();
public readonly Channel<IConnection> ConnectConnections = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<IConnection>();

public override IEnumerable<string> Scheme => new []{"tcp4"};

Expand Down

0 comments on commit 9a2690e

Please sign in to comment.