Skip to content

Commit

Permalink
Queue
Browse files Browse the repository at this point in the history
- Switched to using condition variable
- Added configurable slumber interval
  • Loading branch information
deavmi committed Oct 1, 2023
1 parent e13e7c3 commit f35441b
Showing 1 changed file with 52 additions and 38 deletions.
90 changes: 52 additions & 38 deletions source/tristanable/queue.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.thread : dur;
import core.time : Duration, dur;
import tristanable.exceptions;

version(unittest)
Expand Down Expand Up @@ -50,6 +50,15 @@ public class Queue
*/
private ulong queueID;

/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
*/
private Duration wakeInterval;

/**
* Constructs a new Queue and immediately sets up the notification
Expand All @@ -71,6 +80,31 @@ public class Queue

/* Set the queue id */
this.queueID = queueID;

/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}

/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}

/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}

/**
Expand Down Expand Up @@ -152,45 +186,25 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/**
* Call `wait()` and catch any interrupts
* in which case loop back and call `wait()`
* again
*/
while(true)
scope(exit)
{
// Unlock the mutex
this.mutex.unlock();
}

// Lock the mutex
this.mutex.lock();

try
{
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get interrupted!");
}

// Retry the wait()
continue;
}
catch(FatalException fatalErr)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw...");
}

// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}

// On successful wait() wake-up exit this wait()-retry loop
break;
this.signal.wait(this.wakeInterval);
}

catch(SyncError e)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}


/* Lock the item queue */
queueLock.lock();
Expand Down

0 comments on commit f35441b

Please sign in to comment.