forked from tornadoweb/tornado
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Dusty Phillips
committed
Jan 25, 2010
1 parent
393f3bf
commit 1abd2e5
Showing
2 changed files
with
43 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from psyclone import ioloop | ||
|
||
def pytest_funcarg__IOloop(request): | ||
'''Funcarg to create a ioloop that is not the standard ioloop. This allows | ||
test teardown more easily. It also allows testing of independent | ||
IOLoops.''' | ||
return ioloop.IOLoop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from psyclone import ioloop | ||
import os | ||
def test_class_instance(): | ||
'''Ensure two calls to ioloop.instance create only one object.''' | ||
assert not hasattr(ioloop.IOLoop, '_instance') | ||
assert ioloop.IOLoop.instance() == ioloop.IOLoop.instance() | ||
assert hasattr(ioloop.IOLoop, '_instance') | ||
assert ioloop.IOLoop.initialized() | ||
del ioloop.IOLoop._instance | ||
|
||
def test_ioloop_funcarg(IOloop): | ||
'''Ensure the ioloop funcarg isn't from instance''' | ||
assert isinstance(IOloop, ioloop.IOLoop) | ||
assert not hasattr(ioloop.IOLoop, '_instance') | ||
|
||
def test_run_loop(IOloop): | ||
'''Test adding a file descriptor to the IOLoop and it receives events.''' | ||
fdr, fdw = os.pipe() | ||
reader = os.fdopen(fdr, "rb", 0) | ||
writer = os.fdopen(fdw, "wb", 0) | ||
IOloop._set_nonblocking(fdr) | ||
IOloop._set_nonblocking(fdw) | ||
read_values = [] | ||
def read_callback(fd, events): | ||
assert fd == fdr | ||
assert events & ioloop.IOLoop.READ | ||
assert IOloop.running() | ||
val = reader.read() | ||
if val != None: | ||
assert val == b"ho" | ||
IOloop.stop() | ||
def send_val(): | ||
writer.write("ho") | ||
IOloop.add_handler(fdr, read_callback, IOloop.READ) | ||
IOloop.add_callback(send_val) | ||
IOloop.start() |