Skip to content

Commit

Permalink
Merge 7554703 into 4bee62e
Browse files Browse the repository at this point in the history
  • Loading branch information
deavmi committed Nov 26, 2023
2 parents 4bee62e + 7554703 commit 2afdda9
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 5 deletions.
11 changes: 11 additions & 0 deletions source/tristanable/exceptions.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ import std.conv : to;
*/
public enum ErrorType
{
/**
* Unset
*/
UNSET,

/**
* If the manager has already
* been shutdown
*/
MANAGER_SHUTDOWN,

/**
* If the requested queue could not be found
*/
Expand Down
26 changes: 26 additions & 0 deletions source/tristanable/manager/manager.d
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,33 @@ public class Manager
*/
public void stop()
{
/* Stop the watcher */
watcher.shutdown();

/* Unblock all `dequeue()` calls */
shutdownAllQueues();
}

/**
* Shuts down all registered queues
*/
protected void shutdownAllQueues()
{
/* Lock the queue of queues */
queuesLock.lock();

/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}

/* Shutdown each queue */
foreach(Queue queue; this.queues)
{
queue.shutdownQueue(ErrorType.MANAGER_SHUTDOWN);
}
}

/**
Expand Down
144 changes: 140 additions & 4 deletions source/tristanable/manager/watcher.d
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public class Watcher : Thread
break;
}
}

// TODO: Unblock all `dequeue()`'s here
// TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager)))
version(unittest) { writeln("Exited watcher loop"); }
// this.manager.stop();
}

/**
Expand All @@ -141,17 +146,20 @@ public class Watcher : Thread
}
}

version(unittest)
{
import std.socket;
import std.stdio;
import core.thread;
}

/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
* we then dequeue some messages from both queus. Finally, we shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
Expand Down Expand Up @@ -294,4 +302,132 @@ unittest

/* Stop the manager */
manager.stop();
}

/**
* Setup a server which dies (kills its connection to us)
* midway whilst we are doing a `dequeue()`
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{

}

/**
* Setup a `Manager` and then block on a `dequeue()`
* but from another thread shutdown the `Manager`.
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 4 start >>>>>");

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);

class ServerThread : Thread
{
this()
{
super(&worker);
}

private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);

Thread.sleep(dur!("seconds")(7));
writeln("Server start");

/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳️
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));

writeln("server send [done]");

sleep(dur!("seconds")(15));

writeln("Server ending");
}
}

ServerThread serverThread = new ServerThread();
serverThread.start();

Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);

writeln(server.localAddress);


Manager manager = new Manager(client);

Queue sixtyNine = new Queue(69);

manager.registerQueue(sixtyNine);


/* Connect our socket to the server */
client.connect(server.localAddress);

/* Start the manager and let it manage the socket */
manager.start();


// The failing exception
TristanableException failingException;

class DequeueThread : Thread
{
private Queue testQueue;

this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}

public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());

// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}

DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();

// Stop the manager
manager.stop();
writeln("drop");

// Wait for the dequeueing thread to stop
dequeueThread.join();

// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
}
37 changes: 36 additions & 1 deletion source/tristanable/queue/queue.d
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class Queue
*/
private Duration wakeInterval;

/**
* Reason for a `dequeue()`
* to have failed
*/
private ErrorType exitReason;
private bool alive;

/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
Expand All @@ -85,6 +92,9 @@ public class Queue

/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value

/* Set status to alive */
this.alive = true;
}

/**
Expand Down Expand Up @@ -157,6 +167,25 @@ public class Queue
}
}


public void shutdownQueue(ErrorType reason)
{
// Set running state and reason
this.alive = false;
this.exitReason = reason;

// Wakeup sleeping dequeue()

// Lock the mutex
this.mutex.lock();

// Awake all condition variable sleepers
this.signal.notifyAll();

// Unlock the mutex
this.mutex.unlock();
}

// TODO: Make a version of this which can time out

/**
Expand Down Expand Up @@ -188,6 +217,13 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/* Check if this queue is still alive */
if(!this.alive)
{
// Throw an exception to unblock the calling `dequeue()`
throw new TristanableException(this.exitReason);
}

scope(exit)
{
// Unlock the mutex
Expand All @@ -207,7 +243,6 @@ public class Queue
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}


/* Lock the item queue */
queueLock.lock();

Expand Down

0 comments on commit 2afdda9

Please sign in to comment.