Merge pull request #103 from njsmith/sync-traps
Sync traps
dabeaz committed Nov 12, 2016
2 parents 95dca56 + b144a5d commit 03af124
Showing 4 changed files with 161 additions and 87 deletions.
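This change splits curio's kernel traps into two kinds: blocking traps (Traps), which park the calling task and are answered on a later pass through the scheduler, and synchronous traps (SyncTraps), which the kernel answers immediately by sending the result (or throwing the exception) straight back into the coroutine. As a rough orientation before the diff, here is a hypothetical sketch of the trap protocol; a trap is just a tuple whose first element names the trap, and the names below are illustrative stand-ins rather than curio's exact internals.

# Illustrative sketch only: simplified stand-ins for curio's real
# Traps/SyncTraps machinery, not the actual implementation.
from enum import IntEnum
from types import coroutine

class Traps(IntEnum):           # blocking traps: the task is parked and answered later
    _trap_sleep = 0

class SyncTraps(IntEnum):       # synchronous traps: answered inline, no reschedule
    _sync_trap_clock = 0

@coroutine
def _sleep(seconds):
    # The kernel sees (Traps._trap_sleep, seconds), parks the task, and
    # reschedules it when the timeout expires.
    return (yield (Traps._trap_sleep, seconds))

@coroutine
def _clock():
    # The kernel sees (SyncTraps._sync_trap_clock,) and sends the clock value
    # straight back in, so the task never leaves the running state.
    return (yield (SyncTraps._sync_trap_clock,))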
124 changes: 66 additions & 58 deletions curio/kernel.py
@@ -18,7 +18,7 @@
from .errors import *
from .errors import _CancelRetry
from .task import Task
from .traps import _read_wait, Traps
from .traps import _read_wait, Traps, SyncTraps
from .local import _enable_tasklocal_for, _copy_tasklocal

# kqueue is the datatype used by the kernel for all of its queuing functionality.
@@ -366,7 +366,7 @@ def _set_timeout(clock, sleep_type='timeout'):
# to invoke a specific trap.

# Wait for I/O
def _trap_io(_, fileobj, event, state):
def _trap_io(fileobj, event, state):
# See comment about deferred unregister in run(). If the requested
# I/O operation is *different* than the last I/O operation that was
# performed by the task, we need to unregister the last I/O resource used
@@ -383,7 +383,7 @@ def _trap_io(_, fileobj, event, state):
current.cancel_func = lambda: selector_unregister(fileobj)

# Wait on a Future
def _trap_future_wait(_, future, event):
def _trap_future_wait(future, event):
if self._kernel_task_id is None:
_init_loopback_task()

@@ -415,57 +415,51 @@ def _trap_future_wait(_, future, event):
event.set()

# Add a new task to the kernel
def _trap_spawn(_, coro, daemon):
def _trap_spawn(coro, daemon):
task = _new_task(coro, daemon)
_copy_tasklocal(current, task)
_reschedule_task(current, value=task)

# Reschedule one or more tasks from a queue
def _trap_reschedule_tasks(_, queue, n):
while n > 0:
def _trap_reschedule_tasks(queue, n):
for _ in range(n):
_reschedule_task(queue.popleft())
n -= 1
_reschedule_task(current)

# Trap that returns a function for rescheduling tasks from synchronous code
def _trap_queue_reschedule_function(_, queue):
def _sync_trap_queue_reschedule_function(queue):
def _reschedule(n):
while n > 0:
for _ in range(n):
_reschedule_task(queue.popleft())
n -= 1
ready_appendleft(current)
current.next_value = _reschedule
return _reschedule

# Join with a task
def _trap_join_task(_, task):
def _trap_join_task(task):
if task.terminated:
_reschedule_task(current)
else:
if task.joining is None:
task.joining = kqueue()
_trap_wait_queue(_, task.joining, 'TASK_JOIN')
_trap_wait_queue(task.joining, 'TASK_JOIN')

# Enter or exit a 'with curio.defer_cancellation' block:
def _trap_adjust_cancel_defer_depth(_, n):
# Enter or exit an 'async with curio.defer_cancellation' block:
def _sync_trap_adjust_cancel_defer_depth(n):
current.cancel_defer_depth += n
if current.cancel_defer_depth == 0 and current.cancel_pending:
current.cancel_pending = False
if current.cancelled:
return
_reschedule_task(current, exc=CancelledError("CancelledError"))
else:
ready_appendleft(current)
raise CancelledError("CancelledError")

# Cancel a task
def _trap_cancel_task(_, task):
def _trap_cancel_task(task):
if task == current:
_reschedule_task(current, exc=CurioError("A task can't cancel itself"))
return

if task.cancelled:
ready_appendleft(current)
elif task.cancel_defer_depth > 0:
print("Deferred")
task.cancel_pending = True
ready_appendleft(current)
elif _cancel_task(task, CancelledError):
@@ -479,26 +473,31 @@ def _trap_cancel_task(_, task):
_reschedule_task(current, exc=_CancelRetry())

# Wait on a queue
def _trap_wait_queue(_, queue, state):
def _trap_wait_queue(queue, state):
queue.append(current)
current.state = state
current.cancel_func = lambda current=current: queue.remove(current)

# Sleep for a specified period. Returns value of monotonic clock.
# absolute flag indicates whether or not an absolute or relative clock
# interval has been provided
def _trap_sleep(_, clock, absolute):
if clock > 0:
if not absolute:
clock += time_monotonic()
_set_timeout(clock, 'sleep')
current.state = 'TIME_SLEEP'
current.cancel_func = lambda task=current: setattr(task, 'sleep', None)
else:
_reschedule_task(current, value=time_monotonic())
def _trap_sleep(clock, absolute):
# We used to have a special case where sleep periods <= 0 would
# simply reschedule the task to the end of the ready queue without
# actually putting it on the sleep queue first. But this meant
# that if a task looped while calling sleep(0), it would allow
# other *ready* tasks to run, but block ever checking for I/O or
# timeouts, so sleeping tasks would never wake up. That's not what
# we want; sleep(0) should mean "please give other stuff a chance
# to run". So now we always go through the whole sleep machinery.
if not absolute:
clock += time_monotonic()
_set_timeout(clock, 'sleep')
current.state = 'TIME_SLEEP'
current.cancel_func = lambda task=current: setattr(task, 'sleep', None)

# Watch signals
def _trap_sigwatch(_, sigset):
def _sync_trap_sigwatch(sigset):
# Initialize the signal handling part of the kernel if not done already
# Note: This only works if running in the main thread
if self._kernel_task_id is None:
@@ -508,21 +507,19 @@ def _trap_sigwatch(_, sigset):
self._init_signals()

self._signal_watch(sigset)
_reschedule_task(current)

# Unwatch signals
def _trap_sigunwatch(_, sigset):
def _sync_trap_sigunwatch(sigset):
self._signal_unwatch(sigset)
_reschedule_task(current)

# Wait for a signal
def _trap_sigwait(_, sigset):
def _trap_sigwait(sigset):
sigset.waiting = current
current.state = 'SIGNAL_WAIT'
current.cancel_func = lambda: setattr(sigset, 'waiting', None)

# Set a timeout to be delivered to the calling task
def _trap_set_timeout(_, timeout):
def _sync_trap_set_timeout(timeout):
old_timeout = current.timeout
if timeout:
_set_timeout(timeout)
@@ -531,11 +528,10 @@ def _trap_set_timeout(_, timeout):
else:
current.timeout = None

current.next_value = old_timeout
ready_appendleft(current)
return old_timeout

# Clear a previously set timeout
def _trap_unset_timeout(_, previous):
def _sync_trap_unset_timeout(previous):
# Here's an evil corner case. Suppose the previous timeout in effect
# has already expired? If so, then we need to arrange for a timeout
# to be generated. However, this has to happen on the *next* blocking
@@ -549,30 +545,27 @@ def _trap_unset_timeout(_, previous):
_set_timeout(previous)
else:
current.timeout = previous
current.next_value = None
ready_appendleft(current)

# Return the running kernel
def _trap_get_kernel(_):
ready_appendleft(current)
current.next_value = self
def _sync_trap_get_kernel():
return self

# Return the currently running task
def _trap_get_current(_):
ready_appendleft(current)
current.next_value = current
def _sync_trap_get_current():
return current

# Return the current value of the kernel clock
def _trap_clock(_):
ready_appendleft(current)
current.next_value = time_monotonic()
def _sync_trap_clock():
return time_monotonic()

# Create the traps tables
traps = [None] * len(Traps)
for trap in Traps:
traps[trap] = locals()[trap.name]

# Create the traps table
trap_funcs = [ val for key, val in locals().items() if key.startswith('_trap') ]
traps = [None]*len(trap_funcs)
for func in trap_funcs:
trapno = getattr(Traps, func.__name__)
traps[trapno] = func
sync_traps = [None] * len(SyncTraps)
for trap in SyncTraps:
sync_traps[trap] = locals()[trap.name]

# If a coroutine was given, add it as the first task
maintask = _new_task(coro) if coro else None
@@ -647,10 +640,24 @@ def _trap_clock(_):
with _enable_tasklocal_for(current):
if current.next_exc is None:
trap = current._send(current.next_value)
current.next_value = None
else:
trap = current._throw(current.next_exc)
current.next_exc = None

# If the trap is synchronous, then handle it
# immediately without rescheduling. Sync trap handlers
# have a different API than regular traps -- they just
# return or raise whatever the trap should return or
# raise.
while type(trap[0]) is SyncTraps:
try:
next_value = sync_traps[trap[0]](*trap[1:])
except Exception as next_exc:
trap = current._throw(next_exc)
else:
trap = current._send(next_value)

except StopIteration as e:
_cleanup_task(current, value=e.value)

@@ -681,7 +688,8 @@ def _trap_clock(_):

else:
# Execute the trap
traps[trap[0]](*trap)
assert type(trap[0]) is Traps
traps[trap[0]](*trap[1:])

finally:
# Unregister previous I/O request. Discussion follows:
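The run-loop hunk above is the core of the change: when a task yields a trap whose first element is a SyncTraps member, the kernel services it on the spot and keeps sending answers back in until the task yields a blocking trap or exits. A tiny self-contained illustration of that inner loop (toy code, not curio's) follows.

# Toy demonstration (not curio code) of why the sync-trap handler loops:
# a coroutine may yield several synchronous traps back to back, and each
# answer is sent straight back in before the next yield is seen.
from enum import IntEnum

class SyncTraps(IntEnum):
    GET_A = 0
    GET_B = 1

def task():
    a = yield (SyncTraps.GET_A,)
    b = yield (SyncTraps.GET_B,)
    yield ('blocking_trap', a + b)      # stand-in for yielding a real Traps member

answers = {SyncTraps.GET_A: 1, SyncTraps.GET_B: 2}

gen = task()
trap = gen.send(None)                   # prime the generator; yields the first trap
while type(trap[0]) is SyncTraps:       # same shape as the kernel's inner loop above
    trap = gen.send(answers[trap[0]])   # answer inline, without rescheduling
print(trap)                             # -> ('blocking_trap', 3)

In the kernel itself the handler call is wrapped in try/except so a synchronous trap can also raise directly into the task, while the blocking _trap_* handlers keep their reschedule-based API.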
43 changes: 23 additions & 20 deletions curio/traps.py
@@ -33,18 +33,21 @@ class Traps(IntEnum):
_trap_join_task = 5
_trap_wait_queue = 6
_trap_reschedule_tasks = 7
_trap_sigwatch = 8
_trap_sigunwatch = 9
_trap_sigwait = 10
_trap_get_kernel = 11
_trap_get_current = 12
_trap_set_timeout = 13
_trap_unset_timeout = 14
_trap_queue_reschedule_function = 15
_trap_clock = 16
_trap_adjust_cancel_defer_depth = 17
_trap_sigwait = 8

globals().update((key,val) for key, val in vars(Traps).items() if key.startswith('_trap'))
class SyncTraps(IntEnum):
_sync_trap_adjust_cancel_defer_depth = 0
_sync_trap_get_kernel = 1
_sync_trap_get_current = 2
_sync_trap_set_timeout = 3
_sync_trap_unset_timeout = 4
_sync_trap_queue_reschedule_function = 5
_sync_trap_clock = 6
_sync_trap_sigwatch = 7
_sync_trap_sigunwatch = 8

globals().update((trap.name, trap) for trap in Traps)
globals().update((trap.name, trap) for trap in SyncTraps)

@coroutine
def _read_wait(fileobj):
@@ -103,7 +106,7 @@ def _adjust_cancel_defer_depth(n):
Increment or decrement the current task's cancel_defer_depth. If it goes
to 0, and the task was previously cancelled, then raises CancelledError.
'''
yield (_trap_adjust_cancel_defer_depth, n)
yield (_sync_trap_adjust_cancel_defer_depth, n)

@coroutine
def _join_task(task):
@@ -131,14 +134,14 @@ def _sigwatch(sigset):
'''
Start monitoring a signal set
'''
yield (_trap_sigwatch, sigset)
yield (_sync_trap_sigwatch, sigset)

@coroutine
def _sigunwatch(sigset):
'''
Stop watching a signal set
'''
yield (_trap_sigunwatch, sigset)
yield (_sync_trap_sigunwatch, sigset)

@coroutine
def _sigwait(sigset):
@@ -152,29 +155,29 @@ def _get_kernel():
'''
Get the kernel executing the task.
'''
return (yield (_trap_get_kernel,))
return (yield (_sync_trap_get_kernel,))

@coroutine
def _get_current():
'''
Get the currently executing task
'''
return (yield (_trap_get_current,))
return (yield (_sync_trap_get_current,))

@coroutine
def _set_timeout(clock):
'''
Set a timeout for the current task that occurs at the specified clock value.
Setting a clock of None clears any previous timeout.
'''
return (yield (_trap_set_timeout, clock))
return (yield (_sync_trap_set_timeout, clock))

@coroutine
def _unset_timeout(previous):
'''
Restore the previous timeout for the current task.
'''
yield (_trap_unset_timeout, previous)
yield (_sync_trap_unset_timeout, previous)

@coroutine
def _queue_reschedule_function(queue):
@@ -183,11 +186,11 @@ def _queue_reschedule_function(queue):
the use of await. Can be used in synchronous code as long as it runs
in the same thread as the Curio kernel.
'''
return (yield (_trap_queue_reschedule_function, queue))
return (yield (_sync_trap_queue_reschedule_function, queue))

@coroutine
def _clock():
'''
Return the value of the kernel clock
'''
return (yield (_trap_clock,))
return (yield (_sync_trap_clock,))
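
From the calling task's side nothing changes: the helpers above are awaited exactly as before, and whether the kernel answers inline (SyncTraps) or parks the task (Traps) is invisible at the call site. Below is a hypothetical usage sketch; these helpers are curio-internal, and the curio.run() entry point is assumed here rather than taken from this diff.

# Hypothetical usage sketch; _get_current and _clock are curio-internal helpers.
import curio
from curio.traps import _get_current, _clock

async def main():
    task = await _get_current()      # answered inline as a synchronous trap
    start = await _clock()           # likewise, no trip through the ready queue
    await curio.sleep(0.1)           # a blocking trap: the task is parked, then rescheduled
    elapsed = await _clock() - start
    print(task, 'slept for roughly', elapsed, 'seconds')

curio.run(main())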
