Skip to content

Commit

Permalink
Watcher
Browse files Browse the repository at this point in the history
- When we exit the read loop (upon an error) call `stop_FailedWatcher()` on the associated `Manager`

Watcher (unittests)

- Added a unit test for the above test case
  • Loading branch information
deavmi committed Nov 26, 2023
1 parent c5a6612 commit 8ac7a54
Showing 1 changed file with 110 additions and 15 deletions.
125 changes: 110 additions & 15 deletions source/tristanable/manager/watcher.d
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ public class Watcher : Thread
}
}

// TODO: Unblock all `dequeue()`'s here
// TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager)))
version(unittest) { writeln("Exited watcher loop"); }
// this.manager.stop();

// NOTE: This will also be run on normal user-initiated `stop()`
// ... but will just try shutdown an alreayd shutdown manager
// ... again and try shut our already-closed river stream
// Shutdown and unblock all `dequeue()` calls
this.manager.stop_FailedWatcher();
}

/**
Expand Down Expand Up @@ -312,18 +315,6 @@ unittest
* for such a case
*/
unittest
{

}

/**
* Setup a `Manager` and then block on a `dequeue()`
* but from another thread shutdown the `Manager`.
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 4 start >>>>>");

Expand Down Expand Up @@ -430,4 +421,108 @@ unittest
// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
}

/**
* Setup a `Manager` and then block on a `dequeue()`
* but from another thread shutdown the `Manager`.
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 3 start >>>>>");

Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);

class ServerThread : Thread
{
this()
{
super(&worker);
}

private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);

Thread.sleep(dur!("seconds")(7));
writeln("Server start");

sleep(dur!("seconds")(15));

writeln("Server ending");

// Close the connection
bClient.close();
}
}

ServerThread serverThread = new ServerThread();
serverThread.start();

Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);

writeln(server.localAddress);


Manager manager = new Manager(client);

Queue sixtyNine = new Queue(69);

manager.registerQueue(sixtyNine);


/* Connect our socket to the server */
client.connect(server.localAddress);

/* Start the manager and let it manage the socket */
manager.start();


// The failing exception
TristanableException failingException;

class DequeueThread : Thread
{
private Queue testQueue;

this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}

public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());

// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}

DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();

// Wait for the dequeueing thread to stop
dequeueThread.join();

// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.WATCHER_FAILED);
}

0 comments on commit 8ac7a54

Please sign in to comment.