Skip to content

Commit

Permalink
fix: #1241 - Telepathy updated to latest version. All tests are passi…
Browse files Browse the repository at this point in the history
…ng again. Thread.Interrupt was replaced by Abort+Join.
  • Loading branch information
miwarnec committed Jan 7, 2020
1 parent b581e89 commit 228b32e
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 18 deletions.
23 changes: 16 additions & 7 deletions Assets/Mirror/Runtime/Transport/Telepathy/Client.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
Expand Down Expand Up @@ -77,6 +77,14 @@ void ReceiveThreadFunction(string ip, int port)
// knows that the Connect failed. otherwise they will never know
receiveQueue.Enqueue(new Message(0, EventType.Disconnected, null));
}
catch (ThreadInterruptedException)
{
// expected if Disconnect() aborts it
}
catch (ThreadAbortException)
{
// expected if Disconnect() aborts it
}
catch (Exception exception)
{
// something went wrong. probably important.
Expand All @@ -89,7 +97,8 @@ void ReceiveThreadFunction(string ip, int port)
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread?.Interrupt();
// => AbortAndJoin is the safest way and avoids race conditions!
sendThread?.AbortAndJoin();

// Connect might have failed. thread might have been closed.
// let's reset connecting state no matter what.
Expand Down Expand Up @@ -153,11 +162,11 @@ public void Disconnect()
// close client
client.Close();

// wait until thread finished. this is the only way to guarantee
// that we can call Connect() again immediately after Disconnect
// -> calling .Join would sometimes wait forever, e.g. when
// calling Disconnect while trying to connect to a dead end
receiveThread?.Interrupt();
// kill the receive thread
// => AbortAndJoin is the safest way and avoids race conditions!
// this way we can guarantee that when Disconnect() returns,
// we are 100% ready for the next Connect!
receiveThread?.AbortAndJoin();

// we interrupted the receive Thread, so we can't guarantee that
// connecting was reset. let's do it manually.
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Telepathy/Common.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// common code used by server and client
// common code used by server and client
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Telepathy/EventType.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Telepathy
namespace Telepathy
{
public enum EventType
{
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Telepathy/Logger.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// A simple logger class that uses Console.WriteLine by default.
// A simple logger class that uses Console.WriteLine by default.
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
// (this way we don't have to depend on UnityEngine.DLL and don't need a
// different version for every UnityEngine version here)
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Telepathy/Message.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// incoming message queue of <connectionId, message>
// incoming message queue of <connectionId, message>
// (not a HashSet because one connection can have multiple new messages)
// -> a struct to minimize GC
namespace Telepathy
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.IO;
using System.IO;
using System.Net.Sockets;

namespace Telepathy
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirror/Runtime/Transport/Telepathy/SafeQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Net 4.X has ConcurrentQueue, but ConcurrentQueue has no TryDequeueAll method,
// Net 4.X has ConcurrentQueue, but ConcurrentQueue has no TryDequeueAll method,
// which makes SafeQueue twice as fast for the send thread.
//
// uMMORPG 450 CCU
Expand Down
21 changes: 16 additions & 5 deletions Assets/Mirror/Runtime/Transport/Telepathy/Server.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
Expand Down Expand Up @@ -145,7 +145,8 @@ void Listen(int port)
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread.Interrupt();
// => AbortAndJoin is the safest way and avoids race conditions!
sendThread.AbortAndJoin();
}
catch (Exception exception)
{
Expand Down Expand Up @@ -214,8 +215,18 @@ public void Stop()

// kill listener thread at all costs. only way to guarantee that
// .Active is immediately false after Stop.
// -> calling .Join would sometimes wait forever
listenerThread?.Interrupt();
// => AbortAndJoin is the safest way and avoids race conditions!
listenerThread?.AbortAndJoin();

// wait until thread is TRULY finished. this is the only way
// to guarantee that everything was properly cleaned up before
// returning.
// => this means that calling Stop() may sometimes block
// for a while, but there is no other way to guarantee that
// everything is cleaned up properly by the time Stop() returns.
// we have to live with the wait time.
listenerThread?.Join();

listenerThread = null;

// close all client connections
Expand All @@ -224,7 +235,7 @@ public void Stop()
TcpClient client = kvp.Value.client;
// close the stream if not closed yet. it may have been closed
// by a disconnect already, so use try/catch
try { client.GetStream().Close(); } catch { }
try { client.GetStream().Close(); } catch {}
client.Close();
}

Expand Down
26 changes: 26 additions & 0 deletions Assets/Mirror/Runtime/Transport/Telepathy/ThreadExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Threading;

namespace Telepathy
{
public static class ThreadExtensions
{
// helper function to abort a thread and not return until it's fully done
public static void AbortAndJoin(this Thread thread)
{
// kill thread at all costs
// -> calling .Join would sometimes wait forever
// -> calling .Interrupt only interrupts certain states.
// => Abort() is the better solution.
thread.Abort();

// wait until thread is TRULY finished. this is the only way
// to guarantee that everything was properly cleaned up before
// returning.
// => this means that this function may sometimes block for a while
// but there is no other way to guarantee that everything is
// cleaned up properly by the time Stop() returns.
// we have to live with the wait time.
thread.Join();
}
}
}

0 comments on commit 228b32e

Please sign in to comment.