diff --git a/source/libasync/dns.d b/source/libasync/dns.d index 85adc08..295bb82 100644 --- a/source/libasync/dns.d +++ b/source/libasync/dns.d @@ -85,7 +85,7 @@ public: return true; } - return sendCommand(); + return doOffThread({ process(this); }); } /// Returns an OS-specific NetworkAddress structure from the specified IP. @@ -98,47 +98,6 @@ public: return (cast(EventLoop)m_evLoop).resolveIP(url, 0, ipv6?isIPv6.yes:isIPv6.no); } - // chooses a thread or starts it if necessary - private bool sendCommand() - in { assert(!waiting, "File is busy or closed"); } - body { - waiting = true; - m_error = false; - status = StatusInfo.init; - - Waiter cmd_handler; - try { - cmd_handler = popWaiter(); - } catch (Throwable e) { - import std.stdio; - try { - status = StatusInfo(Status.ERROR, e.toString()); - m_error = true; - } catch {} - - return false; - } - - if (cmd_handler is Waiter.init) { - m_cmdInfo.addr = cast(shared)((cast()m_evLoop).resolveHost(cmdInfo.url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no)); - callback(); - return true; - } - assert(cmd_handler.cond); - m_cmdInfo.waiter = cast(shared)cmd_handler; - try { - synchronized(gs_wlock) - gs_jobs.insert(CommandInfo(CmdInfoType.DNS, cast(void*) this)); - - cmd_handler.cond.notifyAll(); - } - catch (Exception e){ - import std.stdio; - try writeln("Exception occured notifying foreign thread: ", e); catch {} - } - return true; - } - /// Cleans up underlying resources. Used as a placeholder for possible future purposes. bool kill() { return true; @@ -149,7 +108,6 @@ package: return m_cmdInfo; } - shared(NetworkAddress*) addr() { try synchronized(m_cmdInfo.mtx) return cast(shared)&m_cmdInfo.addr; @@ -190,7 +148,6 @@ package shared struct DNSCmdInfo bool ipv6; string url; NetworkAddress addr; - Waiter waiter; AsyncSignal ready; AsyncDNS dns; Mutex mtx; // for NetworkAddress writing @@ -206,3 +163,39 @@ package shared struct DNSReadyHandler { return; } } + +private void process(shared AsyncDNS ctxt) { + auto evLoop = getThreadEventLoop(); + + DNSCmdInfo cmdInfo = ctxt.cmdInfo(); + auto mutex = cmdInfo.mtx; + DNSCmd cmd; + string url; + cmd = cmdInfo.command; + url = cmdInfo.url; + + try final switch (cmd) + { + case DNSCmd.RESOLVEHOST: + *ctxt.addr = cast(shared) evLoop.resolveHost(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no); + break; + + case DNSCmd.RESOLVEIP: + *ctxt.addr = cast(shared) evLoop.resolveIP(url, 0, cmdInfo.ipv6 ? isIPv6.yes : isIPv6.no); + break; + + } catch (Throwable e) { + auto status = StatusInfo.init; + status.code = Status.ERROR; + try status.text = e.toString(); catch {} + ctxt.status = status; + } + + try cmdInfo.ready.trigger(evLoop); + catch (Throwable e) { + auto status = StatusInfo.init; + status.code = Status.ERROR; + try status.text = e.toString(); catch {} + ctxt.status = status; + } +} \ No newline at end of file diff --git a/source/libasync/file.d b/source/libasync/file.d index f7ffa07..668ec64 100644 --- a/source/libasync/file.d +++ b/source/libasync/file.d @@ -10,7 +10,9 @@ import core.atomic; import libasync.internals.path; import libasync.threads; import std.file; +import std.conv : to; import libasync.internals.memory; +import libasync.internals.logging; /// Runs all blocking file I/O commands in a thread pool and calls the handler /// upon completion. @@ -159,7 +161,8 @@ public: filePath = Path(file_path); } catch {} offset = off; - return sendCommand(); + + return doOffThread({ process(this); }); } /// Writes the data from the buffer into the file at the specified path starting at the @@ -230,8 +233,8 @@ public: filePath = Path(file_path); } catch {} offset = off; - return sendCommand(); + return doOffThread({ process(this); }); } /// Appends the data from the buffer into a file at the specified path. @@ -296,47 +299,10 @@ public: m_cmdInfo.command = FileCmd.APPEND; filePath = Path(file_path); } catch {} - return sendCommand(); - } - - private bool sendCommand() - in { assert(!waiting, "File is busy or closed"); } - body { - waiting = true; - m_error = false; - status = StatusInfo.init; - - Waiter cmd_handler; - - try { - cmd_handler = popWaiter(); - } catch (Throwable e) { - import std.stdio; - try { - status = StatusInfo(Status.ERROR, e.toString()); - m_error = true; - } catch {} - - return false; - } - - assert(cmd_handler.cond, "Could not lock a thread for async operations. Note: Async file I/O in static constructors is unsupported."); - - m_cmdInfo.waiter = cast(shared) cmd_handler; - try { - synchronized(gs_wlock) - gs_jobs.insert(CommandInfo(CmdInfoType.FILE, cast(void*) this)); - cmd_handler.cond.notifyAll(); - } - catch (Exception e){ - static if (DEBUG) { - import std.stdio; - try writeln("Exception occured notifying foreign thread: ", e); catch {} - } - } - return true; + return doOffThread({ process(this); }); } + package: synchronized @property FileCmdInfo cmdInfo() { @@ -363,10 +329,7 @@ package: synchronized @property void file(ref File f) { try (cast()*m_file).opAssign(f); catch (Exception e) { - static if (DEBUG) { - import std.stdio : writeln; - try writeln(e.msg); catch {} - } + trace(e.msg); } } @@ -385,10 +348,7 @@ package: void handler() { try m_handler(); catch (Throwable e) { - static if (DEBUG) { - import std.stdio : writeln; - try writeln("Failed to send command. ", e.toString()); catch {} - } + trace("Failed to send command. ", e.toString()); } } } @@ -404,7 +364,6 @@ package shared struct FileCmdInfo FileCmd command; Path filePath; ubyte[] buffer; - Waiter waiter; AsyncSignal ready; AsyncFile file; Mutex mtx; // for buffer writing @@ -421,3 +380,55 @@ package shared struct FileReadyHandler { return; } } + +private void process(shared AsyncFile ctxt) { + auto cmdInfo = ctxt.cmdInfo; + auto mutex = cmdInfo.mtx; + FileCmd cmd; + cmd = cmdInfo.command; + + try final switch (cmd) + { + case FileCmd.READ: + File file = ctxt.file; + if (ctxt.offset != -1) + file.seek(cast(long) ctxt.offset); + ubyte[] res; + synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer); + if (res) + ctxt.offset = cast(ulong) (ctxt.offset + res.length); + + break; + + case FileCmd.WRITE: + + File file = cast(File) ctxt.file; + if (ctxt.offset != -1) + file.seek(cast(long) ctxt.offset); + synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer); + file.flush(); + ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length); + break; + + case FileCmd.APPEND: + File file = cast(File) ctxt.file; + synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer); + ctxt.offset = cast(ulong) file.size(); + file.flush(); + break; + } catch (Throwable e) { + auto status = StatusInfo.init; + status.code = Status.ERROR; + try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch {} + ctxt.status = status; + } + + try cmdInfo.ready.trigger(getThreadEventLoop()); + catch (Throwable e) { + trace("AsyncFile Thread Error: ", e.toString()); + auto status = StatusInfo.init; + status.code = Status.ERROR; + try status.text = e.toString(); catch {} + ctxt.status = status; + } +} \ No newline at end of file diff --git a/source/libasync/threads.d b/source/libasync/threads.d index a95849f..431e438 100644 --- a/source/libasync/threads.d +++ b/source/libasync/threads.d @@ -1,324 +1,37 @@ -module libasync.threads; -import core.sync.mutex; -import core.sync.condition; -import core.thread; -import libasync.events; -import std.stdio; -import std.container : Array; -import core.atomic; -import std.conv : to; + +module libasync.threads; -nothrow { - struct Waiter { - Mutex mtx; - Condition cond; - } - - __gshared Mutex gs_wlock; - __gshared Array!Waiter gs_waiters; - __gshared Array!CommandInfo gs_jobs; - __gshared Condition gs_started; - shared(bool) gs_closing; - - __gshared ThreadGroup gs_threads; // daemon threads - shared(int) gs_threadCnt; - -} - -final class CmdProcessor : Thread -{ -nothrow: -private: - EventLoop m_evLoop; - Waiter m_waiter; - shared(bool) g_stop; - - this() { - try { - Mutex mtx = new Mutex; - Condition cond = new Condition(mtx); - m_waiter = Waiter(mtx, cond); - super(&run); - } - catch (Throwable e) { - static if (DEBUG) { - import std.stdio; - try writeln("Failed to run thread ... ", e.toString()); catch {} - } - } - } - - void process(shared AsyncDNS ctxt) { - DNSCmdInfo cmdInfo = ctxt.cmdInfo(); - auto mutex = cmdInfo.mtx; - DNSCmd cmd; - Waiter waiter; - string url; - cmd = cmdInfo.command; - waiter = cast(Waiter)cmdInfo.waiter; - url = cmdInfo.url; - try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch {} - - try final switch (cmd) - { - case DNSCmd.RESOLVEHOST: - *ctxt.addr = cast(shared) m_evLoop.resolveHost(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no); - break; - - case DNSCmd.RESOLVEIP: - *ctxt.addr = cast(shared) m_evLoop.resolveIP(url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no); - break; - - } catch (Throwable e) { - auto status = StatusInfo.init; - status.code = Status.ERROR; - try status.text = e.toString(); catch {} - ctxt.status = status; - } - - try { - cmdInfo.ready.trigger(m_evLoop); - synchronized(gs_wlock) - gs_waiters.insertBack(m_waiter); - gs_started.notifyAll(); // saves some waiting on a new thread - } - catch (Throwable e) { - auto status = StatusInfo.init; - status.code = Status.ERROR; - try status.text = e.toString(); catch {} - ctxt.status = status; - } - } - - void process(shared AsyncFile ctxt) { - auto cmdInfo = ctxt.cmdInfo; - auto mutex = cmdInfo.mtx; - FileCmd cmd; - Waiter waiter; - cmd = cmdInfo.command; - waiter = cast(Waiter)cmdInfo.waiter; - - try assert(m_waiter == waiter, "File processor is handling a command from the wrong thread"); catch {} - - try final switch (cmd) - { - case FileCmd.READ: - File file = ctxt.file; - if (ctxt.offset != -1) - file.seek(cast(long)ctxt.offset); - ubyte[] res; - synchronized(mutex) res = file.rawRead(cast(ubyte[])ctxt.buffer); - if (res) - ctxt.offset = cast(ulong) (ctxt.offset + res.length); - - break; - - case FileCmd.WRITE: - - File file = cast(File)ctxt.file; - if (ctxt.offset != -1) - file.seek(cast(long)ctxt.offset); - synchronized(mutex) { - file.rawWrite(cast(ubyte[])ctxt.buffer); - } - file.flush(); - ctxt.offset = cast(ulong) (ctxt.offset + ctxt.buffer.length); - break; - - case FileCmd.APPEND: - File file = cast(File)ctxt.file; - synchronized(mutex) file.rawWrite(cast(ubyte[]) ctxt.buffer); - ctxt.offset = cast(ulong) file.size(); - file.flush(); - break; - } catch (Throwable e) { - auto status = StatusInfo.init; - status.code = Status.ERROR; - try status.text = "Error in " ~ cmd.to!string ~ ", " ~ e.toString(); catch {} - ctxt.status = status; - } - - - try { - - cmdInfo.ready.trigger(m_evLoop); - - synchronized(gs_wlock) - gs_waiters.insertBack(m_waiter); - gs_started.notifyAll(); // saves some waiting on a new thread - } - catch (Throwable e) { - static if (DEBUG) { - try writeln("AsyncFile Thread Error: ", e.toString()); catch {} - } - auto status = StatusInfo.init; - status.code = Status.ERROR; - try status.text = e.toString(); catch {} - ctxt.status = status; - } - } +import std.parallelism; - void run() - { - core.atomic.atomicOp!"+="(gs_threadCnt, cast(int) 1); - try { - m_evLoop = new EventLoop; - synchronized(gs_wlock) { - gs_waiters.insertBack(m_waiter); - } +import libasync.internals.logging; - gs_started.notifyAll(); - - process(); - } catch (Throwable e) { - static if (DEBUG) { - try writeln("Error inserting in waiters " ~ e.toString()); catch {} - } - } - - core.atomic.atomicOp!"-="(gs_threadCnt, cast(int) 1); - } - - void stop() - { - atomicStore(g_stop, true); - try (cast(Waiter)m_waiter).cond.notifyAll(); - catch (Exception e) { - static if (DEBUG) { - try writeln("Exception occured notifying foreign thread: ", e.toString()); catch {} - } - } - } - - private void process() { - while(!atomicLoad(g_stop)) { - CommandInfo cmd; - try synchronized(m_waiter.mtx) { - if (!m_waiter.cond) return; - m_waiter.cond.wait(); - } - catch {} - if (atomicLoad(g_stop)) break; - try synchronized(gs_wlock) { - if (gs_jobs.empty) continue; - cmd = gs_jobs.back; - gs_jobs.removeBack(); - } catch {} - - final switch (cmd.type) { - case CmdInfoType.FILE: - process(cast(shared AsyncFile) cmd.data); - break; - case CmdInfoType.DNS: - process(cast(shared AsyncDNS) cmd.data); - break; - } - - } +bool spawnAsyncThreads(uint threadCount = totalCPUs > 1 ? totalCPUs - 1 : 1) nothrow +in { + assert(threadCount >= 1, "Need at least one worker thread"); +} body { + try defaultPoolThreads(threadCount); + catch (Exception e) { + critical("Failed to spawn worker threads: ", e.toString()); + return false; } - -} - -Waiter popWaiter() { - if (atomicLoad(gs_threadCnt) == 0) return Waiter.init; // we're in a static ctor probably... - Waiter cmd_handler; - bool start_thread; - do { - if (start_thread) { - Thread thr = new CmdProcessor; - thr.isDaemon = true; - thr.name = "CmdProcessor"; - thr.start(); - gs_threads.add(thr); - } - - synchronized(gs_wlock) { - if (start_thread && !gs_started.wait(5.seconds)) - continue; - - try { - if (!cmd_handler.mtx && !gs_waiters.empty) { - cmd_handler = gs_waiters.back; - gs_waiters.removeBack(); - } - else if (core.atomic.atomicLoad(gs_threadCnt) < 16) { - start_thread = true; - } - else { - Thread.sleep(50.usecs); - } - } catch (Exception e){ - static if (DEBUG) { - import std.stdio : writeln; - writeln("Exception in popWaiter: ", e); - } - } - } - } while(!cmd_handler.cond); - return cmd_handler; -} - -shared static this() { - import std.stdio : writeln; - gs_wlock = new Mutex; - gs_threads = new ThreadGroup; - gs_started = new Condition(gs_wlock); + return true; } -bool spawnAsyncThreads(int threadCount = 4) nothrow -in { assert(threadCount >= 1, "At least one file/dns worker thread is required."); } -body { - import core.atomic : atomicLoad; - try { - if(!atomicLoad(gs_closing) && atomicLoad(gs_threadCnt) == 0) { - synchronized { - if(!atomicLoad(gs_closing) && atomicLoad(gs_threadCnt) == 0) { - foreach (i; 0 .. threadCount) { - Thread thr = new CmdProcessor; - thr.isDaemon = true; - thr.name = "CmdProcessor"; - gs_threads.add(thr); - thr.start(); - synchronized(gs_wlock) - gs_started.wait(1.seconds); - } - } - } - } - } catch(Exception ignore) { +nothrow +bool doOffThread(void delegate() work) { + try taskPool.put(task(work)); + catch (Exception e) { + critical("Failed to dispatch task to thread pool: ", e.toString()); return false; } return true; } +nothrow void destroyAsyncThreads() { - if (!atomicLoad(gs_closing)) atomicStore(gs_closing, true); - else return; - synchronized(gs_wlock) foreach (thr; gs_threads) { - CmdProcessor thread = cast(CmdProcessor)thr; - thread.stop(); - import core.thread : thread_isMainThread; - if (thread_isMainThread) - thread.join(); + try taskPool.finish(true); + catch (Exception e) { + critical("Failed to terminate worker threads: ", e.toString()); + assert(false); } -} - -static ~this() { - import core.thread : thread_isMainThread; - if (thread_isMainThread) - destroyAsyncThreads(); -} - -shared static ~this() { - assert(atomicLoad(gs_closing), "You must call libasync.threads.destroyAsyncThreads() upon termination of the program to avoid segfaulting"); -} - -enum CmdInfoType { - FILE, - DNS -} - -struct CommandInfo { - CmdInfoType type; - void* data; -} +} \ No newline at end of file