Skip to content

Commit

Permalink
perf: removing alloc from reliable sends
Browse files Browse the repository at this point in the history
  • Loading branch information
James-Frowen committed Jul 20, 2021
1 parent 2f4112c commit 00945f3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
57 changes: 45 additions & 12 deletions Assets/Mirage/Runtime/SocketLayer/AckSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal class AckSystem
readonly IRawConnection connection;
readonly ITime time;
readonly Pool<ByteBuffer> bufferPool;
readonly Pool<ReliablePacket> reliablePool;
readonly Metrics metrics;

//todo implement this
Expand Down Expand Up @@ -84,6 +85,7 @@ public AckSystem(IRawConnection connection, Config config, ITime time, Pool<Byte
this.connection = connection;
this.time = time;
this.bufferPool = bufferPool;
this.reliablePool = new Pool<ReliablePacket>(ReliablePacket.CreateNew, default, 5, maxPacketsInSendBufferPerConnection);
this.metrics = metrics;

ackTimeout = config.TimeBeforeEmptyAck;
Expand Down Expand Up @@ -351,15 +353,17 @@ private ReliablePacket CreateReliableBuffer(PacketType packetType)
{
ushort order = (ushort)reliableOrder.Next();

ByteBuffer final = bufferPool.Take();
ReliablePacket packet = reliablePool.Take();
ByteBuffer buffer = bufferPool.Take();

int offset = 0;
ByteUtils.WriteByte(final.array, ref offset, (byte)packetType);
ByteUtils.WriteByte(buffer.array, ref offset, (byte)packetType);

offset = SEQUENCE_HEADER;
ByteUtils.WriteUShort(final.array, ref offset, order);
ByteUtils.WriteUShort(buffer.array, ref offset, order);

return new ReliablePacket(final, RELIABLE_HEADER_SIZE, order);
packet.Setup(order, buffer, RELIABLE_HEADER_SIZE);
return packet;
}

static void AddToBatch(ReliablePacket packet, byte[] message, int offset, int length)
Expand Down Expand Up @@ -559,7 +563,7 @@ private void ackMessagesInSentQueue(ushort sequence, ulong mask)
uint ackableSequence = (uint)sequencer.MoveInBounds(start + i);
AckablePacket ackable = sentAckablePackets[ackableSequence];

if (ackable.Equals(default))
if (ackable.IsNotValid())
continue;

if (alreadyAcked(ackable, ackableSequence))
Expand All @@ -574,6 +578,7 @@ private bool alreadyAcked(AckablePacket ackable, uint ackableSequence)
// if we have already ackeds this, just remove it
if (ackable.IsReliable && ackable.reliablePacket.acked)
{
Debug.Assert(false, "We should never see an already ackked packet, it should have been removed");
// this should never happen because packet should be removed when acked is set to true
// but remove it just incase we get here
sentAckablePackets.RemoveAt(ackableSequence);
Expand Down Expand Up @@ -614,11 +619,11 @@ private void CheckAckablePacket(ushort sequence, ulong mask, AckablePacket ackab

private void reliableAcked(ReliablePacket reliablePacket)
{
reliablePacket.OnAck();
foreach (ushort seq in reliablePacket.sequences)
{
sentAckablePackets.RemoveAt(seq);
}
reliablePacket.OnAck();
}

private void reliableLost(ushort sequence, ReliablePacket reliablePacket)
Expand Down Expand Up @@ -684,6 +689,15 @@ public bool Equals(AckablePacket other)
return token == other.token &&
reliablePacket == other.reliablePacket;
}

/// <summary>
/// returns true if this is default value of struct
/// </summary>
/// <returns></returns>
public bool IsNotValid()
{
return token == null && reliablePacket == null;
}
}

class ReliablePacket
Expand All @@ -692,9 +706,11 @@ class ReliablePacket
public bool acked;
public int length;

public readonly List<ushort> sequences = new List<ushort>();
public readonly ByteBuffer buffer;
public readonly ushort order;
public ByteBuffer buffer;
public ushort order;

public readonly List<ushort> sequences = new List<ushort>(4);
private readonly Pool<ReliablePacket> pool;

public void OnSend(ushort sequence)
{
Expand All @@ -706,13 +722,25 @@ public void OnAck()
{
acked = true;
buffer.Release();
Release();
}

public ReliablePacket(ByteBuffer packet, int length, ushort order)
public void Setup(ushort order, ByteBuffer buffer, int length)
{
buffer = packet;
this.length = length;
this.order = order;
this.buffer = buffer;
this.length = length;
}

public void Release()
{
sequences.Clear();
pool.Put(this);
}

private ReliablePacket(Pool<ReliablePacket> pool)
{
this.pool = pool;
}

public override int GetHashCode()
Expand All @@ -729,6 +757,11 @@ public override bool Equals(object obj)
}
return false;
}

public static ReliablePacket CreateNew(int _size, Pool<ReliablePacket> pool)
{
return new ReliablePacket(pool);
}
}
public struct ReliableReceived : IEquatable<ReliableReceived>
{
Expand Down
5 changes: 3 additions & 2 deletions Assets/Mirage/Runtime/SocketLayer/Pool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public class Pool<T> where T : class
int maxPoolSize;
readonly int bufferSize;
readonly ILogger logger;
readonly Func<int, Pool<T>, T> createNew;
public delegate T CreateNewItem(int bufferSize, Pool<T> pool);
readonly CreateNewItem createNew;


T[] pool;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void Configure(int startPoolSize, int maxPoolSize)
/// <param name="startPoolSize">how many buffers to create at start</param>
/// <param name="maxPoolSize">max number of buffers in pool</param>
/// <param name="logger"></param>
public Pool(Func<int, Pool<T>, T> createNew, int bufferSize, int startPoolSize, int maxPoolSize, ILogger logger = null)
public Pool(CreateNewItem createNew, int bufferSize, int startPoolSize, int maxPoolSize, ILogger logger = null)
{
if (startPoolSize > maxPoolSize) throw new ArgumentException("Start size must be less than max size", nameof(startPoolSize));
this.createNew = createNew ?? throw new ArgumentNullException(nameof(createNew));
Expand Down

0 comments on commit 00945f3

Please sign in to comment.