Skip to content

Commit

Permalink
feat: Piped connection (#138)
Browse files Browse the repository at this point in the history
* Piped connection

* Remove unnecesary using
  • Loading branch information
paulpach committed Mar 30, 2020
1 parent 6877e0f commit 471a881
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
89 changes: 89 additions & 0 deletions Assets/Mirror/Runtime/Transport/PipeConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Mirror
{

/// <summary>
/// A connection that is directly connected to another connection
/// If you send data in one of them, you receive it on the other one
/// </summary>
public class PipeConnection : IConnection
{

private PipeConnection connected;

// should only be created by CreatePipe
private PipeConnection()
{

}

// buffer where we can queue up data
readonly NetworkWriter writer = new NetworkWriter();
readonly NetworkReader reader = new NetworkReader(new byte[] { });

// counts how many messages we have pending
private readonly SemaphoreSlim MessageCount = new SemaphoreSlim(0);

public static (IConnection, IConnection) CreatePipe()
{
var c1 = new PipeConnection();
var c2 = new PipeConnection();

c1.connected = c2;
c2.connected = c1;

return (c1, c2);
}

public void Disconnect()
{
// disconnect both ends of the pipe
connected.writer.WriteBytesAndSizeSegment(new ArraySegment<byte>(Array.Empty<byte>()));
connected.MessageCount.Release();

writer.WriteBytesAndSizeSegment(new ArraySegment<byte>(Array.Empty<byte>()));
MessageCount.Release();
}

// technically not an IPEndpoint, will fix later
public EndPoint GetEndPointAddress() => new IPEndPoint(IPAddress.Loopback, 0);

public async Task<bool> ReceiveAsync(MemoryStream buffer)
{
// wait for a message
await MessageCount.WaitAsync();

reader.buffer = writer.ToArraySegment();

ArraySegment<byte> data = reader.ReadBytesAndSizeSegment();

if (data.Count == 0)
return false;

buffer.Write(data.Array, data.Offset, data.Count);

if (reader.Position == reader.Length)
{
// if we reached the end of the buffer, reset the buffer to recycle memory
writer.SetLength(0);
reader.Position = 0;
}

return true;
}

public Task SendAsync(ArraySegment<byte> data)
{
// add some data to the writer in the connected connection
// and increase the message count
connected.writer.WriteBytesAndSizeSegment(data);
connected.MessageCount.Release();
return Task.CompletedTask;
}
}
}
11 changes: 11 additions & 0 deletions Assets/Mirror/Runtime/Transport/PipeConnection.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 108 additions & 0 deletions Assets/Mirror/Tests/Editor/PipeConnectionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
using System;
using System.Collections;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using NUnit.Framework;
using UnityEngine.TestTools;

using static Mirror.Tests.AsyncUtil;

namespace Mirror.Tests
{
public class AsyncPipeConnectionTest
{

IConnection c1;
IConnection c2;

[SetUp]
public void Setup()
{
(c1, c2) = PipeConnection.CreatePipe();
}

private static async Task TestSendData(IConnection c1, IConnection c2)
{
byte[] data = { 1, 2, 3, 4 };

await c1.SendAsync(new ArraySegment<byte>(data));
var memoryStream = new MemoryStream();
Assert.That(await c2.ReceiveAsync(memoryStream));

memoryStream.TryGetBuffer(out ArraySegment<byte> receivedData);

Assert.That(receivedData, Is.EqualTo(receivedData));
}

[UnityTest]
public IEnumerator TestSendAndReceive()
{
return RunAsync(async () =>
{
await TestSendData(c1, c2);
});
}

[UnityTest]
public IEnumerator TestSendAndReceiveBackwards()
{
return RunAsync(async () =>
{
await TestSendData(c2, c1);
});
}

[UnityTest]
public IEnumerator TestSendAndReceiveMultiple()
{
return RunAsync(async () =>
{
await TestSendData(c1, c2);
await TestSendData(c1, c2);
});
}

[UnityTest]
public IEnumerator TestDisconnectC1()
{
return RunAsync(async () =>
{
// disconnecting c1 should disconnect both
c1.Disconnect();
var memoryStream = new MemoryStream();
Assert.That(await c1.ReceiveAsync(memoryStream), Is.False);
Assert.That(await c2.ReceiveAsync(memoryStream), Is.False);
});
}

[UnityTest]
public IEnumerator TestDisconnectC2()
{
return RunAsync(async () =>
{
// disconnecting c1 should disconnect both
c2.Disconnect();
var memoryStream = new MemoryStream();
Assert.That(await c1.ReceiveAsync(memoryStream), Is.False);
Assert.That(await c2.ReceiveAsync(memoryStream), Is.False);
});
}

[Test]
public void TestAddressC1()
{
Assert.That(c1.GetEndPointAddress(), Is.EqualTo(new IPEndPoint(IPAddress.Loopback, 0)));
}

[Test]
public void TestAddressC2()
{
Assert.That(c2.GetEndPointAddress(), Is.EqualTo(new IPEndPoint(IPAddress.Loopback, 0)));
}

}
}
11 changes: 11 additions & 0 deletions Assets/Mirror/Tests/Editor/PipeConnectionTest.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 471a881

Please sign in to comment.