Skip to content

Commit

Permalink
add helper to block on threads finishing
Browse files Browse the repository at this point in the history
  • Loading branch information
cjw296 committed Nov 26, 2018
1 parent b490dd5 commit 71a1d2b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
1 change: 1 addition & 0 deletions carly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .clock import cancelDelayedCalls, advanceTime
from .context import Context
from .hook import hook, cleanup, decoder, register
from .threads import waitForThreads
18 changes: 18 additions & 0 deletions carly/threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from time import sleep

from twisted.internet import reactor
from twisted.internet.threads import deferToThread

from .clock import withTimeout


def pendingIsEmpty():
while True:
stats = reactor.threadpool._team.statistics()
if not (stats.backloggedWorkCount or stats.busyWorkerCount > 1):
break
sleep(0.001)


def waitForThreads(timeout=None):
return withTimeout(deferToThread(pendingIsEmpty), timeout)
46 changes: 46 additions & 0 deletions tests/test_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from time import sleep

from testfixtures import compare, ShouldRaise
from twisted.internet.defer import inlineCallbacks, TimeoutError
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThread
from twisted.trial.unittest import TestCase

from carly import advanceTime, cancelDelayedCalls, waitForThreads


def setState(state, duration):
sleep(duration)
state.fired += 1


def work(state, duration):
deferToThread(setState, state, duration)


class TestDeferToThread(TestCase):

fired = 0

@inlineCallbacks
def testWorkOne(self):
LoopingCall(work, self, 0.001).start(2)
advanceTime(seconds=0.2)
yield waitForThreads()
compare(self.fired, expected=1)
cancelDelayedCalls()

@inlineCallbacks
def testWorkTwo(self):
LoopingCall(work, self, 0.001).start(2)
advanceTime(seconds=0.2)
yield waitForThreads()
compare(self.fired, expected=1)
cancelDelayedCalls()

@inlineCallbacks
def testTimeout(self):
LoopingCall(work, self, duration=0.1).start(2)
with ShouldRaise(TimeoutError(0.02, 'Deferred')):
yield waitForThreads(timeout=0.02)
cancelDelayedCalls()

0 comments on commit 71a1d2b

Please sign in to comment.