Skip to content

Commit

Permalink
Replace the specialised thread pool
Browse files Browse the repository at this point in the history
- Use phobos' inbuilt thread pool (aka std.parallelism.TaskPool)
- Simplify the interface to the thread pool
- Allow arbitrary tasks to be send to the thread pool
  • Loading branch information
Moritz Maxeiner committed Jun 14, 2016
1 parent 0058e26 commit e222235
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 405 deletions.
81 changes: 37 additions & 44 deletions source/libasync/dns.d
Expand Up @@ -85,7 +85,7 @@ public:
return true;
}

return sendCommand();
return doOffThread({ process(this); });
}

/// Returns an OS-specific NetworkAddress structure from the specified IP.
Expand All @@ -98,47 +98,6 @@ public:
return (cast(EventLoop)m_evLoop).resolveIP(url, 0, ipv6?isIPv6.yes:isIPv6.no);
}

// chooses a thread or starts it if necessary
private bool sendCommand()
in { assert(!waiting, "File is busy or closed"); }
body {
waiting = true;
m_error = false;
status = StatusInfo.init;

Waiter cmd_handler;
try {
cmd_handler = popWaiter();
} catch (Throwable e) {
import std.stdio;
try {
status = StatusInfo(Status.ERROR, e.toString());
m_error = true;
} catch {}

return false;
}

if (cmd_handler is Waiter.init) {
m_cmdInfo.addr = cast(shared)((cast()m_evLoop).resolveHost(cmdInfo.url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no));
callback();
return true;
}
assert(cmd_handler.cond);
m_cmdInfo.waiter = cast(shared)cmd_handler;
try {
synchronized(gs_wlock)
gs_jobs.insert(CommandInfo(CmdInfoType.DNS, cast(void*) this));

cmd_handler.cond.notifyAll();
}
catch (Exception e){
import std.stdio;
try writeln("Exception occured notifying foreign thread: ", e); catch {}
}
return true;
}

/// Cleans up underlying resources. Used as a placeholder for possible future purposes.
bool kill() {
return true;
Expand All @@ -149,7 +108,6 @@ package:
return m_cmdInfo;
}


shared(NetworkAddress*) addr() {
try synchronized(m_cmdInfo.mtx)
return cast(shared)&m_cmdInfo.addr;
Expand Down Expand Up @@ -190,7 +148,6 @@ package shared struct DNSCmdInfo
bool ipv6;
string url;
NetworkAddress addr;
Waiter waiter;
AsyncSignal ready;
AsyncDNS dns;
Mutex mtx; // for NetworkAddress writing
Expand All @@ -206,3 +163,39 @@ package shared struct DNSReadyHandler {
return;
}
}

private void process(shared AsyncDNS ctxt) {
auto evLoop = getThreadEventLoop();

DNSCmdInfo cmdInfo = ctxt.cmdInfo();
auto mutex = cmdInfo.mtx;
DNSCmd cmd;
string url;
cmd = cmdInfo.command;
url = cmdInfo.url;

try final switch (cmd)
{
case DNSCmd.RESOLVEHOST:
*ctxt.addr = cast(shared) evLoop.resolveHost(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
break;

case DNSCmd.RESOLVEIP:
*ctxt.addr = cast(shared) evLoop.resolveIP(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no);
break;

} catch (Throwable e) {
auto status = StatusInfo.init;
status.code = Status.ERROR;
try status.text = e.toString(); catch {}
ctxt.status = status;
}

try cmdInfo.ready.trigger(evLoop);
catch (Throwable e) {
auto status = StatusInfo.init;
status.code = Status.ERROR;
try status.text = e.toString(); catch {}
ctxt.status = status;
}
}
111 changes: 61 additions & 50 deletions source/libasync/file.d
Expand Up @@ -10,7 +10,9 @@ import core.atomic;
import libasync.internals.path;
import libasync.threads;
import std.file;
import std.conv : to;
import libasync.internals.memory;
import libasync.internals.logging;

/// Runs all blocking file I/O commands in a thread pool and calls the handler
/// upon completion.
Expand Down Expand Up @@ -159,7 +161,8 @@ public:
filePath = Path(file_path);
} catch {}
offset = off;
return sendCommand();

return doOffThread({ process(this); });
}

/// Writes the data from the buffer into the file at the specified path starting at the
Expand Down Expand Up @@ -230,8 +233,8 @@ public:
filePath = Path(file_path);
} catch {}
offset = off;
return sendCommand();

return doOffThread({ process(this); });
}

/// Appends the data from the buffer into a file at the specified path.
Expand Down Expand Up @@ -296,47 +299,10 @@ public:
m_cmdInfo.command = FileCmd.APPEND;
filePath = Path(file_path);
} catch {}
return sendCommand();
}

private bool sendCommand()
in { assert(!waiting, "File is busy or closed"); }
body {
waiting = true;
m_error = false;
status = StatusInfo.init;

Waiter cmd_handler;

try {
cmd_handler = popWaiter();

} catch (Throwable e) {
import std.stdio;
try {
status = StatusInfo(Status.ERROR, e.toString());
m_error = true;
} catch {}

return false;
}

assert(cmd_handler.cond, "Could not lock a thread for async operations. Note: Async file I/O in static constructors is unsupported.");

m_cmdInfo.waiter = cast(shared) cmd_handler;
try {
synchronized(gs_wlock)
gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this));
cmd_handler.cond.notifyAll();
}
catch (Exception e){
static if (DEBUG) {
import std.stdio;
try writeln("Exception occured notifying foreign thread: ", e); catch {}
}
}
return true;
return doOffThread({ process(this); });
}

package:

synchronized @property FileCmdInfo cmdInfo() {
Expand All @@ -363,10 +329,7 @@ package:
synchronized @property void file(ref File f) {
try (cast()*m_file).opAssign(f);
catch (Exception e) {
static if (DEBUG) {
import std.stdio : writeln;
try writeln(e.msg); catch {}
}
trace(e.msg);
}
}

Expand All @@ -385,10 +348,7 @@ package:
void handler() {
try m_handler();
catch (Throwable e) {
static if (DEBUG) {
import std.stdio : writeln;
try writeln("Failed to send command. ", e.toString()); catch {}
}
trace("Failed to send command. ", e.toString());
}
}
}
Expand All @@ -404,7 +364,6 @@ package shared struct FileCmdInfo
FileCmd command;
Path filePath;
ubyte[] buffer;
Waiter waiter;
AsyncSignal ready;
AsyncFile file;
Mutex mtx; // for buffer writing
Expand All @@ -421,3 +380,55 @@ package shared struct FileReadyHandler {
return;
}
}

private void process(shared AsyncFile ctxt) {
auto cmdInfo = ctxt.cmdInfo;
auto mutex = cmdInfo.mtx;
FileCmd cmd;
cmd = cmdInfo.command;

try final switch (cmd)
{
case FileCmd.READ:
File file = ctxt.file;
if (ctxt.offset != -1)
file.seek(cast(long) ctxt.offset);
ubyte[] res;
synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer);
if (res)
ctxt.offset = cast(ulong) (ctxt.offset + res.length);

break;

case FileCmd.WRITE:

File file = cast(File) ctxt.file;
if (ctxt.offset != -1)
file.seek(cast(long) ctxt.offset);
synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
file.flush();
ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length);
break;

case FileCmd.APPEND:
File file = cast(File) ctxt.file;
synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer);
ctxt.offset = cast(ulong) file.size();
file.flush();
break;
} catch (Throwable e) {
auto status = StatusInfo.init;
status.code = Status.ERROR;
try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch {}
ctxt.status = status;
}

try cmdInfo.ready.trigger(getThreadEventLoop());
catch (Throwable e) {
trace("AsyncFile Thread Error: ", e.toString());
auto status = StatusInfo.init;
status.code = Status.ERROR;
try status.text = e.toString(); catch {}
ctxt.status = status;
}
}

0 comments on commit e222235

Please sign in to comment.