From 569b0297ff2577f1a705b5cdae80e9dfc81594e8 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 14:32:01 +0200 Subject: [PATCH 1/7] Manager - Added a TODO for `stop()` --- source/tristanable/manager/manager.d | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/tristanable/manager/manager.d b/source/tristanable/manager/manager.d index 014e9db..4c01341 100644 --- a/source/tristanable/manager/manager.d +++ b/source/tristanable/manager/manager.d @@ -97,6 +97,8 @@ public class Manager public void stop() { watcher.shutdown(); + + // TODO: Unblock ALL queues here } /** From 6c9df711f855532a91cf5b508b4b56b5d1b5a999 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 14:34:31 +0200 Subject: [PATCH 2/7] Watcher - Added TODO where we need to handling the unblocking of all `dequeue()` calls --- source/tristanable/manager/watcher.d | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index f07518f..b86af6d 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -128,6 +128,8 @@ public class Watcher : Thread break; } } + + // TODO: Unblock all `dequeue()`'s here } /** From 198cb5234200fff93581a25679da136a343ca9b9 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 18:51:38 +0200 Subject: [PATCH 3/7] ErrorType - Added enum member `UNSET` --- source/tristanable/exceptions.d | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/tristanable/exceptions.d b/source/tristanable/exceptions.d index e8d5208..f214641 100644 --- a/source/tristanable/exceptions.d +++ b/source/tristanable/exceptions.d @@ -10,6 +10,11 @@ import std.conv : to; */ public enum ErrorType { + /** + * Unset + */ + UNSET, + /** * If the requested queue could not be found */ From 713c102da5d468afc6781d5cd4762326c048bc12 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 18:58:53 +0200 Subject: [PATCH 4/7] Queue - Added an `exitReason` and an `alive` (set to `true` on construction) - Calling `shutdownQueue(ErrorType)` will set the exit reason, will also set the aliveness to `false` and wake up ALL `dequeue()`'s blocking - `dequeue()` first check in wakeup routine duty cycle is to check if we are alive --- source/tristanable/queue/queue.d | 37 +++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/source/tristanable/queue/queue.d b/source/tristanable/queue/queue.d index 8db3aa8..c170646 100644 --- a/source/tristanable/queue/queue.d +++ b/source/tristanable/queue/queue.d @@ -62,6 +62,13 @@ public class Queue */ private Duration wakeInterval; + /** + * Reason for a `dequeue()` + * to have failed + */ + private ErrorType exitReason; + private bool alive; + /** * Constructs a new Queue and immediately sets up the notification * sub-system for the calling thread (the thread constructing this @@ -85,6 +92,9 @@ public class Queue /* Set the slumber interval */ this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + + /* Set status to alive */ + this.alive = true; } /** @@ -157,6 +167,25 @@ public class Queue } } + + public void shutdownQueue(ErrorType reason) + { + // Set running state and reason + this.alive = false; + this.exitReason = reason; + + // Wakeup sleeping dequeue() + + // Lock the mutex + this.mutex.lock(); + + // Awake all condition variable sleepers + this.signal.notifyAll(); + + // Unlock the mutex + this.mutex.unlock(); + } + // TODO: Make a version of this which can time out /** @@ -188,6 +217,13 @@ public class Queue /* Block till we dequeue a message successfully */ while(dequeuedMessage is null) { + /* Check if this queue is still alive */ + if(!this.alive) + { + // Throw an exception to unblock the calling `dequeue()` + throw new TristanableException(this.exitReason); + } + scope(exit) { // Unlock the mutex @@ -207,7 +243,6 @@ public class Queue throw new TristanableException(ErrorType.DEQUEUE_FAILED); } - /* Lock the item queue */ queueLock.lock(); From 08757f27f2c1e71870459cc49a2001e7b881b2f1 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 19:00:56 +0200 Subject: [PATCH 5/7] ErrorType - Added new member `MANAGER_SHUTDOWN` --- source/tristanable/exceptions.d | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/tristanable/exceptions.d b/source/tristanable/exceptions.d index f214641..fd0a840 100644 --- a/source/tristanable/exceptions.d +++ b/source/tristanable/exceptions.d @@ -15,6 +15,12 @@ public enum ErrorType */ UNSET, + /** + * If the manager has already + * been shutdown + */ + MANAGER_SHUTDOWN, + /** * If the requested queue could not be found */ From 31f7b6355f97487f499db3e6fb9ec2d71b0b7a6f Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 19:09:46 +0200 Subject: [PATCH 6/7] Manager - Implemented `shutdownAllQueues()` - Calling `stop()` now shuts down all queues --- source/tristanable/manager/manager.d | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/source/tristanable/manager/manager.d b/source/tristanable/manager/manager.d index 4c01341..6ed2bad 100644 --- a/source/tristanable/manager/manager.d +++ b/source/tristanable/manager/manager.d @@ -96,9 +96,33 @@ public class Manager */ public void stop() { + /* Stop the watcher */ watcher.shutdown(); - // TODO: Unblock ALL queues here + /* Unblock all `dequeue()` calls */ + shutdownAllQueues(); + } + + /** + * Shuts down all registered queues + */ + protected void shutdownAllQueues() + { + /* Lock the queue of queues */ + queuesLock.lock(); + + /* On return or error */ + scope(exit) + { + /* Unlock the queue of queues */ + queuesLock.unlock(); + } + + /* Shutdown each queue */ + foreach(Queue queue; this.queues) + { + queue.shutdownQueue(ErrorType.MANAGER_SHUTDOWN); + } } /** From 169f47dd8d93f100c25edd39d97c1aa50e2f9059 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 26 Nov 2023 19:11:25 +0200 Subject: [PATCH 7/7] Watcher - Hoisted common imports into a `version(unittest)` Watcher (unittests) - Added a unittest for testing `stop()` to unblock `dequeue()`s --- source/tristanable/manager/watcher.d | 142 ++++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 4 deletions(-) diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index b86af6d..6501826 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -130,6 +130,9 @@ public class Watcher : Thread } // TODO: Unblock all `dequeue()`'s here + // TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager))) + version(unittest) { writeln("Exited watcher loop"); } + this.manager.stop(); } /** @@ -143,6 +146,13 @@ public class Watcher : Thread } } +version(unittest) +{ + import std.socket; + import std.stdio; + import core.thread; +} + /** * Set up a server which will send some tagged messages to us (the client), * where we have setup a `Manager` to watch the queues with tags `42` and `69`, @@ -150,10 +160,6 @@ public class Watcher : Thread */ unittest { - import std.socket; - import std.stdio; - import core.thread; - Address serverAddress = parseAddress("::1", 0); Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); server.bind(serverAddress); @@ -296,4 +302,132 @@ unittest /* Stop the manager */ manager.stop(); +} + +/** + * Setup a server which dies (kills its connection to us) + * midway whilst we are doing a `dequeue()` + * + * This is to test the exception triggering mechanism + * for such a case + */ +unittest +{ + +} + +/** + * Setup a `Manager` and then block on a `dequeue()` + * but from another thread shutdown the `Manager`. + * + * This is to test the exception triggering mechanism + * for such a case + */ +unittest +{ + writeln("<<<<< Test 4 start >>>>>"); + + Address serverAddress = parseAddress("::1", 0); + Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + server.bind(serverAddress); + server.listen(0); + + class ServerThread : Thread + { + this() + { + super(&worker); + } + + private void worker() + { + Socket clientSocket = server.accept(); + BClient bClient = new BClient(clientSocket); + + Thread.sleep(dur!("seconds")(7)); + writeln("Server start"); + + /** + * Create a tagged message to send + * + * tag 42 payload Cucumber 😳️ + */ + TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️"); + byte[] tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + sleep(dur!("seconds")(15)); + + writeln("Server ending"); + } + } + + ServerThread serverThread = new ServerThread(); + serverThread.start(); + + Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + + writeln(server.localAddress); + + + Manager manager = new Manager(client); + + Queue sixtyNine = new Queue(69); + + manager.registerQueue(sixtyNine); + + + /* Connect our socket to the server */ + client.connect(server.localAddress); + + /* Start the manager and let it manage the socket */ + manager.start(); + + + // The failing exception + TristanableException failingException; + + class DequeueThread : Thread + { + private Queue testQueue; + + this(Queue testQueue) + { + super(&worker); + this.testQueue = testQueue; + } + + public void worker() + { + try + { + writeln("dequeuThread: Before dequeue()"); + this.testQueue.dequeue(); + writeln("dequeueThread: After dequeue() [should not get here]"); + } + catch(TristanableException e) + { + writeln("Got tristanable exception during dequeue(): "~e.toString()); + + // TODO: Fliup boolean is all cgood and assret it later + failingException = e; + } + } + } + + DequeueThread dequeueThread = new DequeueThread(sixtyNine); + dequeueThread.start(); + + // Stop the manager + manager.stop(); + writeln("drop"); + + // Wait for the dequeueing thread to stop + dequeueThread.join(); + + // Check condition + assert(failingException !is null); + assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN); } \ No newline at end of file