Skip to content

Commit

Permalink
Merge 08757f2 into 4bee62e
Browse files Browse the repository at this point in the history
  • Loading branch information
deavmi committed Nov 26, 2023
2 parents 4bee62e + 08757f2 commit 7a1f17c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
11 changes: 11 additions & 0 deletions source/tristanable/exceptions.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ import std.conv : to;
*/
public enum ErrorType
{
/**
* Unset
*/
UNSET,

/**
* If the manager has already
* been shutdown
*/
MANAGER_SHUTDOWN,

/**
* If the requested queue could not be found
*/
Expand Down
2 changes: 2 additions & 0 deletions source/tristanable/manager/manager.d
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class Manager
public void stop()
{
watcher.shutdown();

// TODO: Unblock ALL queues here
}

/**
Expand Down
2 changes: 2 additions & 0 deletions source/tristanable/manager/watcher.d
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class Watcher : Thread
break;
}
}

// TODO: Unblock all `dequeue()`'s here
}

/**
Expand Down
37 changes: 36 additions & 1 deletion source/tristanable/queue/queue.d
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class Queue
*/
private Duration wakeInterval;

/**
* Reason for a `dequeue()`
* to have failed
*/
private ErrorType exitReason;
private bool alive;

/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
Expand All @@ -85,6 +92,9 @@ public class Queue

/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value

/* Set status to alive */
this.alive = true;
}

/**
Expand Down Expand Up @@ -157,6 +167,25 @@ public class Queue
}
}


public void shutdownQueue(ErrorType reason)
{
// Set running state and reason
this.alive = false;
this.exitReason = reason;

// Wakeup sleeping dequeue()

// Lock the mutex
this.mutex.lock();

// Awake all condition variable sleepers
this.signal.notifyAll();

// Unlock the mutex
this.mutex.unlock();
}

// TODO: Make a version of this which can time out

/**
Expand Down Expand Up @@ -188,6 +217,13 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/* Check if this queue is still alive */
if(!this.alive)
{
// Throw an exception to unblock the calling `dequeue()`
throw new TristanableException(this.exitReason);
}

scope(exit)
{
// Unlock the mutex
Expand All @@ -207,7 +243,6 @@ public class Queue
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}


/* Lock the item queue */
queueLock.lock();

Expand Down

0 comments on commit 7a1f17c

Please sign in to comment.