Skip to content

Commit

Permalink
Polish schedulers (typing, docstrings)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikkemperman committed Mar 12, 2019
1 parent 37de367 commit b96bef5
Show file tree
Hide file tree
Showing 12 changed files with 532 additions and 265 deletions.
107 changes: 71 additions & 36 deletions rx/concurrency/mainloopscheduler/asyncioscheduler.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,71 @@
import logging
import asyncio
from datetime import datetime

from concurrent.futures import Future
from datetime import datetime
from typing import Optional

from rx.disposable import Disposable
from rx.core import typing
from rx.disposable import SingleAssignmentDisposable, CompositeDisposable
from rx.concurrency.schedulerbase import SchedulerBase
from rx.disposable import CompositeDisposable, Disposable, SingleAssignmentDisposable

from ..schedulerbase import SchedulerBase


log = logging.getLogger("Rx")


class AsyncIOScheduler(SchedulerBase):
"""A scheduler that schedules work via the asyncio mainloop."""

def __init__(self, loop=None, threadsafe=False):
self.loop = loop or asyncio.get_event_loop()
self.threadsafe = threadsafe
def __init__(self,
loop: Optional[asyncio.AbstractEventLoop] = None,
threadsafe: bool = False
) -> None:
self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
self.threadsafe: bool = threadsafe

def schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
def schedule(self, action: typing.ScheduledAction, state: typing.TState = None) -> typing.Disposable:
"""Schedules an action to be executed."""
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
if self.threadsafe is False:
return self._schedule(action, state)
return self._schedule(action, state=state)

return self._schedule_threadsafe(action, state)
return self._schedule_threadsafe(action, state=state)

def _schedule(self, action: typing.ScheduledAction, state: typing.TState = None) -> typing.Disposable:
def _schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
sad = SingleAssignmentDisposable()

def interval():
sad.disposable = self.invoke_action(action, state)
sad.disposable = self.invoke_action(action, state=state)
handle = self.loop.call_soon(interval)

def dispose():
handle.cancel()

return CompositeDisposable(sad, Disposable(dispose))

def _schedule_threadsafe(self, action, state=None):
def _schedule_threadsafe(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
sad = SingleAssignmentDisposable()

def interval():
sad.disposable = self.invoke_action(action, state)
sad.disposable = self.invoke_action(action, state=state)

handle = self.loop.call_soon_threadsafe(interval)

Expand All @@ -57,29 +81,35 @@ def cancel_handle():

return CompositeDisposable(sad, Disposable(dispose))

def schedule_relative(self, duetime: typing.RelativeTime, action: typing.ScheduledAction,
state: typing.TState = None) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
def schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the
action.
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
if self.threadsafe is False:
return self._schedule_relative(duetime, action, state)
return self._schedule_relative(duetime, action, state=state)
else:
return self._schedule_relative_threadsafe(duetime, action, state)
return self._schedule_relative_threadsafe(duetime, action, state=state)

def _schedule_relative(self, duetime, action, state=None):
scheduler = self
def _schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
seconds = self.to_seconds(duetime)
if seconds == 0:
return scheduler.schedule(action, state)
return self.schedule(action, state)

sad = SingleAssignmentDisposable()

Expand All @@ -93,16 +123,19 @@ def dispose():

return CompositeDisposable(sad, Disposable(dispose))

def _schedule_relative_threadsafe(self, duetime, action, state=None):
scheduler = self
def _schedule_relative_threadsafe(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
seconds = self.to_seconds(duetime)
if seconds == 0:
return scheduler.schedule(action, state)
return self.schedule(action, state=state)

sad = SingleAssignmentDisposable()

def interval():
sad.disposable = self.invoke_action(action, state)
sad.disposable = self.invoke_action(action, state=state)

# the operations on the list used here are atomic, so there is no
# need to protect its access with a lock
Expand All @@ -129,23 +162,25 @@ def cancel_handle():

return CompositeDisposable(sad, Disposable(dispose))

def schedule_absolute(self, duetime: typing.AbsoluteTime, action: typing.ScheduledAction,
state: typing.TState = None) -> typing.Disposable:
def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time after which to execute the
action.
duetime: Absolute time after which to execute the action.
action: Action to be executed.
state: Optional state to be given to the action function.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

duetime = self.to_datetime(duetime)
return self.schedule_relative(duetime - self.now, action, state)
return self.schedule_relative(duetime - self.now, action, state=state)

@property
def now(self) -> datetime:
Expand All @@ -154,4 +189,4 @@ def now(self) -> datetime:
property.
"""

return self.to_datetime(self.loop.time()*1000)
return self.to_datetime(self.loop.time() * 1000)
67 changes: 45 additions & 22 deletions rx/concurrency/mainloopscheduler/eventletscheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import logging
from typing import Any

from datetime import datetime
from typing import Optional

from rx.disposable import Disposable
from rx.core import typing
from rx.disposable import SingleAssignmentDisposable, CompositeDisposable
from rx.concurrency.schedulerbase import SchedulerBase
from rx.disposable import CompositeDisposable, Disposable, SingleAssignmentDisposable

from ..schedulerbase import SchedulerBase


log = logging.getLogger("Rx")


eventlet = None


Expand All @@ -24,29 +27,44 @@ def __init__(self) -> None:
import eventlet
import eventlet.hubs

def schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
def schedule(self, action: typing.ScheduledAction, state: Any = None) -> typing.Disposable:
"""Schedules an action to be executed."""
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

sad = SingleAssignmentDisposable()

def interval():
sad.disposable = self.invoke_action(action, state)
sad.disposable = self.invoke_action(action, state=state)

timer = [eventlet.spawn(interval)]
timer = eventlet.spawn(interval)

def dispose():
timer[0].kill()
timer.kill()

return CompositeDisposable(sad, Disposable(dispose))

def schedule_relative(self, duetime: typing.RelativeTime, action: typing.ScheduledAction,
state: typing.TState = None) -> typing.Disposable:
def schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
Expand All @@ -55,40 +73,45 @@ def schedule_relative(self, duetime: typing.RelativeTime, action: typing.Schedul

seconds = self.to_seconds(duetime)
if not seconds:
return self.schedule(action, state)
return self.schedule(action, state=state)

sad = SingleAssignmentDisposable()

def interval():
sad.disposable = self.invoke_action(action, state)
sad.disposable = self.invoke_action(action, state=state)

timer = [eventlet.spawn_after(seconds, interval)]
timer = eventlet.spawn_after(seconds, interval)

def dispose():
# nonlocal timer
timer[0].kill()
timer.kill()

return CompositeDisposable(sad, Disposable(dispose))

def schedule_absolute(self, duetime: typing.AbsoluteTime, action: typing.ScheduledAction,
state: typing.TState = None) -> typing.Disposable:
def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort)."""
(best effort).
"""

duetime = self.to_datetime(duetime)
return self.schedule_relative(duetime - self.now, action, state)
return self.schedule_relative(duetime - self.now, action, state=state)

@property
def now(self) -> datetime:
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by
this property."""
scheduled on a scheduler will adhere to the time denoted by this
property.
"""

return self.to_datetime(eventlet.hubs.get_hub().clock())

0 comments on commit b96bef5

Please sign in to comment.