Skip to content

Commit

Permalink
Cleanup and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
etcimon committed Dec 12, 2023
1 parent b6d2ec3 commit 4935b3b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 75 deletions.
3 changes: 2 additions & 1 deletion source/libasync/dns.d
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ package struct AsyncDNSRequest
import libasync.internals.win32;
PADDRINFOEX infos;
}
mixin FreeList!1_000;
mixin FreeList!1_000;
}



package shared struct DNSCmdInfo
{
DNSCmd command;
Expand Down
12 changes: 8 additions & 4 deletions source/libasync/events.d
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ version(Posix) {
public import libasync.posix;
}

private static EventLoop g_evLoop;
///
EventLoop getThreadEventLoop() nothrow {
static EventLoop evLoop;
if (!evLoop) {
evLoop = new EventLoop;
if (!g_evLoop) {
g_evLoop = new EventLoop();
}
return evLoop;
return g_evLoop;
}

static ~this() {
if (g_evLoop) g_evLoop.destroy();
}

/// Event handlers can be registered to the event loop by being run(), all events
Expand Down
59 changes: 3 additions & 56 deletions source/libasync/internals/freelist.d
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,19 @@ mixin template FreeList(alias Limit)
{
static if (!is(typeof(this) == struct)) static assert(false, "FreeList only works on structs");

private:
alias T = typeof(this);

struct FreeListInfo
{
/// Head element in the freelist of previously allocated elements.
static T* head;
/// Current number of elements in the freelist
static if (isIntegral!(typeof(Limit))) {
static typeof(Limit) count;
} else {
static size_t count;
}
/// Next element in the freelist, if this element is on it.
T* next;
}
FreeListInfo freelist;

public:
import std.exception : assumeWontThrow;
import std.traits : isIntegral;
import std.conv : emplace;

import memutils.utils : ThreadMem;

import std.exception : assumeWontThrow;
static T* alloc(Args...)(auto ref Args args) @trusted
{
T* obj = void;

// If a previously allocated instance is available,
// pull it from the freelist and return it.
if (freelist.head) {
obj = freelist.head;
freelist.head = obj.freelist.next;
freelist.count -= 1;
static if (Args.length > 0) {
emplace!(T)(obj, args);
}
static if (LOG) .tracef(T.stringof ~ ".FreeList.alloc: Pulled %s", obj);
// Otherwise, allocate a new instance.
} else {
obj = assumeWontThrow(ThreadMem.alloc!T(args));
static if (LOG) .tracef(T.stringof ~ ".FreeList.alloc: Allocated %s", obj);
}

return obj;
return assumeWontThrow(ThreadMem.alloc!T(args));
}

static void free(T* obj) @trusted
{
static if (isIntegral!(typeof(Limit))) {
if (freelist.count <= Limit / T.sizeof) {
obj.freelist.next = freelist.head;
freelist.head = obj;
freelist.count += 1;
static if (LOG) .tracef(T.stringof ~ ".FreeList.free: Pushed %s", obj);
} else {
static if (LOG) .tracef(T.stringof ~ ".FreeList.free: Deallocating %s", obj);
ThreadMem.free(obj);
}
} else {
obj.freelist.next = freelist.head;
freelist.head = obj;
freelist.count += 1;
static if (LOG) .tracef(T.stringof ~ ".FreeList.free: Pushed %s", obj);
}
assumeWontThrow(ThreadMem.free(obj));
}
}
mixin template UnlimitedFreeList() { mixin FreeList!null; }
2 changes: 1 addition & 1 deletion source/libasync/internals/win32.d
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ extern(System) nothrow
enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;

bool CancelIo(HANDLE hFile);
bool CancelIOEx(HANDLE hFile, OVERLAPPED* lpOverlapped);
bool CancelIoEx(HANDLE hFile, OVERLAPPED* lpOverlapped);

SOCKET WSAAccept(SOCKET s, sockaddr *addr, INT* addrlen, LPCONDITIONPROC lpfnCondition, DWORD_PTR dwCallbackData);
int WSAAsyncSelect(SOCKET s, HWND hWnd, uint wMsg, sizediff_t lEvent);
Expand Down
5 changes: 4 additions & 1 deletion source/libasync/test.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import std.conv : to;
import std.datetime.stopwatch : StopWatch;
import core.stdc.stdlib : getenv;
import std.string : fromStringz, toStringz;

AsyncDirectoryWatcher g_watcher;
shared AsyncDNS g_dns;
string cache_path;
Expand Down Expand Up @@ -53,8 +52,12 @@ unittest {

assert(g_cbTimerCnt >= 3, "Multitimer expired only " ~ g_cbTimerCnt.to!string ~ " times"); // MultiTimer expired 3-4 times
g_watcher.kill();
g_watcher.destroy();
g_notifier.kill();
g_notifier.destroy();
g_listnr.kill();
g_listnr.destroy();
g_dns.destroy();
version(LDC) {
import core.stdc.stdlib; exit(0);
}
Expand Down
45 changes: 33 additions & 12 deletions source/libasync/windows.d
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ private:
HashMap!(fd_t, DWHandlerInfo) m_dwHandlers; // todo: Change this to an array too
HashMap!(uint, DWFolderWatcher) m_dwFolders;
HashMap!(fd_t, tcp_keepalive)* kcache;
~this() { kcache.destroy(); }
~this() {
kcache.destroy();
foreach(fd, watcher; m_dwHandlers) {
watcher.destroy();
}
}
nothrow:
private:
struct TimerCache {
Expand Down Expand Up @@ -613,22 +618,21 @@ package:
bool kill(AsyncDirectoryWatcher ctxt) {

try {
Vector!DWFolderWatcher toFree;
foreach (ref const uint k, const DWFolderWatcher v; m_dwFolders) {
Vector!uint toFree;
foreach (ref const uint k, DWFolderWatcher v; m_dwFolders) {
if (v.fd == ctxt.fd) {
CloseHandle(v.handle);
toFree ~= v;
m_dwFolders.remove(k);
v.close();
ThreadMem.free(v);
toFree ~= k;
}
}
foreach (uint k; toFree[]) {
m_dwFolders.remove(k);
}

foreach (DWFolderWatcher obj; toFree[])
ThreadMem.free(obj);

// todo: close all the handlers...
m_dwHandlers.remove(ctxt.fd);
}
catch (Exception e) {
catch (Throwable e) {
setInternalError!"in kill(AsyncDirectoryWatcher)"(Status.ERROR, e.msg);
return false;
}
Expand Down Expand Up @@ -978,7 +982,9 @@ package:
DWHandlerInfo handler = m_dwHandlers.get(fd, DWHandlerInfo.init);
assert(handler !is null);
static if (LOG) log("Watching: " ~ info.path.toNativeString());
(m_dwFolders)[wd] = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);
auto dwWatcher = ThreadMem.alloc!DWFolderWatcher(m_evLoop, fd, hndl, info.path, info.events, handler, info.recursive);

(m_dwFolders)[wd] = dwWatcher;
} catch (Exception e) {
setInternalError!"watch"(Status.ERROR, "Could not start watching directory: " ~ e.msg);
return 0;
Expand Down Expand Up @@ -2532,6 +2538,11 @@ private:
shared AsyncSignal m_signal;
ubyte[FILE_NOTIFY_INFORMATION.sizeof + MAX_PATH + 1] m_buffer;
DWORD m_bytesTransferred;
OVERLAPPED* m_pendingOverlapped;
~this() {
if (m_handler) m_handler.destroy();
if (m_signal) m_signal.destroy();
}
public:
this(EventLoop evl, in fd_t fd, in HANDLE hndl, in Path path, in DWFileEvent events, DWHandlerInfo handler, bool recursive) {
m_fd = fd;
Expand All @@ -2547,11 +2558,16 @@ public:
}
package:
void close() {
if (m_pendingOverlapped) {
ThreadMem.free(m_pendingOverlapped);
}
CancelIoEx(m_handle, NULL);
CloseHandle(m_handle);
m_signal.kill();
}

void triggerChanged() {

m_signal.trigger();
}

Expand Down Expand Up @@ -2591,8 +2607,10 @@ package:
overlapped.Offset = 0;
overlapped.OffsetHigh = 0;
overlapped.Pointer = cast(void*)this;
m_pendingOverlapped = overlapped;
import std.stdio;
DWORD bytesReturned;
import std.stdio: writefln;
BOOL success = ReadDirectoryChangesW(m_handle, m_buffer.ptr, m_buffer.length, cast(BOOL) m_recursive, notifications, &bytesReturned, overlapped, &onIOCompleted);

static if (DEBUG) {
Expand All @@ -2615,8 +2633,10 @@ package:
void onIOCompleted(DWORD dwError, DWORD cbTransferred, OVERLAPPED* overlapped)
{
import std.stdio;
tracef("Triggered onIOCompleted: %X", cast(void*)overlapped);
DWFolderWatcher watcher = cast(DWFolderWatcher)(overlapped.Pointer);
watcher.m_bytesTransferred = cbTransferred;
watcher.m_pendingOverlapped = null;
try ThreadMem.free(overlapped); catch (Throwable) {}

static if (DEBUG) {
Expand Down Expand Up @@ -2670,6 +2690,7 @@ nothrow extern(System)
scope(exit) {
FreeAddrInfoExW(infos);
assumeWontThrow(AsyncDNSRequest.free(overlapped.resolve));
assumeWontThrow(AsyncOverlapped.free(overlapped));
}
if (err != EWIN.WSA_OK) {
evLoop.setInternalError!"GetAddrInfoExW"(Status.ABORT, string.init, err);
Expand Down

0 comments on commit 4935b3b

Please sign in to comment.