Skip to content

Commit

Permalink
Fix status-im#8 and related issues, added more tests for it.
Browse files Browse the repository at this point in the history
Fix Unix connection failed bug.
  • Loading branch information
cheatfate committed Aug 24, 2018
1 parent d2f9e27 commit 2a28139
Show file tree
Hide file tree
Showing 8 changed files with 454 additions and 145 deletions.
2 changes: 1 addition & 1 deletion asyncdispatch2.nimble
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
packageName = "asyncdispatch2"
version = "2.0.8"
version = "2.0.9"
author = "Status Research & Development GmbH"
description = "Asyncdispatch2"
license = "Apache License 2.0 or MIT"
Expand Down
118 changes: 74 additions & 44 deletions asyncdispatch2/asyncloop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,14 @@ when defined(windows) or defined(nimdoc):
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
Expand All @@ -303,14 +305,15 @@ when defined(windows) or defined(nimdoc):
return disp.ioPort

proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
if createIoCompletionPort(fd.Handle, p.ioPort,
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getGlobalDispatcher()
if createIoCompletionPort(fd.Handle, loop.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
p.handles.incl(fd)
loop.handles.incl(fd)

proc poll*() =
## Perform single asynchronous step.
let loop = getGlobalDispatcher()
var curTime = fastEpochTime()
var curTimeout = DWORD(0)
Expand Down Expand Up @@ -397,16 +400,21 @@ when defined(windows) or defined(nimdoc):
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
close(sock)

proc closeSocket*(socket: AsyncFD) =
proc closeSocket*(socket: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher()
socket.SocketHandle.close()
getGlobalDispatcher().handles.excl(socket)
loop.handles.excl(socket)
if not isNil(aftercb):
var acb = AsyncCallback(function: aftercb)
loop.callbacks.addLast(acb)

proc unregister*(fd: AsyncFD) =
## Unregisters ``fd``.
getGlobalDispatcher().handles.excl(fd)

proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
return fd in disp.handles

else:
Expand Down Expand Up @@ -435,6 +443,7 @@ else:
proc `==`*(x, y: AsyncFD): bool {.borrow.}

proc newDispatcher*(): PDispatcher =
## Create new dispatcher.
new result
result.selector = newSelector[SelectorData]()
result.timers.newHeapQueue()
Expand All @@ -444,89 +453,117 @@ else:
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher

proc setGlobalDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()

proc getGlobalDispatcher*(): PDispatcher =
## Returns current thread's dispatcher instance.
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp

proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] =
## Returns system specific OS queue.
return disp.selector

proc register*(fd: AsyncFD) =
## Register file descriptor ``fd`` in selector.
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getGlobalDispatcher()
var data: SelectorData
data.rdata.fd = fd
data.wdata.fd = fd
let loop = getGlobalDispatcher()
loop.selector.registerHandle(int(fd), {}, data)

proc unregister*(fd: AsyncFD) =
## Unregister file descriptor ``fd`` from selector.
## Unregister file descriptor ``fd`` from thread's dispatcher.
getGlobalDispatcher().selector.unregister(int(fd))

proc contains*(disp: PDispatcher, fd: AsyncFd): bool {.inline.} =
## Returns ``true`` if ``fd`` is registered in thread's dispatcher.
result = int(fd) in disp.selector

proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for read availability and then
## call the callback ``cb`` with specified argument ``udata``.
let p = getGlobalDispatcher()
let loop = getGlobalDispatcher()
var newEvents = {Event.Read}
withData(p.selector, int(fd), adata) do:
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: addr adata.rdata)
adata.reader = acb
adata.rdata = CompletionData(fd: fd, udata: udata)
newEvents.incl(Event.Read)
if not isNil(adata.writer.function): newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(int(fd), newEvents)
loop.selector.updateHandle(int(fd), newEvents)

proc removeReader*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for read availability.
let p = getGlobalDispatcher()
let loop = getGlobalDispatcher()
var newEvents: set[Event]
withData(p.selector, int(fd), adata) do:
withData(loop.selector, int(fd), adata) do:
# We need to clear `reader` data, because `selectors` don't do it
adata.reader = AsyncCallback()
adata.rdata = CompletionData()
adata.reader.function = nil
# adata.rdata = CompletionData()
if not isNil(adata.writer.function): newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(int(fd), newEvents)
loop.selector.updateHandle(int(fd), newEvents)

proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) =
## Start watching the file descriptor ``fd`` for write availability and then
## call the callback ``cb`` with specified argument ``udata``.
let p = getGlobalDispatcher()
let loop = getGlobalDispatcher()
var newEvents = {Event.Write}
withData(p.selector, int(fd), adata) do:
withData(loop.selector, int(fd), adata) do:
let acb = AsyncCallback(function: cb, udata: addr adata.wdata)
adata.writer = acb
adata.wdata = CompletionData(fd: fd, udata: udata)
newEvents.incl(Event.Write)
if not isNil(adata.reader.function): newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(int(fd), newEvents)
loop.selector.updateHandle(int(fd), newEvents)

proc removeWriter*(fd: AsyncFD) =
## Stop watching the file descriptor ``fd`` for write availability.
let p = getGlobalDispatcher()
let loop = getGlobalDispatcher()
var newEvents: set[Event]
withData(p.selector, int(fd), adata) do:
withData(loop.selector, int(fd), adata) do:
# We need to clear `writer` data, because `selectors` don't do it
adata.writer = AsyncCallback()
adata.wdata = CompletionData()
adata.writer.function = nil
# adata.wdata = CompletionData()
if not isNil(adata.reader.function): newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(int(fd), newEvents)
loop.selector.updateHandle(int(fd), newEvents)

proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Close asynchronous socket.
##
## Please note, that socket is not closed immediately. To avoid bugs with
## closing socket, while operation pending, socket will be closed as
## soon as all pending operations will be notified.
## You can execute ``aftercb`` before actual socket close operation.
let loop = getGlobalDispatcher()

proc continuation(udata: pointer) =
aftercb(nil)
unregister(fd)
close(SocketHandle(fd))

withData(loop.selector, int(fd), adata) do:
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
if not isNil(adata.writer.function):
loop.callbacks.addLast(adata.writer)

if not isNil(aftercb):
var acb = AsyncCallback(function: continuation)
loop.callbacks.addLast(acb)

when ioselSupportedPlatform:
proc addSignal*(signal: int, cb: CallbackFunc,
Expand All @@ -535,10 +572,10 @@ else:
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
let p = getGlobalDispatcher()
let loop = getGlobalDispatcher()
var data: SelectorData
result = p.selector.registerSignal(signal, data)
withData(p.selector, result, adata) do:
result = loop.selector.registerSignal(signal, data)
withData(loop.selector, result, adata) do:
adata.reader = AsyncCallback(function: cb, udata: addr adata.rdata)
adata.rdata.fd = AsyncFD(result)
adata.rdata.udata = udata
Expand All @@ -547,8 +584,8 @@ else:

proc removeSignal*(sigfd: int) =
## Remove watching signal ``signal``.
let p = getGlobalDispatcher()
p.selector.unregister(sigfd)
let loop = getGlobalDispatcher()
loop.selector.unregister(sigfd)

proc poll*() =
## Perform single asynchronous step.
Expand All @@ -569,21 +606,18 @@ else:
let fd = loop.keys[i].fd
let events = loop.keys[i].events

if Event.Read in events or events == {Event.Error}:
withData(loop.selector, fd, adata) do:
withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}:
loop.callbacks.addLast(adata.reader)

if Event.Write in events or events == {Event.Error}:
withData(loop.selector, fd, adata) do:
if Event.Write in events or events == {Event.Error}:
loop.callbacks.addLast(adata.writer)

if Event.User in events:
withData(loop.selector, fd, adata) do:
if Event.User in events:
loop.callbacks.addLast(adata.reader)

when ioselSupportedPlatform:
if customSet * events != {}:
withData(loop.selector, fd, adata) do:
when ioselSupportedPlatform:
if customSet * events != {}:
loop.callbacks.addLast(adata.reader)

# Moving expired timers to `loop.callbacks`.
Expand Down Expand Up @@ -618,10 +652,6 @@ proc removeTimer*(at: uint64, cb: CallbackFunc, udata: pointer = nil) =
if index != -1:
loop.timers.del(index)

# proc completeProxy*[T](data: pointer) =
# var future = cast[Future[T]](data)
# future.complete()

proc sleepAsync*(ms: int): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``ms`` milliseconds.
Expand Down Expand Up @@ -656,7 +686,7 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] =
proc wait*[T](fut: Future[T], timeout = -1): Future[T] =
## Returns a future which will complete once future ``fut`` completes
## or if timeout of ``timeout`` milliseconds has been expired.
##
##
## If ``timeout`` is ``-1``, then statement ``await wait(fut)`` is
## equal to ``await fut``.
var retFuture = newFuture[T]("asyncdispatch.wait")
Expand Down
5 changes: 0 additions & 5 deletions asyncdispatch2/handles.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,3 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD =
return asyncInvalidSocket
result = AsyncFD(sock)
register(result)

proc closeAsyncSocket*(s: AsyncFD) {.inline.} =
## Closes asynchronous socket handle ``s``.
unregister(s)
close(SocketHandle(s))
Loading

0 comments on commit 2a28139

Please sign in to comment.