Skip to content

Commit

Permalink
Increase EventLoopScheduler coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
erikkemperman committed Mar 12, 2019
1 parent 63571fb commit e3a5580
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 20 deletions.
5 changes: 5 additions & 0 deletions rx/concurrency/eventloopscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ def dispose():

return Disposable(dispose)

def _has_thread(self) -> bool:
"""Checks if there is an event loop thread running."""
with self._condition:
return not self._is_disposed and self._thread is not None

def _ensure_thread(self) -> None:
"""Ensures there is an event loop thread running. Should be
called under the gate."""
Expand Down
182 changes: 162 additions & 20 deletions tests/test_concurrency/test_eventloopscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@

from datetime import datetime, timedelta
import threading
from time import sleep
from rx.concurrency import EventLoopScheduler
from rx.internal import DisposedException


class TestEventLoopScheduler(unittest.TestCase):
def test_event_loop_now(self):
scheduler = EventLoopScheduler()
res = scheduler.now - datetime.utcnow()

assert res < timedelta(microseconds=1000)

def test_event_loop_schedule_action(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
ran = [False]
ran = False
gate = threading.Semaphore(0)

def action(scheduler, state):
ran[0] = True
nonlocal ran
ran = True
gate.release()

scheduler.schedule(action)
gate.acquire()
assert (ran[0] is True)
assert ran is True
assert scheduler._has_thread() is False

def test_event_loop_different_thread(self):
thread_id = [None]
thread_id = None
scheduler = EventLoopScheduler(exit_if_empty=True)
gate = threading.Semaphore(0)

def action(scheduler, state):
thread_id[0] = threading.current_thread().ident
nonlocal thread_id
thread_id = threading.current_thread().ident
gate.release()

scheduler.schedule(action)
gate.acquire()
assert (thread_id[0] != threading.current_thread().ident)
assert thread_id != threading.current_thread().ident
assert scheduler._has_thread() is False

def test_event_loop_schedule_ordered_actions(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
Expand All @@ -50,39 +57,174 @@ def action(scheduler, state):

scheduler.schedule(action)
gate.acquire()
assert (result == [1, 2])
assert result == [1, 2]
assert scheduler._has_thread() is False

def test_event_loop_schedule_ordered_actions_due(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
gate = threading.Semaphore(0)
result = []

def action(scheduler, state):
result.append(3)
gate.release()

scheduler.schedule_relative(0.10, action)
scheduler.schedule_relative(0.05, lambda s, t: result.append(2))
scheduler.schedule(lambda s, t: result.append(1))

gate.acquire()
assert result == [1, 2, 3]
assert scheduler._has_thread() is False

def test_event_loop_schedule_ordered_actions_due_mixed(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
gate = threading.Semaphore(0)
result = []

def action(scheduler, state):
result.append(1)
scheduler.schedule(action2)
scheduler.schedule_relative(0.10, action3)
sleep(0.10)

def action2(scheduler, state):
result.append(2)

def action3(scheduler, state):
result.append(3)
gate.release()

scheduler.schedule(action)

gate.acquire()
assert result == [1, 2, 3]
assert scheduler._has_thread() is False

def test_event_loop_schedule_action_due(self):
def test_event_loop_schedule_action_relative_due(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
gate = threading.Semaphore(0)
starttime = datetime.utcnow()
endtime = [None]
endtime = None

def action(scheduler, state):
endtime[0] = datetime.utcnow()
nonlocal endtime
endtime = datetime.utcnow()
gate.release()

scheduler.schedule_relative(timedelta(milliseconds=200), action)
gate.acquire()
diff = endtime[0]-starttime
assert(diff > timedelta(milliseconds=180))
diff = endtime - starttime
assert diff > timedelta(milliseconds=180)
assert scheduler._has_thread() is False

def test_event_loop_schedule_action_absolute_due(self):
scheduler = EventLoopScheduler(exit_if_empty=True)
gate = threading.Semaphore(0)
starttime = datetime.utcnow()
endtime = None

def action(scheduler, state):
nonlocal endtime
endtime = datetime.utcnow()
gate.release()

scheduler.schedule_absolute(scheduler.now, action)
gate.acquire()
diff = endtime - starttime
assert diff < timedelta(milliseconds=180)
assert scheduler._has_thread() is False

def test_eventloop_schedule_action_periodic(self):
scheduler = EventLoopScheduler()
scheduler = EventLoopScheduler(exit_if_empty=False)
gate = threading.Semaphore(0)
period = 0.050
counter = [3]
period = 0.05
counter = 3

def action(state):
nonlocal counter
if state:
counter[0] -= 1
counter -= 1
return state - 1
if counter[0] == 0:
if counter == 0:
gate.release()

scheduler.schedule_periodic(period, action, counter[0])
disp = scheduler.schedule_periodic(period, action, counter)

def done():
assert counter[0] == 0
def dispose(scheduler, state):
disp.dispose()
gate.release()

gate.acquire()
assert counter == 0
assert scheduler._has_thread() is True
scheduler.schedule(dispose)
gate.acquire()
assert scheduler._has_thread() is True
sleep(period)
scheduler.dispose()
sleep(period)
assert scheduler._has_thread() is False

def test_eventloop_schedule_dispose(self):
scheduler = EventLoopScheduler(exit_if_empty=False)

scheduler.dispose()

ran = False

def action(scheduler, state):
nonlocal ran
ran = True

exc = None
try:
scheduler.schedule(action)
except Exception as e:
exc = e
finally:
assert isinstance(exc, DisposedException)
assert ran is False
assert scheduler._has_thread() is False

def test_eventloop_schedule_absolute_dispose(self):
scheduler = EventLoopScheduler(exit_if_empty=False)

scheduler.dispose()

ran = False

def action(scheduler, state):
nonlocal ran
ran = True

exc = None
try:
scheduler.schedule_absolute(scheduler.now, action)
except Exception as e:
exc = e
finally:
assert isinstance(exc, DisposedException)
assert ran is False
assert scheduler._has_thread() is False

def test_eventloop_schedule_periodic_dispose(self):
scheduler = EventLoopScheduler(exit_if_empty=False)

scheduler.dispose()

ran = False

def action(scheduler, state):
nonlocal ran
ran = True

exc = None
try:
scheduler.schedule_periodic(0.1, scheduler.now, action)
except Exception as e:
exc = e
finally:
assert isinstance(exc, DisposedException)
assert ran is False
assert scheduler._has_thread() is False

0 comments on commit e3a5580

Please sign in to comment.