Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ticket97 loop arg #98

Merged
merged 8 commits into from
Apr 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions test/test_batched_timers_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ def foo(*args, **kw):
assert calls[2] == (("third call", ), dict())


def test_batched_successful_call_explicit_loop(framework_aio):
'''
batched calls really happen in batches
'''
# Trollius doesn't come with this, so won't work on py2
pytest.importorskip('asyncio.test_utils')
from asyncio.test_utils import TestLoop

def time_gen():
yield
yield
new_loop = TestLoop(time_gen)
calls = []

def foo(*args, **kw):
calls.append((args, kw))

txa = txaio.with_config(loop=new_loop)

batched = txa.make_batched_timer(5)

batched.call_later(1, foo, "first call")
new_loop.advance_time(2.0)
new_loop._run_once()
assert len(calls) == 1


def test_batched_cancel(framework_aio):
'''
we can cancel uncalled call_laters
Expand All @@ -88,7 +115,6 @@ def test_batched_cancel(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []

Expand Down Expand Up @@ -123,7 +149,6 @@ def test_batched_cancel_too_late(framework_aio):
def time_gen():
yield
yield
yield
new_loop = TestLoop(time_gen)
calls = []

Expand Down
93 changes: 92 additions & 1 deletion test/test_call_later.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pytest
import txaio
from txaio.testutil import replace_loop
from util import run_once


def test_default_reactor(framework_tx):
Expand Down Expand Up @@ -64,6 +65,97 @@ def test_explicit_reactor_future(framework):
assert c[0] == 'call_soon'


def test_create_future_explicit_loop(framework):
"""
process events on alternate loop= for create_future later
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()

import asyncio

alt_loop = asyncio.new_event_loop()

txa = txaio.with_config(loop=alt_loop)
f = txa.create_future()

results = []
f.add_done_callback(lambda r: results.append(r.result()))

assert results == []
txaio.resolve(f, 'some result')

# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']


def test_create_future_success_explicit_loop(framework):
"""
process events on alternate loop= for create_future later
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()

import asyncio
alt_loop = asyncio.new_event_loop()
txa = txaio.with_config(loop=alt_loop)

f = txa.create_future_success('some result')

results = []
f.add_done_callback(lambda r: results.append(r.result()))

# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == ['some result']


def test_create_future_failure_explicit_loop(framework):
"""
process events on alternate loop= for create_future later
"""
pytest.importorskip('asyncio')
if txaio.using_twisted:
pytest.skip()

import asyncio
alt_loop = asyncio.new_event_loop()
the_exception = Exception('bad')
txa = txaio.with_config(loop=alt_loop)
f = txa.create_future_error(the_exception)

results = []

def boom(r):
try:
results.append(r.result())
except Exception as e:
results.append(e)
f.add_done_callback(boom)

# run_once() runs the txaio.config.loop so we shouldn't get any
# results until we spin alt_loop
assert results == []
run_once()
assert results == []
with replace_loop(alt_loop):
run_once()
assert results == [the_exception]


def test_explicit_reactor_coroutine(framework):
"""
If we set an event-loop, Futures + Tasks should use it.
Expand Down Expand Up @@ -122,7 +214,6 @@ def time_gen():
# even though we only do one call, I guess TestLoop needs
# a "trailing" yield? "or something"
when = yield 0
print("Hmmm", when)
from asyncio.test_utils import TestLoop
new_loop = TestLoop(time_gen)

Expand Down
2 changes: 1 addition & 1 deletion test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def run_once():
try:
import asyncio
from asyncio.test_utils import run_once as _run_once
return _run_once(asyncio.get_event_loop())
return _run_once(txaio.config.loop or asyncio.get_event_loop())

except ImportError:
import trollius as asyncio
Expand Down
3 changes: 2 additions & 1 deletion txaio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
# see aio.py for asyncio/trollius implementation


class _Config:
class _Config(object):
"""
This holds all valid configuration options, accessed as
class-level variables. For example, if you were using asyncio:
Expand All @@ -58,6 +58,7 @@ class _Config:


__all__ = (
'with_config', # allow mutliple custom configurations at once
'using_twisted', # True if we're using Twisted
'using_asyncio', # True if we're using asyncio
'use_twisted', # sets the library to use Twisted, or exception
Expand Down
3 changes: 2 additions & 1 deletion txaio/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class _BatchedTimer(IBatchedTimer):
"""

def __init__(self, bucket_milliseconds, chunk_size,
seconds_provider, delayed_call_creator):
seconds_provider, delayed_call_creator, loop=None):
if bucket_milliseconds <= 0.0:
raise ValueError(
"bucket_milliseconds must be > 0.0"
Expand All @@ -48,6 +48,7 @@ def __init__(self, bucket_milliseconds, chunk_size,
self._get_seconds = seconds_provider
self._create_delayed_call = delayed_call_creator
self._buckets = dict() # real seconds -> (IDelayedCall, list)
self._loop = loop

def call_later(self, delay, func, *args, **kwargs):
"""
Expand Down
1 change: 1 addition & 0 deletions txaio/_unframework.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def _throw_usage_error(*args, **kw):


# all the txaio API methods just raise the error
with_config = _throw_usage_error
create_future = _throw_usage_error
create_future_success = _throw_usage_error
create_future_error = _throw_usage_error
Expand Down
Loading