Skip to content

Commit

Permalink
Revert "fix: reduce allocations at the transport"
Browse files Browse the repository at this point in the history
This reverts commit 80e4f17.
  • Loading branch information
paulpach committed Jul 19, 2019
1 parent 318ae41 commit bb128fe
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 111 deletions.
14 changes: 9 additions & 5 deletions Assets/Mirror/Runtime/LocalConnections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ public ULocalConnectionToClient() : base ("localClient")
connectionId = 0;
}

internal override bool SendBytes(byte[] bytes, int channelId = Channels.DefaultReliable)
internal override bool SendBytes(ArraySegment<byte> bytes, int channelId = Channels.DefaultReliable)
{
NetworkClient.localClientPacketQueue.Enqueue(bytes);
// must hold on to the data, so copy into a byte[]
byte[] data = new byte[bytes.Count];
Array.Copy(bytes.Array, bytes.Offset, data, 0, bytes.Count);

NetworkClient.localClientPacketQueue.Enqueue(data);
return true;
}
}
Expand All @@ -30,17 +34,17 @@ public ULocalConnectionToServer() : base("localServer")
connectionId = 0;
}

internal override bool SendBytes(byte[] bytes, int channelId = Channels.DefaultReliable)
internal override bool SendBytes(ArraySegment<byte> bytes, int channelId = Channels.DefaultReliable)
{
if (bytes == null || bytes.Length == 0)
if (bytes.Count == 0)
{
Debug.LogError("LocalConnection.SendBytes cannot send zero bytes");
return false;
}

// handle the server's message directly
// TODO any way to do this without NetworkServer.localConnection?
NetworkServer.localConnection.TransportReceive(new ArraySegment<byte>(bytes));
NetworkServer.localConnection.TransportReceive(bytes);
return true;
}
}
Expand Down
2 changes: 0 additions & 2 deletions Assets/Mirror/Runtime/MessagePacker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ public static bool UnpackMessage(NetworkReader messageReader, out int msgType)
// read message type (varint)
try
{
_ = messageReader.ReadInt32();

msgType = (int)messageReader.ReadUInt16();
return true;
}
Expand Down
16 changes: 10 additions & 6 deletions Assets/Mirror/Runtime/NetworkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,26 @@ internal void SetHandlers(Dictionary<int, NetworkMessageDelegate> handlers)
public virtual bool Send<T>(T msg, int channelId = Channels.DefaultReliable) where T: IMessageBase
{
// pack message and send
return SendBytes(MessagePacker.Pack(msg), channelId);
NetworkWriter writer = NetworkWriterPool.GetWriter();
MessagePacker.Pack(msg, writer);
bool result = SendBytes(writer.ToArraySegment(), channelId);
NetworkWriterPool.Recycle(writer);
return result;
}

// internal because no one except Mirror should send bytes directly to
// the client. they would be detected as a message. send messages instead.
internal virtual bool SendBytes(byte[] bytes, int channelId = Channels.DefaultReliable)
internal virtual bool SendBytes(ArraySegment<byte> bytes, int channelId = Channels.DefaultReliable)
{
if (logNetworkMessages) Debug.Log("ConnectionSend con:" + connectionId + " bytes:" + BitConverter.ToString(bytes));
if (logNetworkMessages) Debug.Log("ConnectionSend con:" + connectionId + " bytes:" + BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count ));

if (bytes.Length > Transport.activeTransport.GetMaxPacketSize(channelId))
if (bytes.Count > Transport.activeTransport.GetMaxPacketSize(channelId))
{
Debug.LogError("NetworkConnection.SendBytes cannot send packet larger than " + Transport.activeTransport.GetMaxPacketSize(channelId) + " bytes");
return false;
}

if (bytes.Length == 0)
if (bytes.Count == 0)
{
// zero length packets getting into the packet queues are bad.
Debug.LogError("NetworkConnection.SendBytes cannot send zero bytes");
Expand Down Expand Up @@ -200,7 +204,7 @@ public virtual void TransportReceive(ArraySegment<byte> buffer)
}
}

public virtual bool TransportSend(int channelId, byte[] bytes)
public virtual bool TransportSend(int channelId, ArraySegment<byte> bytes)
{
if (Transport.activeTransport.ClientConnected())
{
Expand Down
18 changes: 14 additions & 4 deletions Assets/Mirror/Runtime/NetworkServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,17 @@ internal static void ActivateLocalClientScene()

if (identity != null && identity.observers != null)
{
byte[] bytes = MessagePacker.Pack(msg);
NetworkWriter writer = NetworkWriterPool.GetWriter();
MessagePacker.Pack(msg, writer);
ArraySegment<byte> bytes = writer.ToArraySegment();

bool result = true;
foreach (KeyValuePair<int, NetworkConnection> kvp in identity.observers)
{
result &= kvp.Value.SendBytes(bytes);
}


NetworkWriterPool.Recycle(writer);
return result;
}
return false;
Expand All @@ -187,13 +190,17 @@ internal static void ActivateLocalClientScene()
{
if (LogFilter.Debug) Debug.Log("Server.SendToAll id:" + typeof(T));

byte[] bytes = MessagePacker.Pack(msg);
NetworkWriter writer = NetworkWriterPool.GetWriter();
MessagePacker.Pack(msg, writer);
ArraySegment<byte> bytes = writer.ToArraySegment();

bool result = true;
foreach (KeyValuePair<int, NetworkConnection> kvp in connections)
{
result &= kvp.Value.SendBytes(bytes, channelId);
}

NetworkWriterPool.Recycle(writer);
return result;
}

Expand All @@ -204,7 +211,9 @@ internal static void ActivateLocalClientScene()
if (identity != null && identity.observers != null)
{
// pack message into byte[] once
byte[] bytes = MessagePacker.Pack(msg);
NetworkWriter writer = NetworkWriterPool.GetWriter();
MessagePacker.Pack(msg, writer);
ArraySegment<byte> bytes = writer.ToArraySegment();

bool result = true;
foreach (KeyValuePair<int, NetworkConnection> kvp in identity.observers)
Expand All @@ -214,6 +223,7 @@ internal static void ActivateLocalClientScene()
result &= kvp.Value.SendBytes(bytes, channelId);
}
}
NetworkWriterPool.Recycle(writer);
return result;
}
return false;
Expand Down
16 changes: 6 additions & 10 deletions Assets/Mirror/Runtime/Transport/Tcp/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Mirror.Tcp
public class Client : Common
{
public event Action Connected;
public event Action<ArraySegment<byte>> ReceivedData;
public event Action<byte[]> ReceivedData;
public event Action Disconnected;
public event Action<Exception> ReceivedError;

Expand Down Expand Up @@ -76,21 +76,17 @@ private async Task ReceiveLoop(TcpClient client)
{
using (Stream networkStream = client.GetStream())
{
MemoryStream buffer = new MemoryStream();
while (true)
{
buffer.SetLength(0);
byte[] data = await ReadMessageAsync(networkStream);

if (!await ReadMessageAsync(networkStream, buffer))
if (data == null)
break;

try
{
if (buffer.TryGetBuffer(out ArraySegment<byte> data))
{
// we received some data, raise event
ReceivedData?.Invoke(data);
}
// we received some data, raise event
ReceivedData?.Invoke(data);
}
catch (Exception exception)
{
Expand All @@ -114,7 +110,7 @@ public void Disconnect()
}

// send the data or throw exception
public async void Send(byte[] data)
public async void Send(ArraySegment<byte> data)
{
if (client == null)
{
Expand Down
43 changes: 23 additions & 20 deletions Assets/Mirror/Runtime/Transport/Tcp/Common.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,38 @@ protected static void WriteSize(int length, byte[] bytes)

// send message (via stream) with the <size,content> message structure
// throws exception if there is a problem
protected static async Task SendMessage(NetworkStream stream, byte[] payload)
protected static async Task SendMessage(NetworkStream stream, ArraySegment<byte> content)
{
// note that payload has a 4 byte message length prefix from mirror
// stream.Write throws exceptions if client sends with high
// frequency and the server stops

// construct header (size)

// TODO: we can do this without allocation
// write header+content at once via payload array. writing
// header,payload separately would cause 2 TCP packets to be
// sent if nagle's algorithm is disabled(2x TCP header overhead)

// TODO: what if we are sending this message to multiple clients?
// we would allocate an identicall array buffer for all of them
// should be possible to do just one and use it for all connections
byte[] payload = new byte[4 + content.Count];
WriteSize(content.Count, payload);
Array.Copy(content.Array, content.Offset, payload, 4, content.Count);
await stream.WriteAsync(payload, 0, payload.Length).ConfigureAwait(false);
}

// read message (via stream) with the <size,content> message structure
protected static async Task<bool> ReadMessageAsync(Stream stream, MemoryStream buffer)
protected static async Task<byte[]> ReadMessageAsync(Stream stream)
{
byte[] messageSizeBuffer = await stream.ReadExactlyAsync(4);

// messages are packed with a 4 byte in message size
// read the size to see how much more data we need
int headerSize = await stream.ReadExactlyAsync(4, buffer);

if (headerSize == 0)
return false; // end of stream, just disconnect


stream.Position -= headerSize;
if (messageSizeBuffer == null)
return null; // end of stream, just disconnect

int messageSize =
stream.ReadByte() |
(stream.ReadByte() << 8) |
(stream.ReadByte() << 16) |
(stream.ReadByte() << 32);
int messageSize = BytesToInt(messageSizeBuffer);

// read the rest of the message
int readSize = await stream.ReadExactlyAsync(messageSize - headerSize, buffer);
return readSize == messageSize - headerSize;
return await stream.ReadExactlyAsync(messageSize);
}

}
Expand Down
52 changes: 2 additions & 50 deletions Assets/Mirror/Runtime/Transport/Tcp/NetworkStreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.IO;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

Expand All @@ -22,7 +21,7 @@ public static async Task<byte[]> ReadExactlyAsync(this Stream stream, int size)
while (offset < size)
{
int received;
if (stream is NetworkStream netStream && netStream.DataAvailable)
if (stream is NetworkStream && ((NetworkStream)stream).DataAvailable)
{
// read available data immediatelly
// this is an important optimization because unity seems
Expand All @@ -49,52 +48,5 @@ public static async Task<byte[]> ReadExactlyAsync(this Stream stream, int size)
return data;
}

// helper function to read EXACTLY 'n' bytes
// -> default .Read reads up to 'n' bytes. this function reads exactly 'n'
// bytes
// -> either return all the bytes requested or null if end of stream
public static async Task<int> ReadExactlyAsync(this Stream stream, int size, MemoryStream buffer)
{
// make sure the buffer is big enough

if (buffer.Capacity < buffer.Position + size)
{
buffer.Capacity = (int)(buffer.Position + size);
}

int offset = 0;

// keep reading until we fill up the buffer;
while (offset < size)
{
int received;
if (stream is NetworkStream netstream && netstream.DataAvailable)
{
// read available data immediatelly
// this is an important optimization because unity seems
// to wait until the next frame every time we call ReadAsync
// so if we have a bunch of data waiting in the buffer it takes a long
// time to receive it.
received = stream.Read(buffer.GetBuffer(), (int)(offset + buffer.Position), size - offset);
}
else
{
// wait for more data
received = await stream.ReadAsync(buffer.GetBuffer(), (int)(offset + buffer.Position), size - offset);
}

// we just got disconnected
if (received == 0)
{
return 0;
}

offset += received;
}

buffer.Position += size;
return size;
}

}
}
13 changes: 5 additions & 8 deletions Assets/Mirror/Runtime/Transport/Tcp/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Mirror.Tcp
public class Server : Common
{
public event Action<int> Connected;
public event Action<int, ArraySegment<byte>> ReceivedData;
public event Action<int, byte[]> ReceivedData;
public event Action<int> Disconnected;
public event Action<int, Exception> ReceivedError;

Expand Down Expand Up @@ -124,18 +124,15 @@ private async void ReceiveLoop(TcpClient tcpClient)
{
while (true)
{
MemoryStream buffer = new MemoryStream();
byte[] data = await ReadMessageAsync(networkStream);

if (!await ReadMessageAsync(networkStream, buffer))
if (data == null)
break;

try
{
// we received some data, raise event
if (buffer.TryGetBuffer(out ArraySegment<byte> data))
{
ReceivedData?.Invoke(connectionId, data);
}
ReceivedData?.Invoke(connectionId, data);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -180,7 +177,7 @@ public void Stop()
}

// send message to client using socket connection or throws exception
public async void Send(int connectionId, byte[] data)
public async void Send(int connectionId, ArraySegment<byte> data)
{
// find the connection
if (clients.TryGetValue(connectionId, out TcpClient client))
Expand Down
8 changes: 4 additions & 4 deletions Assets/Mirror/Runtime/Transport/Tcp/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public void Awake()
// dispatch the events from the server
server.Connected += (connectionId) => OnServerConnected.Invoke(connectionId);
server.Disconnected += (connectionId) => OnServerDisconnected.Invoke(connectionId);
server.ReceivedData += (connectionId, data) => OnServerDataReceived.Invoke(connectionId, data);
server.ReceivedData += (connectionId, data) => OnServerDataReceived.Invoke(connectionId, new ArraySegment<byte>(data));
server.ReceivedError += (connectionId, error) => OnServerError.Invoke(connectionId, error);

// dispatch events from the client
client.Connected += () => OnClientConnected.Invoke();
client.Disconnected += () => OnClientDisconnected.Invoke();
client.ReceivedData += (data) => OnClientDataReceived.Invoke(data);
client.ReceivedData += (data) => OnClientDataReceived.Invoke(new ArraySegment<byte>(data));
client.ReceivedError += (error) => OnClientError.Invoke(error);

// configure
Expand All @@ -38,7 +38,7 @@ public void Awake()
// client
public override bool ClientConnected() { return client.IsConnected; }
public override void ClientConnect(string address) { client.Connect(address, port); }
public override bool ClientSend(int channelId, byte[] data)
public override bool ClientSend(int channelId, ArraySegment<byte> data)
{
client.Send(data);
return true;
Expand All @@ -55,7 +55,7 @@ public override void ServerStart()
server.Listen(port);
}

public override bool ServerSend(int connectionId, int channelId, byte[] data)
public override bool ServerSend(int connectionId, int channelId, ArraySegment<byte> data)
{
server.Send(connectionId, data);
return true;
Expand Down

0 comments on commit bb128fe

Please sign in to comment.