Skip to content

Commit

Permalink
Merge 87e376d into fb4cc5b
Browse files Browse the repository at this point in the history
  • Loading branch information
deavmi authored Nov 18, 2023
2 parents fb4cc5b + 87e376d commit 8f441f5
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 54 deletions.
3 changes: 0 additions & 3 deletions dub.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
"Tristan B. Velloza Kildaire"
],
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"libsnooze": ">=1.2.0-beta"
},
"description": "Executor framework with future-based task submission and pluggable thread execution engines",
"homepage": "https://deavmi.assigned.network",
"license": "LGPL 3.0",
Expand Down
83 changes: 57 additions & 26 deletions source/guillotine/future.d
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
*/
module guillotine.future;

// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import core.time : dur;

import guillotine.result : Result;

Expand All @@ -26,9 +27,16 @@ public enum State
public final class Future
{
/**
* `libsnooze` event
* Mutex for condition
* variable
*/
private Event event;
private Mutex mutex;

/**
* Condition variable
* used for signalling
*/
private Condition signal;

/**
* State of the future
Expand All @@ -50,7 +58,8 @@ public final class Future
*/
public this()
{
this.event = new Event();
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);
}

/**
Expand Down Expand Up @@ -84,24 +93,7 @@ public final class Future
// calls await() can have it pickup on the NOT_STARTED case)
else
{
bool doneYet = false;
while(!doneYet)
{
try
{
event.wait();
doneYet = true;
}
catch(InterruptedException e)
{
// Do nothing
}
catch(FatalException e)
{
// TODO: Throw a FatalGuillaotine here
// TODO: make a custom guillotine exception
}
}
doWait();

// If we had an error then throw it
if(this.state == State.ERRORED)
Expand All @@ -117,6 +109,43 @@ public final class Future
}
}

/**
* Looped condition variable wait
* which checks state on entry
* just incase, notification
* was set prior to entry.
*
* Else, we to a timed-wait,
* because between the loop-condition
* check and the wait call it may
* have been notified and we should
* wake up and check again.
*
* But we don't want to spin,
* so we also can potentially
* sleep till notified.
*/
private void doWait()
{
// Lock mutex
this.mutex.lock();

scope(exit)
{
// Unlock mutex
this.mutex.unlock();
}

while(this.state != State.ERRORED && this.state != State.FINISHED)
{
// TODO: Add syncerror checking?
this.signal.wait(dur!("msecs")(400));

import std.stdio;
writeln("Awake");
}
}

/**
* Sets this `Future` as completed by storing the
* provided result into it and waking up anybody
Expand All @@ -137,7 +166,9 @@ public final class Future
this.state = State.FINISHED;

// Wake up any sleepers
this.event.notifyAll();
this.signal.notifyAll();
import std.stdio;
writeln("Sent notify");
}

/**
Expand All @@ -158,6 +189,6 @@ public final class Future
this.state = State.ERRORED;

// Wake up any sleepers
this.event.notifyAll();
this.signal.notifyAll();
}
}
89 changes: 64 additions & 25 deletions source/guillotine/providers/sequential.d
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
*/
module guillotine.providers.sequential;

import libsnooze;
import guillotine.provider;
import std.container.slist;
import std.range : walkLength;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import core.time : Duration, dur;

import core.thread : Thread;
import guillotine.exceptions;

Expand All @@ -22,21 +25,57 @@ version(unittest)
*/
public final class Sequential : Provider
{
private Event event;
private Mutex mutex;
private Condition event;
private SList!(Task) taskQueue;
private Mutex taskQueueLock;
private Thread runner;
private bool running;
private Duration wakeInterval;

/**
* Constricts a new `Sequential` provider
*/
this()
{
this.event = new Event();
this.mutex = new Mutex();
this.event = new Condition(this.mutex);
this.taskQueueLock = new Mutex();
this.runner = new Thread(&worker);
this.event.ensure(runner);

// TODO: Choose a sane efault
this.wakeInterval = dur!("msecs")(10);
}

/**
* Sets the interval at which the runner
* must forcefully wakeup from slumber
* and check for any new jobs - in
* the case one was submitted prior
* to sleeping.
*
* Note that must of the time slumber
* will stopped if a task is submitted
* WHILST we are ALREADY sleeping. This
* is to aid in the times where that is
* NOT the case
*
* Params:
* interval = the `Duration`
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}

/**
* Returns the slumber interval
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}

/**
Expand All @@ -62,10 +101,8 @@ public final class Sequential : Provider
// Unlock the queue
taskQueueLock.unlock();

// Wake up the runner (just using all to avoid a catch for exception
// ... which would occur if wait() hasn't been called atleast once
// ... in `runner`
event.notifyAll();
// Wake up the runner
event.notify();

version(unittest)
{
Expand All @@ -92,8 +129,20 @@ public final class Sequential : Provider
{
try
{
// Lock mutex
this.mutex.lock();

// Sleep till awoken for an enqueue
event.wait();
import std.stdio;
writeln("Worker wait...");
bool b = event.wait(this.wakeInterval);
writeln("Worker wait... [done] ", b);

// TODO: Add syncerror checking?

// Unlock mutex
this.mutex.unlock();


// Check if we are running, if not, exit
if(!running)
Expand Down Expand Up @@ -127,15 +176,14 @@ public final class Sequential : Provider
}

}
catch(InterruptedException e)
catch(SyncError e)
{
// TODO: What to do?
// Handle by doing nothing, retry wait()
import std.stdio;
writeln("SyncError: ", e);
continue;
}
catch(SnoozeError e)
{
// TODO: Stop and handle this
}
}
}

Expand All @@ -158,19 +206,10 @@ public final class Sequential : Provider
// Set running flag to false
this.running = false;

try
{
// Notify the sleeping worker to wake up
this.event.notify(runner);
}
catch(SnoozeError e)
{
throw new GuillotineException("Error notifying() sleeping worker in stop()");
}
// Notify the sleeping worker to wake up
this.event.notify();

// Wait for the runner thread to fully exit
this.runner.join();

// TODO: Destroy the libsnooze event here
}
}

0 comments on commit 8f441f5

Please sign in to comment.