Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync traps #103

Merged
merged 7 commits into from Nov 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
124 changes: 66 additions & 58 deletions curio/kernel.py
Expand Up @@ -18,7 +18,7 @@
from .errors import *
from .errors import _CancelRetry
from .task import Task
from .traps import _read_wait, Traps
from .traps import _read_wait, Traps, SyncTraps
from .local import _enable_tasklocal_for, _copy_tasklocal

# kqueue is the datatype used by the kernel for all of its queuing functionality.
Expand Down Expand Up @@ -366,7 +366,7 @@ def _set_timeout(clock, sleep_type='timeout'):
# to invoke a specific trap.

# Wait for I/O
def _trap_io(_, fileobj, event, state):
def _trap_io(fileobj, event, state):
# See comment about deferred unregister in run(). If the requested
# I/O operation is *different* than the last I/O operation that was
# performed by the task, we need to unregister the last I/O resource used
Expand All @@ -383,7 +383,7 @@ def _trap_io(_, fileobj, event, state):
current.cancel_func = lambda: selector_unregister(fileobj)

# Wait on a Future
def _trap_future_wait(_, future, event):
def _trap_future_wait(future, event):
if self._kernel_task_id is None:
_init_loopback_task()

Expand Down Expand Up @@ -415,57 +415,51 @@ def _trap_future_wait(_, future, event):
event.set()

# Add a new task to the kernel
def _trap_spawn(_, coro, daemon):
def _trap_spawn(coro, daemon):
task = _new_task(coro, daemon)
_copy_tasklocal(current, task)
_reschedule_task(current, value=task)

# Reschedule one or more tasks from a queue
def _trap_reschedule_tasks(_, queue, n):
while n > 0:
def _trap_reschedule_tasks(queue, n):
for _ in range(n):
_reschedule_task(queue.popleft())
n -= 1
_reschedule_task(current)

# Trap that returns a function for rescheduling tasks from synchronous code
def _trap_queue_reschedule_function(_, queue):
def _sync_trap_queue_reschedule_function(queue):
def _reschedule(n):
while n > 0:
for _ in range(n):
_reschedule_task(queue.popleft())
n -= 1
ready_appendleft(current)
current.next_value = _reschedule
return _reschedule

# Join with a task
def _trap_join_task(_, task):
def _trap_join_task(task):
if task.terminated:
_reschedule_task(current)
else:
if task.joining is None:
task.joining = kqueue()
_trap_wait_queue(_, task.joining, 'TASK_JOIN')
_trap_wait_queue(task.joining, 'TASK_JOIN')

# Enter or exit a 'with curio.defer_cancellation' block:
def _trap_adjust_cancel_defer_depth(_, n):
# Enter or exit an 'async with curio.defer_cancellation' block:
def _sync_trap_adjust_cancel_defer_depth(n):
current.cancel_defer_depth += n
if current.cancel_defer_depth == 0 and current.cancel_pending:
current.cancel_pending = False
if current.cancelled:
return
_reschedule_task(current, exc=CancelledError("CancelledError"))
else:
ready_appendleft(current)
raise CancelledError("CancelledError")

# Cancel a task
def _trap_cancel_task(_, task):
def _trap_cancel_task(task):
if task == current:
_reschedule_task(current, exc=CurioError("A task can't cancel itself"))
return

if task.cancelled:
ready_appendleft(current)
elif task.cancel_defer_depth > 0:
print("Deferred")
task.cancel_pending = True
ready_appendleft(current)
elif _cancel_task(task, CancelledError):
Expand All @@ -479,26 +473,31 @@ def _trap_cancel_task(_, task):
_reschedule_task(current, exc=_CancelRetry())

# Wait on a queue
def _trap_wait_queue(_, queue, state):
def _trap_wait_queue(queue, state):
queue.append(current)
current.state = state
current.cancel_func = lambda current=current: queue.remove(current)

# Sleep for a specified period. Returns value of monotonic clock.
# absolute flag indicates whether or not an absolute or relative clock
# interval has been provided
def _trap_sleep(_, clock, absolute):
if clock > 0:
if not absolute:
clock += time_monotonic()
_set_timeout(clock, 'sleep')
current.state = 'TIME_SLEEP'
current.cancel_func = lambda task=current: setattr(task, 'sleep', None)
else:
_reschedule_task(current, value=time_monotonic())
def _trap_sleep(clock, absolute):
# We used to have a special case where sleep periods <= 0 would
# simply reschedule the task to the end of the ready queue without
# actually putting it on the sleep queue first. But this meant
# that if a task looped while calling sleep(0), it would allow
# other *ready* tasks to run, but block ever checking for I/O or
# timeouts, so sleeping tasks would never wake up. That's not what
# we want; sleep(0) should mean "please give other stuff a chance
# to run". So now we always go through the whole sleep machinery.
if not absolute:
clock += time_monotonic()
_set_timeout(clock, 'sleep')
current.state = 'TIME_SLEEP'
current.cancel_func = lambda task=current: setattr(task, 'sleep', None)

# Watch signals
def _trap_sigwatch(_, sigset):
def _sync_trap_sigwatch(sigset):
# Initialize the signal handling part of the kernel if not done already
# Note: This only works if running in the main thread
if self._kernel_task_id is None:
Expand All @@ -508,21 +507,19 @@ def _trap_sigwatch(_, sigset):
self._init_signals()

self._signal_watch(sigset)
_reschedule_task(current)

# Unwatch signals
def _trap_sigunwatch(_, sigset):
def _sync_trap_sigunwatch(sigset):
self._signal_unwatch(sigset)
_reschedule_task(current)

# Wait for a signal
def _trap_sigwait(_, sigset):
def _trap_sigwait(sigset):
sigset.waiting = current
current.state = 'SIGNAL_WAIT'
current.cancel_func = lambda: setattr(sigset, 'waiting', None)

# Set a timeout to be delivered to the calling task
def _trap_set_timeout(_, timeout):
def _sync_trap_set_timeout(timeout):
old_timeout = current.timeout
if timeout:
_set_timeout(timeout)
Expand All @@ -531,11 +528,10 @@ def _trap_set_timeout(_, timeout):
else:
current.timeout = None

current.next_value = old_timeout
ready_appendleft(current)
return old_timeout

# Clear a previously set timeout
def _trap_unset_timeout(_, previous):
def _sync_trap_unset_timeout(previous):
# Here's an evil corner case. Suppose the previous timeout in effect
# has already expired? If so, then we need to arrange for a timeout
# to be generated. However, this has to happen on the *next* blocking
Expand All @@ -549,30 +545,27 @@ def _trap_unset_timeout(_, previous):
_set_timeout(previous)
else:
current.timeout = previous
current.next_value = None
ready_appendleft(current)

# Return the running kernel
def _trap_get_kernel(_):
ready_appendleft(current)
current.next_value = self
def _sync_trap_get_kernel():
return self

# Return the currently running task
def _trap_get_current(_):
ready_appendleft(current)
current.next_value = current
def _sync_trap_get_current():
return current

# Return the current value of the kernel clock
def _trap_clock(_):
ready_appendleft(current)
current.next_value = time_monotonic()
def _sync_trap_clock():
return time_monotonic()

# Create the traps tables
traps = [None] * len(Traps)
for trap in Traps:
traps[trap] = locals()[trap.name]

# Create the traps table
trap_funcs = [ val for key, val in locals().items() if key.startswith('_trap') ]
traps = [None]*len(trap_funcs)
for func in trap_funcs:
trapno = getattr(Traps, func.__name__)
traps[trapno] = func
sync_traps = [None] * len(SyncTraps)
for trap in SyncTraps:
sync_traps[trap] = locals()[trap.name]

# If a coroutine was given, add it as the first task
maintask = _new_task(coro) if coro else None
Expand Down Expand Up @@ -647,10 +640,24 @@ def _trap_clock(_):
with _enable_tasklocal_for(current):
if current.next_exc is None:
trap = current._send(current.next_value)
current.next_value = None
else:
trap = current._throw(current.next_exc)
current.next_exc = None

# If the trap is synchronous, then handle it
# immediately without rescheduling. Sync trap handlers
# have a different API than regular traps -- they just
# return or raise whatever the trap should return or
# raise.
while type(trap[0]) is SyncTraps:
try:
next_value = sync_traps[trap[0]](*trap[1:])
except Exception as next_exc:
trap = current._throw(next_exc)
else:
trap = current._send(next_value)

except StopIteration as e:
_cleanup_task(current, value=e.value)

Expand Down Expand Up @@ -681,7 +688,8 @@ def _trap_clock(_):

else:
# Execute the trap
traps[trap[0]](*trap)
assert type(trap[0]) is Traps
traps[trap[0]](*trap[1:])

finally:
# Unregister previous I/O request. Discussion follows:
Expand Down
43 changes: 23 additions & 20 deletions curio/traps.py
Expand Up @@ -33,18 +33,21 @@ class Traps(IntEnum):
_trap_join_task = 5
_trap_wait_queue = 6
_trap_reschedule_tasks = 7
_trap_sigwatch = 8
_trap_sigunwatch = 9
_trap_sigwait = 10
_trap_get_kernel = 11
_trap_get_current = 12
_trap_set_timeout = 13
_trap_unset_timeout = 14
_trap_queue_reschedule_function = 15
_trap_clock = 16
_trap_adjust_cancel_defer_depth = 17
_trap_sigwait = 8

globals().update((key,val) for key, val in vars(Traps).items() if key.startswith('_trap'))
class SyncTraps(IntEnum):
_sync_trap_adjust_cancel_defer_depth = 0
_sync_trap_get_kernel = 1
_sync_trap_get_current = 2
_sync_trap_set_timeout = 3
_sync_trap_unset_timeout = 4
_sync_trap_queue_reschedule_function = 5
_sync_trap_clock = 6
_sync_trap_sigwatch = 7
_sync_trap_sigunwatch = 8

globals().update((trap.name, trap) for trap in Traps)
globals().update((trap.name, trap) for trap in SyncTraps)

@coroutine
def _read_wait(fileobj):
Expand Down Expand Up @@ -103,7 +106,7 @@ def _adjust_cancel_defer_depth(n):
Increment or decrement the current task's cancel_defer_depth. If it goes
to 0, and the task was previously cancelled, then raises CancelledError.
'''
yield (_trap_adjust_cancel_defer_depth, n)
yield (_sync_trap_adjust_cancel_defer_depth, n)

@coroutine
def _join_task(task):
Expand Down Expand Up @@ -131,14 +134,14 @@ def _sigwatch(sigset):
'''
Start monitoring a signal set
'''
yield (_trap_sigwatch, sigset)
yield (_sync_trap_sigwatch, sigset)

@coroutine
def _sigunwatch(sigset):
'''
Stop watching a signal set
'''
yield (_trap_sigunwatch, sigset)
yield (_sync_trap_sigunwatch, sigset)

@coroutine
def _sigwait(sigset):
Expand All @@ -152,29 +155,29 @@ def _get_kernel():
'''
Get the kernel executing the task.
'''
return (yield (_trap_get_kernel,))
return (yield (_sync_trap_get_kernel,))

@coroutine
def _get_current():
'''
Get the currently executing task
'''
return (yield (_trap_get_current,))
return (yield (_sync_trap_get_current,))

@coroutine
def _set_timeout(clock):
'''
Set a timeout for the current task that occurs at the specified clock value.
Setting a clock of None clears any previous timeout.
'''
return (yield (_trap_set_timeout, clock))
return (yield (_sync_trap_set_timeout, clock))

@coroutine
def _unset_timeout(previous):
'''
Restore the previous timeout for the current task.
'''
yield (_trap_unset_timeout, previous)
yield (_sync_trap_unset_timeout, previous)

@coroutine
def _queue_reschedule_function(queue):
Expand All @@ -183,11 +186,11 @@ def _queue_reschedule_function(queue):
the use of await. Can be used in synchronous code as long as it runs
in the same thread as the Curio kernel.
'''
return (yield (_trap_queue_reschedule_function, queue))
return (yield (_sync_trap_queue_reschedule_function, queue))

@coroutine
def _clock():
'''
Return the value of the kernel clock
'''
return (yield (_trap_clock,))
return (yield (_sync_trap_clock,))