Skip to content

Commit

Permalink
feat: adding option to disable reliable layer
Browse files Browse the repository at this point in the history
useful for tcp sockets like websocket

NoReliableConnection will batch all message no matter the channel, and then send them with Reliable packet type
  • Loading branch information
James-Frowen committed Apr 30, 2023
1 parent 348deb8 commit 6618b5d
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 245 deletions.
6 changes: 6 additions & 0 deletions Assets/Mirage/Runtime/SocketLayer/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class Config
/// <summary>
/// Max concurrent connections server will accept
/// </summary>
// todo remove replace with "stop connections" instead. add some high level feature (maybe net authenticator?)
public int MaxConnections = 100;
#endregion

Expand Down Expand Up @@ -97,6 +98,11 @@ public class Config
/// <para>max value is 255</para>
/// </summary>
public int MaxReliableFragments = 5;

/// <summary>
/// Enable if the Socket you are using has its own Reliable layer. For example using Websocket, which is TCP.
/// </summary>
public bool DisableReliableLayer = false;
#endregion
}
}
275 changes: 43 additions & 232 deletions Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,23 @@

namespace Mirage.SocketLayer
{
/// <summary>
/// Objects that represends a connection to/from a server/client. Holds state that is needed to update, send, and receive data
/// </summary>
internal sealed class Connection : IConnection, IRawConnection
internal abstract class Connection : IConnection
{
private readonly ILogger _logger;
protected readonly ILogger _logger;
protected readonly int _maxPacketSize;
protected readonly Peer _peer;
protected readonly IDataHandler _dataHandler;

private readonly Config _config;
private readonly Peer _peer;
public readonly IEndPoint EndPoint;
private readonly IDataHandler _dataHandler;

private readonly ConnectingTracker _connectingTracker;
private readonly TimeoutTracker _timeoutTracker;
private readonly KeepAliveTracker _keepAliveTracker;
private readonly DisconnectedTracker _disconnectedTracker;
protected readonly ConnectingTracker _connectingTracker;
protected readonly TimeoutTracker _timeoutTracker;
protected readonly KeepAliveTracker _keepAliveTracker;
protected readonly DisconnectedTracker _disconnectedTracker;

private readonly Metrics _metrics;
private readonly AckSystem _ackSystem;
protected readonly Metrics _metrics;

private ConnectionState _state;
protected ConnectionState _state;
public ConnectionState State
{
get => _state;
Expand Down Expand Up @@ -59,11 +55,11 @@ public ConnectionState State

public bool Connected => State == ConnectionState.Connected;

internal Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, Pool<ByteBuffer> bufferPool, ILogger logger, Metrics metrics)
protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, ILogger logger, Metrics metrics)
{
_peer = peer;
_config = config;
_logger = logger;
_maxPacketSize = maxPacketSize;

EndPoint = endPoint ?? throw new ArgumentNullException(nameof(endPoint));
_dataHandler = dataHandler ?? throw new ArgumentNullException(nameof(dataHandler));
Expand All @@ -75,7 +71,6 @@ internal Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Con
_disconnectedTracker = new DisconnectedTracker(config, time);

_metrics = metrics;
_ackSystem = new AckSystem(this, config, maxPacketSize, time, bufferPool, metrics);
}

public override string ToString()
Expand Down Expand Up @@ -114,102 +109,14 @@ public void SetSendTime()
_keepAliveTracker.SetSendTime();
}

private void ThrowIfNotConnected()
protected void ThrowIfNotConnectedOrConnecting()
{
if (_state != ConnectionState.Connected)
// sending to Connecting is also valid
if (_state != ConnectionState.Connected && _state != ConnectionState.Connecting)
throw new InvalidOperationException("Connection is not connected");
}


public void SendUnreliable(byte[] packet, int offset, int length)
{
ThrowIfNotConnected();
_metrics?.OnSendMessageUnreliable(length);
_peer.SendUnreliable(this, packet, offset, length);
}
public void SendUnreliable(byte[] packet)
{
SendUnreliable(packet, 0, packet.Length);
}
public void SendUnreliable(ArraySegment<byte> packet)
{
SendUnreliable(packet.Array, packet.Offset, packet.Count);
}

/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public INotifyToken SendNotify(byte[] packet, int offset, int length)
{
ThrowIfNotConnected();
_metrics?.OnSendMessageNotify(length);
return _ackSystem.SendNotify(packet, offset, length);
}
/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public INotifyToken SendNotify(byte[] packet)
{
return SendNotify(packet, 0, packet.Length);
}
/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public INotifyToken SendNotify(ArraySegment<byte> packet)
{
return SendNotify(packet.Array, packet.Offset, packet.Count);
}

/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public void SendNotify(byte[] packet, int offset, int length, INotifyCallBack callBacks)
{
ThrowIfNotConnected();
_metrics?.OnSendMessageNotify(length);
_ackSystem.SendNotify(packet, offset, length, callBacks);
}
/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public void SendNotify(byte[] packet, INotifyCallBack callBacks)
{
SendNotify(packet, 0, packet.Length, callBacks);
}
/// <summary>
/// Use <see cref="INotifyCallBack"/> version for non-alloc
/// </summary>
public void SendNotify(ArraySegment<byte> packet, INotifyCallBack callBacks)
{
SendNotify(packet.Array, packet.Offset, packet.Count, callBacks);
}


/// <summary>
/// single message, batched by AckSystem
/// </summary>
/// <param name="message"></param>
public void SendReliable(byte[] message, int offset, int length)
{
ThrowIfNotConnected();
_metrics?.OnSendMessageReliable(length);
_ackSystem.SendReliable(message, offset, length);
}
public void SendReliable(byte[] packet)
{
SendReliable(packet, 0, packet.Length);
}
public void SendReliable(ArraySegment<byte> packet)
{
SendReliable(packet.Array, packet.Offset, packet.Count);
}


void IRawConnection.SendRaw(byte[] packet, int length)
{
_peer.Send(this, packet, length);
}

/// <summary>
/// starts disconnecting this connection
/// </summary>
Expand Down Expand Up @@ -237,122 +144,6 @@ internal void Disconnect(DisconnectReason reason, bool sendToOther = true)
}
}

internal void ReceiveUnreliablePacket(Packet packet)
{
var offset = 1;
var count = packet.Length - offset;
var segment = new ArraySegment<byte>(packet.Buffer.array, offset, count);
_metrics?.OnReceiveMessageUnreliable(count);
_dataHandler.ReceiveMessage(this, segment);
}

internal void ReceiveReliablePacket(Packet packet)
{
_ackSystem.ReceiveReliable(packet.Buffer.array, packet.Length, false);

HandleQueuedMessages();
}

internal void ReceiveReliableFragment(Packet packet)
{
if (_ackSystem.InvalidFragment(packet.Buffer.array))
{
Disconnect(DisconnectReason.InvalidPacket);
return;
}

_ackSystem.ReceiveReliable(packet.Buffer.array, packet.Length, true);

HandleQueuedMessages();
}

private void HandleQueuedMessages()
{
// gets messages in order
while (_ackSystem.NextReliablePacket(out var received))
{
if (received.IsFragment)
{
HandleFragmentedMessage(received);
}
else
{
HandleBatchedMessageInPacket(received);
}
}
}

private void HandleFragmentedMessage(AckSystem.ReliableReceived received)
{
// get index from first
var firstArray = received.Buffer.array;
// length +1 because zero indexed
var fragmentLength = firstArray[0] + 1;

// todo find way to remove allocation? (can't use buffers because they will be too small for this bigger message)
var message = new byte[fragmentLength * _ackSystem.SizePerFragment];

// copy first
var copyLength = received.Length - 1;
_logger?.Assert(copyLength == _ackSystem.SizePerFragment, "First should be max size");
Buffer.BlockCopy(firstArray, 1, message, 0, copyLength);
received.Buffer.Release();

var messageLength = copyLength;
// start at 1 because first copied above
for (var i = 1; i < fragmentLength; i++)
{
var next = _ackSystem.GetNextFragment();
var nextArray = next.Buffer.array;

_logger?.Assert(i == (fragmentLength - 1 - nextArray[0]), "fragment index should decrement each time");

// +1 because first is copied above
copyLength = next.Length - 1;
Buffer.BlockCopy(nextArray, 1, message, _ackSystem.SizePerFragment * i, copyLength);
messageLength += copyLength;
next.Buffer.Release();
}

_metrics?.OnReceiveMessageReliable(messageLength);
_dataHandler.ReceiveMessage(this, new ArraySegment<byte>(message, 0, messageLength));
}

private void HandleBatchedMessageInPacket(AckSystem.ReliableReceived received)
{
var array = received.Buffer.array;
var packetLength = received.Length;
var offset = 0;
while (offset < packetLength)
{
var length = ByteUtils.ReadUShort(array, ref offset);
var message = new ArraySegment<byte>(array, offset, length);
offset += length;

_metrics?.OnReceiveMessageReliable(length);
_dataHandler.ReceiveMessage(this, message);
}

// release buffer after all its message have been handled
received.Buffer.Release();
}

internal void ReceiveNotifyPacket(Packet packet)
{
var segment = _ackSystem.ReceiveNotify(packet.Buffer.array, packet.Length);
if (segment != default)
{
_metrics?.OnReceiveMessageNotify(packet.Length);
_dataHandler.ReceiveMessage(this, segment);
}
}

internal void ReceiveNotifyAck(Packet packet)
{
_ackSystem.ReceiveAck(packet.Buffer.array);
}


/// <summary>
/// client connecting attempts
/// </summary>
Expand Down Expand Up @@ -383,28 +174,48 @@ private void UpdateDisconnected()
}
}

void IConnection.FlushBatch()
{
_ackSystem.Update();
}

/// <summary>
/// Used to keep connection alive
/// </summary>
/// </summary>
private void UpdateConnected()
{
if (_timeoutTracker.TimeToDisconnect())
{
Disconnect(DisconnectReason.Timeout);
}

_ackSystem.Update();
FlushBatch();

if (_keepAliveTracker.TimeToSend())
{
_peer.SendKeepAlive(this);
}
}

public abstract void FlushBatch();

public abstract void SendReliable(byte[] message, int offset, int length);
public abstract INotifyToken SendNotify(byte[] packet, int offset, int length);
public abstract void SendNotify(byte[] packet, int offset, int length, INotifyCallBack callBacks);
public abstract void SendUnreliable(byte[] packet, int offset, int length);

internal abstract void ReceiveUnreliablePacket(Packet packet);
internal abstract void ReceiveNotifyPacket(Packet packet);
internal abstract void ReceiveReliablePacket(Packet packet);
internal abstract void ReceiveNotifyAck(Packet packet);
internal abstract void ReceiveReliableFragment(Packet packet);

protected void HandleReliableBatched(byte[] array, int offset, int packetLength)
{
while (offset < packetLength)
{
var length = ByteUtils.ReadUShort(array, ref offset);
var message = new ArraySegment<byte>(array, offset, length);
offset += length;

_metrics?.OnReceiveMessageReliable(length);
_dataHandler.ReceiveMessage(this, message);
}
}
}
}

0 comments on commit 6618b5d

Please sign in to comment.