From 169f47dd8d93f100c25edd39d97c1aa50e2f9059 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 19:11:25 +0200 Subject: [PATCH] Watcher - Hoisted common imports into a `version(unittest)` Watcher (unittests) - Added a unittest for testing `stop()` to unblock `dequeue()`s --- source/tristanable/manager/watcher.d | 142 ++++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 4 deletions(-) diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index b86af6d..6501826 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -130,6 +130,9 @@ public class Watcher : Thread } // TODO: Unblock all `dequeue()`'s here + // TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager))) + version(unittest) { writeln("Exited watcher loop"); } + this.manager.stop(); } /** @@ -143,6 +146,13 @@ public class Watcher : Thread } } +version(unittest) +{ + import std.socket; + import std.stdio; + import core.thread; +} + /** * Set up a server which will send some tagged messages to us (the client), * where we have setup a `Manager` to watch the queues with tags `42` and `69`, @@ -150,10 +160,6 @@ public class Watcher : Thread */ unittest { - import std.socket; - import std.stdio; - import core.thread; - Address serverAddress = parseAddress("::1", 0); Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); server.bind(serverAddress); @@ -296,4 +302,132 @@ unittest /* Stop the manager */ manager.stop(); +} + +/** + * Setup a server which dies (kills its connection to us) + * midway whilst we are doing a `dequeue()` + * + * This is to test the exception triggering mechanism + * for such a case + */ +unittest +{ + +} + +/** + * Setup a `Manager` and then block on a `dequeue()` + * but from another thread shutdown the `Manager`. + * + * This is to test the exception triggering mechanism + * for such a case + */ +unittest +{ + writeln("<<<<< Test 4 start >>>>>"); + + Address serverAddress = parseAddress("::1", 0); + Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + server.bind(serverAddress); + server.listen(0); + + class ServerThread : Thread + { + this() + { + super(&worker); + } + + private void worker() + { + Socket clientSocket = server.accept(); + BClient bClient = new BClient(clientSocket); + + Thread.sleep(dur!("seconds")(7)); + writeln("Server start"); + + /** + * Create a tagged message to send + * + * tag 42 payload Cucumber 😳️ + */ + TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️"); + byte[] tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + sleep(dur!("seconds")(15)); + + writeln("Server ending"); + } + } + + ServerThread serverThread = new ServerThread(); + serverThread.start(); + + Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + + writeln(server.localAddress); + + + Manager manager = new Manager(client); + + Queue sixtyNine = new Queue(69); + + manager.registerQueue(sixtyNine); + + + /* Connect our socket to the server */ + client.connect(server.localAddress); + + /* Start the manager and let it manage the socket */ + manager.start(); + + + // The failing exception + TristanableException failingException; + + class DequeueThread : Thread + { + private Queue testQueue; + + this(Queue testQueue) + { + super(&worker); + this.testQueue = testQueue; + } + + public void worker() + { + try + { + writeln("dequeuThread: Before dequeue()"); + this.testQueue.dequeue(); + writeln("dequeueThread: After dequeue() [should not get here]"); + } + catch(TristanableException e) + { + writeln("Got tristanable exception during dequeue(): "~e.toString()); + + // TODO: Fliup boolean is all cgood and assret it later + failingException = e; + } + } + } + + DequeueThread dequeueThread = new DequeueThread(sixtyNine); + dequeueThread.start(); + + // Stop the manager + manager.stop(); + writeln("drop"); + + // Wait for the dequeueing thread to stop + dequeueThread.join(); + + // Check condition + assert(failingException !is null); + assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN); } \ No newline at end of file