Skip to content

Commit

Permalink
use windows overlapped for async dns
Browse files Browse the repository at this point in the history
  • Loading branch information
etcimon committed Dec 12, 2023
1 parent b457f59 commit b6d2ec3
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 25 deletions.
34 changes: 28 additions & 6 deletions source/libasync/dns.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import core.sync.mutex;
import core.sync.condition;
import core.atomic;
import libasync.threads;
import libasync.internals.freelist;
import libasync.internals.queue;
import libasync.internals.logging;

///
enum DNSCmd {
Expand All @@ -21,8 +24,8 @@ enum DNSCmd {
shared final class AsyncDNS
{
nothrow:
package EventLoop m_evLoop;
private:
EventLoop m_evLoop;
bool m_busy;
bool m_error;
DNSReadyHandler m_handler;
Expand Down Expand Up @@ -75,6 +78,17 @@ public:
assert(m_handler.ctxt !is null, "AsyncDNS must be running before being operated on.");
}
do {
static if (LOG) .tracef("Resolving url: %s", url);
version(Windows) {
if (force_async) {
m_cmdInfo.command = DNSCmd.RESOLVEHOST;
m_cmdInfo.ipv6 = ipv6;
m_cmdInfo.url = cast(shared) url;
AsyncDNSRequest* dns_req = AsyncDNSRequest.alloc(this);
return (cast(EventLoop)m_evLoop).resolve(dns_req, cmdInfo.url, 0, cmdInfo.ipv6?isIPv6.yes:isIPv6.no);
}
}

version(Libasync_Threading)
if (force_async == true) {
synchronized(m_cmdInfo.mtx) {
Expand Down Expand Up @@ -135,21 +149,29 @@ package:
m_busy = cast(shared) b;
}

void callback() {
synchronized void callback() {

try {
m_handler(cast(NetworkAddress)m_cmdInfo.addr);
}
catch (Throwable e) {
static if (DEBUG) {
import std.stdio : writeln;
try writeln("Failed to send command. ", e.toString()); catch (Throwable) {}
}
warningf("Failed to send command. %s", e.toString());
}
}

}

package struct AsyncDNSRequest
{
AsyncDNS dns; /// DNS resolver to use
version(Windows) {
import libasync.internals.win32;
PADDRINFOEX infos;
}
mixin FreeList!1_000;
}


package shared struct DNSCmdInfo
{
DNSCmd command;
Expand Down
6 changes: 6 additions & 0 deletions source/libasync/events.d
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ package:
return m_evLoop.error;
}

/** Sends a request to underlying async DNS resolver. Non-blocking */
bool resolve(AsyncDNSRequest* req, in string host, ushort port = 0, isIPv6 ipv6 = isIPv6.no, isTCP tcp = isTCP.yes)
{
return m_evLoop.resolve(req, host, port, ipv6, tcp);
}

uint recvFrom(in fd_t fd, ubyte[] data, ref NetworkAddress addr) {
return m_evLoop.recvFrom(fd, data, addr);
}
Expand Down
13 changes: 9 additions & 4 deletions source/libasync/internals/logging.d
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ static if (__VERSION__ < 2103){
}
else {
import std.logger;
import std.stdio : stdout;
}
static __gshared FileLogger sharedLog;
static this() {
sharedLog = new FileLogger(stdout, LOGLEVEL);
}

nothrow:
Expand All @@ -23,7 +28,7 @@ template defaultLogFunction(LogLevel ll)
{
static if (ll >= LOGLEVEL)
{
try stdThreadLocalLog.log!(line, file, funcName, prettyFuncName, moduleName)("", args);
try sharedLog.log!(line, file, funcName, prettyFuncName, moduleName)("", args);
catch (Throwable e) {}
}
}
Expand All @@ -35,7 +40,7 @@ template defaultLogFunction(LogLevel ll)
{
static if (ll >= LOGLEVEL)
{
try log!(line, file, funcName, prettyFuncName, moduleName)(condition, args);
try sharedLog.log!(line, file, funcName, prettyFuncName, moduleName)(condition, args);
catch (Throwable e) {}
}
}
Expand All @@ -56,7 +61,7 @@ template defaultLogFunctionf(LogLevel ll)
{
static if (ll >= LOGLEVEL)
{
try logf!(line, file, funcName, prettyFuncName, moduleName)(msg, args);
try sharedLog.logf!(line, file, funcName, prettyFuncName, moduleName)(msg, args);
catch (Throwable e) {}
}
}
Expand All @@ -68,7 +73,7 @@ template defaultLogFunctionf(LogLevel ll)
{
static if (ll >= LOGLEVEL)
{
try logf!(line, file, funcName, prettyFuncName, moduleName)(condition, msg, args);
try sharedLog.logf!(line, file, funcName, prettyFuncName, moduleName)(condition, msg, args);
catch (Throwable e) {}
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/libasync/internals/win32.d
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ extern(System) nothrow
ADDRINFOEXW* ai_next;
}

alias PADDRINFOEX = ADDRINFOEXW*;

struct ADDRINFOA {
int ai_flags;
int ai_family;
Expand Down
14 changes: 7 additions & 7 deletions source/libasync/posix.d
Original file line number Diff line number Diff line change
Expand Up @@ -2976,13 +2976,13 @@ private:
void log(StatusInfo val)
{
static if (LOG) {
import std.stdio;
import libasync.internals.logging;
try {
writeln("Backtrace: ", m_status.text);
writeln(" | Status: ", m_status.code);
writeln(" | Error: " , m_error);
trace("Backtrace: ", m_status.text);
trace(" | Status: ", m_status.code);
trace(" | Error: " , m_error);
if ((m_error in EPosixMessages) !is null)
writeln(" | Message: ", EPosixMessages[m_error]);
trace(" | Message: ", EPosixMessages[m_error]);
} catch(Exception e) {
return;
}
Expand All @@ -2992,9 +2992,9 @@ private:
void log(T)(T val)
{
static if (LOG) {
import std.stdio;
import libasync.internals.logging;
try {
writeln(val);
trace(val);
} catch(Exception e) {
return;
}
Expand Down
102 changes: 94 additions & 8 deletions source/libasync/windows.d
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ package:
}

if (signal == WAIT_IO_COMPLETION) {

if (m_status.code != Status.OK) return false;

foreach (request; m_completedSocketReceives) {
Expand Down Expand Up @@ -393,6 +394,7 @@ package:
}
}
break;

default:
.warning("Unknown event was triggered: ", signal);
break;
Expand Down Expand Up @@ -1715,6 +1717,62 @@ package:
return addr;
}

bool resolve(AsyncDNSRequest* ctx, in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
/*in {
debug import libasync.internals.validator : validateHost;
debug assert(validateHost(host), "Trying to connect to an invalid domain");
}
do */{
trace("resolve");
m_status = StatusInfo.init;
auto overlapped = assumeWontThrow(AsyncOverlapped.alloc());
overlapped.resolve = ctx;
ADDRINFOEXW hints;
PADDRINFOEX infos;
hints.ai_flags |= AI_CANONNAME;

LPCWSTR wPort = port.to!(wchar[]).toUTFz!(const(wchar)*);

if (ipv6) {
hints.ai_family = AF_INET6;
}
else {
hints.ai_family = AF_UNSPEC;
}

if (tcp) {
hints.ai_protocol = IPPROTO_TCP;
hints.ai_socktype = SOCK_STREAM;
}
else {
hints.ai_protocol = IPPROTO_UDP;
hints.ai_socktype = SOCK_DGRAM;
}

LPCWSTR str;

try {
str = cast(LPCWSTR) toUTFz!(immutable(wchar)*)(host);
} catch (Exception e) {
setInternalError!"toUTFz"(Status.ERROR, e.msg);
return false;
}
timeval timeout = timeval(5,0);
trace("Getting addr info");
error_t err = cast(error_t) GetAddrInfoExW(str, wPort, NS_DNS, NULL, &hints,
&overlapped.resolve.infos, &timeout, cast(WSAOVERLAPPEDX*) overlapped,
cast(LPLOOKUPSERVICE_COMPLETION_ROUTINE) &onOverlappedResolveComplete, NULL);

if (err != WSA_IO_PENDING) {
setInternalError!"GetAddrInfoW"(Status.ABORT, string.init, err);
onOverlappedResolveComplete(cast(INT)err, 0, overlapped);
return false;
}

return true;
}


NetworkAddress getAddressFromDNS(in string host, in ushort port = 0, in bool ipv6 = true, in bool tcp = true, in bool force = true)
/*in {
debug import libasync.internals.validator : validateHost;
Expand All @@ -1732,7 +1790,7 @@ package:
addr.family = AF_INET6;
}
else {
hints.ai_family = AF_INET;
hints.ai_family = AF_UNSPEC;
addr.family = AF_INET;
}

Expand Down Expand Up @@ -1765,6 +1823,8 @@ package:
ubyte* pAddr = cast(ubyte*) infos.ai_addr;
ubyte* data = cast(ubyte*) addr.sockAddr;
data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy

addr.family = cast(ushort)infos.ai_family;
static if (LOG) try log("GetAddrInfoW Successfully resolved DNS to: " ~ addr.toAddressString());
catch (Exception e){}
return addr;
Expand Down Expand Up @@ -2306,13 +2366,13 @@ private:
void log(StatusInfo val)
{
static if (LOG) {
import std.stdio;
import libasync.internals.logging;
try {
writeln("Backtrace: ", m_status.text);
writeln(" | Status: ", m_status.code);
writeln(" | Error: " , m_error);
trace("Backtrace: ", m_status.text);
trace(" | Status: ", m_status.code);
trace(" | Error: " , m_error);
if ((m_error in EWSAMessages) !is null)
writeln(" | Message: ", EWSAMessages[m_error]);
trace(" | Message: ", EWSAMessages[m_error]);
} catch(Exception e) {
return;
}
Expand All @@ -2322,9 +2382,9 @@ private:
void log(T)(lazy T val)
{
static if (LOG) {
import std.stdio;
import libasync.internals.logging;
try {
writeln(val);
trace(val);
} catch(Exception e) {
return;
}
Expand Down Expand Up @@ -2587,6 +2647,7 @@ struct AsyncOverlapped
AsyncAcceptRequest* accept;
AsyncReceiveRequest* receive;
AsyncSendRequest* send;
AsyncDNSRequest* resolve;
}

@property void hEvent(HANDLE hEvent) @safe pure @nogc nothrow
Expand All @@ -2598,6 +2659,31 @@ struct AsyncOverlapped

nothrow extern(System)
{
void onOverlappedResolveComplete(error_t err, DWORD recvCount, AsyncOverlapped* overlapped)
{
static if (LOG) .tracef("onOverlappedResolveComplete: error: %s, recvCount: %s", err, recvCount);

PADDRINFOEX infos = overlapped.resolve.infos;

auto evLoopMain = &(cast()overlapped.resolve.dns.m_evLoop);
EventLoopImpl* evLoop = &evLoopMain.m_evLoop;
scope(exit) {
FreeAddrInfoExW(infos);
assumeWontThrow(AsyncDNSRequest.free(overlapped.resolve));
}
if (err != EWIN.WSA_OK) {
evLoop.setInternalError!"GetAddrInfoExW"(Status.ABORT, string.init, err);
return;
}
NetworkAddress* addr = cast(NetworkAddress*)overlapped.resolve.dns.addr;
ubyte* pAddr = cast(ubyte*) infos.ai_addr;
ubyte* data = cast(ubyte*) addr.sockAddr;

data[0 .. infos.ai_addrlen] = pAddr[0 .. infos.ai_addrlen]; // perform bit copy
addr.family = cast(ushort)infos.ai_family;
overlapped.resolve.dns.cmdInfo().ready.trigger(*evLoopMain);
}

void onOverlappedReceiveComplete(error_t error, DWORD recvCount, AsyncOverlapped* overlapped, DWORD flags)
{
static if (LOG) .tracef("onOverlappedReceiveComplete: error: %s, recvCount: %s, flags: %s", error, recvCount, flags);
Expand Down

0 comments on commit b6d2ec3

Please sign in to comment.