diff --git a/tests/events_test.py b/tests/events_test.py deleted file mode 100644 index e28b2c3..0000000 --- a/tests/events_test.py +++ /dev/null @@ -1,1637 +0,0 @@ -"""Tests for events.py.""" - -import functools -import gc -import io -import os -import platform -import signal -import socket -try: - import ssl -except ImportError: - ssl = None - HAS_SNI = False -else: - from ssl import HAS_SNI -import subprocess -import sys -import threading -import time -import errno -import unittest - -from aiozmq._test_util import (find_unused_port, - IPV6_ENABLED, - requires_mac_ver, - requires_freebsd_version) - - -import asyncio -from asyncio import selector_events -from asyncio import test_utils - - -import aiozmq - -if sys.platform == 'win32': - raise unittest.SkipTest("don't check") - - -def data_file(filename): - fullname = os.path.join(os.path.dirname(__file__), filename) - if os.path.isfile(fullname): - return fullname - raise FileNotFoundError(filename) - - -def osx_tiger(): - """Return True if the platform is Mac OS 10.4 or older.""" - if sys.platform != 'darwin': - return False - version = platform.mac_ver()[0] - version = tuple(map(int, version.split('.'))) - return version < (10, 5) - - -ONLYCERT = data_file('ssl_cert.pem') -ONLYKEY = data_file('ssl_key.pem') -SIGNED_CERTFILE = data_file('keycert3.pem') -SIGNING_CA = data_file('pycacert.pem') - - -class MyBaseProto(asyncio.Protocol): - connected = None - done = None - - def __init__(self, loop=None): - self.transport = None - self.state = 'INITIAL' - self.nbytes = 0 - if loop is not None: - self.connected = asyncio.Future(loop=loop) - self.done = asyncio.Future(loop=loop) - - def connection_made(self, transport): - self.transport = transport - assert self.state == 'INITIAL', self.state - self.state = 'CONNECTED' - if self.connected: - self.connected.set_result(None) - - def data_received(self, data): - assert self.state == 'CONNECTED', self.state - self.nbytes += len(data) - - def eof_received(self): - assert self.state == 'CONNECTED', self.state - self.state = 'EOF' - - def connection_lost(self, exc): - assert self.state in ('CONNECTED', 'EOF'), self.state - self.state = 'CLOSED' - if self.done: - self.done.set_result(None) - - -class MyProto(MyBaseProto): - def connection_made(self, transport): - super().connection_made(transport) - transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') - - -class MyDatagramProto(asyncio.DatagramProtocol): - done = None - - def __init__(self, loop=None): - self.state = 'INITIAL' - self.nbytes = 0 - if loop is not None: - self.done = asyncio.Future(loop=loop) - - def connection_made(self, transport): - self.transport = transport - assert self.state == 'INITIAL', self.state - self.state = 'INITIALIZED' - - def datagram_received(self, data, addr): - assert self.state == 'INITIALIZED', self.state - self.nbytes += len(data) - - def error_received(self, exc): - assert self.state == 'INITIALIZED', self.state - - def connection_lost(self, exc): - assert self.state == 'INITIALIZED', self.state - self.state = 'CLOSED' - if self.done: - self.done.set_result(None) - - -class MyReadPipeProto(asyncio.Protocol): - done = None - - def __init__(self, loop=None): - self.state = ['INITIAL'] - self.nbytes = 0 - self.transport = None - if loop is not None: - self.done = asyncio.Future(loop=loop) - - def connection_made(self, transport): - self.transport = transport - assert self.state == ['INITIAL'], self.state - self.state.append('CONNECTED') - - def data_received(self, data): - assert self.state == ['INITIAL', 'CONNECTED'], self.state - self.nbytes += len(data) - - def eof_received(self): - assert self.state == ['INITIAL', 'CONNECTED'], self.state - self.state.append('EOF') - - def connection_lost(self, exc): - if 'EOF' not in self.state: - self.state.append('EOF') # It is okay if EOF is missed. - assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state - self.state.append('CLOSED') - if self.done: - self.done.set_result(None) - - -class MyWritePipeProto(asyncio.BaseProtocol): - done = None - - def __init__(self, loop=None): - self.state = 'INITIAL' - self.transport = None - if loop is not None: - self.done = asyncio.Future(loop=loop) - - def connection_made(self, transport): - self.transport = transport - assert self.state == 'INITIAL', self.state - self.state = 'CONNECTED' - - def connection_lost(self, exc): - assert self.state == 'CONNECTED', self.state - self.state = 'CLOSED' - if self.done: - self.done.set_result(None) - - -class MySubprocessProtocol(asyncio.SubprocessProtocol): - - def __init__(self, loop): - self.state = 'INITIAL' - self.transport = None - self.connected = asyncio.Future(loop=loop) - self.completed = asyncio.Future(loop=loop) - self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)} - self.data = {1: b'', 2: b''} - self.returncode = None - self.got_data = {1: asyncio.Event(loop=loop), - 2: asyncio.Event(loop=loop)} - - def connection_made(self, transport): - self.transport = transport - assert self.state == 'INITIAL', self.state - self.state = 'CONNECTED' - self.connected.set_result(None) - - def connection_lost(self, exc): - assert self.state == 'CONNECTED', self.state - self.state = 'CLOSED' - self.completed.set_result(None) - - def pipe_data_received(self, fd, data): - assert self.state == 'CONNECTED', self.state - self.data[fd] += data - self.got_data[fd].set() - - def pipe_connection_lost(self, fd, exc): - assert self.state == 'CONNECTED', self.state - if exc: - self.disconnects[fd].set_exception(exc) - else: - self.disconnects[fd].set_result(exc) - - def process_exited(self): - assert self.state == 'CONNECTED', self.state - self.returncode = self.transport.get_returncode() - - -class EventLoopTestsMixin: - - def setUp(self): - super().setUp() - self.loop = self.create_event_loop() - asyncio.set_event_loop(None) - - def tearDown(self): - # just in case if we have transport close callbacks - test_utils.run_briefly(self.loop) - - self.loop.close() - gc.collect() - asyncio.set_event_loop(None) - super().tearDown() - # zmq.Context.instance().term() - - def test_run_until_complete_nesting(self): - @asyncio.coroutine - def coro1(): - yield - - @asyncio.coroutine - def coro2(): - self.assertTrue(self.loop.is_running()) - self.loop.run_until_complete(coro1()) - - self.assertRaises( - RuntimeError, self.loop.run_until_complete, coro2()) - - # Note: because of the default Windows timing granularity of - # 15.6 msec, we use fairly long sleep times here (~100 msec). - - def test_run_until_complete(self): - t0 = self.loop.time() - self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop)) - t1 = self.loop.time() - self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) - - def test_run_until_complete_stopped(self): - @asyncio.coroutine - def cb(): - self.loop.stop() - yield from asyncio.sleep(0.1, loop=self.loop) - task = cb() - self.assertRaises(RuntimeError, - self.loop.run_until_complete, task) - - def test_call_later(self): - results = [] - - def callback(arg): - results.append(arg) - self.loop.stop() - - self.loop.call_later(0.1, callback, 'hello world') - t0 = time.monotonic() - self.loop.run_forever() - t1 = time.monotonic() - self.assertEqual(results, ['hello world']) - self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) - - def test_call_soon(self): - results = [] - - def callback(arg1, arg2): - results.append((arg1, arg2)) - self.loop.stop() - - self.loop.call_soon(callback, 'hello', 'world') - self.loop.run_forever() - self.assertEqual(results, [('hello', 'world')]) - - def test_call_soon_threadsafe(self): - results = [] - lock = threading.Lock() - - def callback(arg): - results.append(arg) - if len(results) >= 2: - self.loop.stop() - - def run_in_thread(): - self.loop.call_soon_threadsafe(callback, 'hello') - lock.release() - - lock.acquire() - t = threading.Thread(target=run_in_thread) - t.start() - - with lock: - self.loop.call_soon(callback, 'world') - self.loop.run_forever() - t.join() - self.assertEqual(results, ['hello', 'world']) - - def test_call_soon_threadsafe_same_thread(self): - results = [] - - def callback(arg): - results.append(arg) - if len(results) >= 2: - self.loop.stop() - - self.loop.call_soon_threadsafe(callback, 'hello') - self.loop.call_soon(callback, 'world') - self.loop.run_forever() - self.assertEqual(results, ['hello', 'world']) - - def test_run_in_executor(self): - def run(arg): - return (arg, threading.get_ident()) - f2 = self.loop.run_in_executor(None, run, 'yo') - res, thread_id = self.loop.run_until_complete(f2) - self.assertEqual(res, 'yo') - self.assertNotEqual(thread_id, threading.get_ident()) - - def test_reader_callback(self): - r, w = test_utils.socketpair() - r.setblocking(False) - bytes_read = bytearray() - - def reader(): - try: - data = r.recv(1024) - except BlockingIOError: - # Spurious readiness notifications are possible - # at least on Linux -- see man select. - return - if data: - bytes_read.extend(data) - else: - self.assertTrue(self.loop.remove_reader(r.fileno())) - r.close() - - self.loop.add_reader(r.fileno(), reader) - self.loop.call_soon(w.send, b'abc') - test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) - self.loop.call_soon(w.send, b'def') - test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) - self.loop.call_soon(w.close) - self.loop.call_soon(self.loop.stop) - self.loop.run_forever() - self.assertEqual(bytes_read, b'abcdef') - - def test_writer_callback(self): - r, w = test_utils.socketpair() - w.setblocking(False) - - def writer(data): - w.send(data) - self.loop.stop() - - data = b'x' * 1024 - self.loop.add_writer(w.fileno(), writer, data) - self.loop.run_forever() - - self.assertTrue(self.loop.remove_writer(w.fileno())) - self.assertFalse(self.loop.remove_writer(w.fileno())) - - w.close() - read = r.recv(len(data) * 2) - r.close() - self.assertEqual(read, data) - - def _basetest_sock_client_ops(self, httpd, sock): - sock.setblocking(False) - self.loop.run_until_complete( - self.loop.sock_connect(sock, httpd.address)) - self.loop.run_until_complete( - self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) - data = self.loop.run_until_complete( - self.loop.sock_recv(sock, 1024)) - # consume data - self.loop.run_until_complete( - self.loop.sock_recv(sock, 1024)) - sock.close() - self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) - - def test_sock_client_ops(self): - with test_utils.run_test_server() as httpd: - sock = socket.socket() - self._basetest_sock_client_ops(httpd, sock) - - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_unix_sock_client_ops(self): - with test_utils.run_test_unix_server() as httpd: - sock = socket.socket(socket.AF_UNIX) - self._basetest_sock_client_ops(httpd, sock) - - def test_sock_client_fail(self): - # Make sure that we will get an unused port - address = None - try: - s = socket.socket() - s.bind(('127.0.0.1', 0)) - address = s.getsockname() - finally: - s.close() - - sock = socket.socket() - sock.setblocking(False) - with self.assertRaises(ConnectionRefusedError): - self.loop.run_until_complete( - self.loop.sock_connect(sock, address)) - sock.close() - - def test_sock_accept(self): - listener = socket.socket() - listener.setblocking(False) - listener.bind(('127.0.0.1', 0)) - listener.listen(1) - client = socket.socket() - client.connect(listener.getsockname()) - - f = self.loop.sock_accept(listener) - conn, addr = self.loop.run_until_complete(f) - self.assertEqual(conn.gettimeout(), 0) - self.assertEqual(addr, client.getsockname()) - self.assertEqual(client.getpeername(), listener.getsockname()) - client.close() - conn.close() - listener.close() - - @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') - def test_add_signal_handler(self): - caught = 0 - - def my_handler(): - nonlocal caught - caught += 1 - - # Check error behavior first. - self.assertRaises( - TypeError, self.loop.add_signal_handler, 'boom', my_handler) - self.assertRaises( - TypeError, self.loop.remove_signal_handler, 'boom') - self.assertRaises( - ValueError, self.loop.add_signal_handler, signal.NSIG+1, - my_handler) - self.assertRaises( - ValueError, self.loop.remove_signal_handler, signal.NSIG+1) - self.assertRaises( - ValueError, self.loop.add_signal_handler, 0, my_handler) - self.assertRaises( - ValueError, self.loop.remove_signal_handler, 0) - self.assertRaises( - ValueError, self.loop.add_signal_handler, -1, my_handler) - self.assertRaises( - ValueError, self.loop.remove_signal_handler, -1) - self.assertRaises( - RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, - my_handler) - # Removing SIGKILL doesn't raise, since we don't call signal(). - self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) - # Now set a handler and handle it. - self.loop.add_signal_handler(signal.SIGINT, my_handler) - - os.kill(os.getpid(), signal.SIGINT) - test_utils.run_until(self.loop, lambda: caught) - - # Removing it should restore the default handler. - self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) - self.assertEqual(signal.getsignal(signal.SIGINT), - signal.default_int_handler) - # Removing again returns False. - self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) - - @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') - def test_signal_handling_while_selecting(self): - # Test with a signal actually arriving during a select() call. - caught = 0 - - def my_handler(): - nonlocal caught - caught += 1 - self.loop.stop() - - self.loop.add_signal_handler(signal.SIGALRM, my_handler) - - signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. - self.loop.run_forever() - self.assertEqual(caught, 1) - - @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') - def test_signal_handling_args(self): - some_args = (42,) - caught = 0 - - def my_handler(*args): - nonlocal caught - caught += 1 - self.assertEqual(args, some_args) - - self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) - - signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. - self.loop.call_later(0.5, self.loop.stop) - self.loop.run_forever() - self.assertEqual(caught, 1) - - def _basetest_create_connection(self, connection_fut, check_sockname=True): - tr, pr = self.loop.run_until_complete(connection_fut) - self.assertIsInstance(tr, asyncio.Transport) - self.assertIsInstance(pr, asyncio.Protocol) - if check_sockname: - self.assertIsNotNone(tr.get_extra_info('sockname')) - self.loop.run_until_complete(pr.done) - self.assertGreater(pr.nbytes, 0) - tr.close() - - def test_create_connection(self): - with test_utils.run_test_server() as httpd: - conn_fut = self.loop.create_connection( - lambda: MyProto(loop=self.loop), *httpd.address) - self._basetest_create_connection(conn_fut) - - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_unix_connection(self): - # Issue #20682: On Mac OS X Tiger, getsockname() returns a - # zero-length address for UNIX socket. - check_sockname = not osx_tiger() - - with test_utils.run_test_unix_server() as httpd: - conn_fut = self.loop.create_unix_connection( - lambda: MyProto(loop=self.loop), httpd.address) - self._basetest_create_connection(conn_fut, check_sockname) - - def test_create_connection_sock(self): - with test_utils.run_test_server() as httpd: - sock = None - infos = self.loop.run_until_complete( - self.loop.getaddrinfo( - *httpd.address, type=socket.SOCK_STREAM)) - for family, type, proto, cname, address in infos: - try: - sock = socket.socket(family=family, type=type, proto=proto) - sock.setblocking(False) - self.loop.run_until_complete( - self.loop.sock_connect(sock, address)) - except: - pass - else: - break - else: - assert False, 'Can not create socket.' - - f = self.loop.create_connection( - lambda: MyProto(loop=self.loop), sock=sock) - tr, pr = self.loop.run_until_complete(f) - self.assertIsInstance(tr, asyncio.Transport) - self.assertIsInstance(pr, asyncio.Protocol) - self.loop.run_until_complete(pr.done) - self.assertGreater(pr.nbytes, 0) - tr.close() - - def _basetest_create_ssl_connection(self, connection_fut, - check_sockname=True): - tr, pr = self.loop.run_until_complete(connection_fut) - self.assertIsInstance(tr, asyncio.Transport) - self.assertIsInstance(pr, asyncio.Protocol) - self.assertTrue('ssl' in tr.__class__.__name__.lower()) - if check_sockname: - self.assertIsNotNone(tr.get_extra_info('sockname')) - self.loop.run_until_complete(pr.done) - self.assertGreater(pr.nbytes, 0) - tr.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.expectedFailure - def test_create_ssl_connection(self): - with test_utils.run_test_server(use_ssl=True) as httpd: - conn_fut = self.loop.create_connection( - lambda: MyProto(loop=self.loop), - *httpd.address, - ssl=test_utils.dummy_ssl_context()) - - self._basetest_create_ssl_connection(conn_fut) - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - @unittest.expectedFailure - def test_create_ssl_unix_connection(self): - # Issue #20682: On Mac OS X Tiger, getsockname() returns a - # zero-length address for UNIX socket. - check_sockname = not osx_tiger() - - with test_utils.run_test_unix_server(use_ssl=True) as httpd: - conn_fut = self.loop.create_unix_connection( - lambda: MyProto(loop=self.loop), - httpd.address, - ssl=test_utils.dummy_ssl_context(), - server_hostname='127.0.0.1') - - self._basetest_create_ssl_connection(conn_fut, check_sockname) - - def test_create_connection_local_addr(self): - with test_utils.run_test_server() as httpd: - port = find_unused_port() - f = self.loop.create_connection( - lambda: MyProto(loop=self.loop), - *httpd.address, local_addr=(httpd.address[0], port)) - tr, pr = self.loop.run_until_complete(f) - expected = pr.transport.get_extra_info('sockname')[1] - self.assertEqual(port, expected) - tr.close() - - def test_create_connection_local_addr_in_use(self): - with test_utils.run_test_server() as httpd: - f = self.loop.create_connection( - lambda: MyProto(loop=self.loop), - *httpd.address, local_addr=httpd.address) - with self.assertRaises(OSError) as cm: - self.loop.run_until_complete(f) - self.assertEqual(cm.exception.errno, errno.EADDRINUSE) - self.assertIn(str(httpd.address), cm.exception.strerror) - - def test_create_server(self): - proto = MyProto(self.loop) - f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) - server = self.loop.run_until_complete(f) - self.assertEqual(len(server.sockets), 1) - sock = server.sockets[0] - host, port = sock.getsockname() - self.assertEqual(host, '0.0.0.0') - client = socket.socket() - client.connect(('127.0.0.1', port)) - client.sendall(b'xxx') - - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - - test_utils.run_until(self.loop, lambda: proto.nbytes > 0) - self.assertEqual(3, proto.nbytes) - - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('sockname')) - self.assertEqual('127.0.0.1', - proto.transport.get_extra_info('peername')[0]) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - - self.assertEqual('CLOSED', proto.state) - - # the client socket must be closed after to avoid ECONNRESET upon - # recv()/send() on the serving socket - client.close() - - # close server - server.close() - - def _make_unix_server(self, factory, **kwargs): - path = test_utils.gen_unix_socket_path() - self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) - - f = self.loop.create_unix_server(factory, path, **kwargs) - server = self.loop.run_until_complete(f) - - return server, path - - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_unix_server(self): - proto = MyProto(loop=self.loop) - server, path = self._make_unix_server(lambda: proto) - self.assertEqual(len(server.sockets), 1) - - client = socket.socket(socket.AF_UNIX) - client.connect(path) - client.sendall(b'xxx') - - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0) - self.assertEqual(3, proto.nbytes) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - - self.assertEqual('CLOSED', proto.state) - - # the client socket must be closed after to avoid ECONNRESET upon - # recv()/send() on the serving socket - client.close() - - # close server - server.close() - - def _create_ssl_context(self, certfile, keyfile=None): - sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext.options |= ssl.OP_NO_SSLv2 - sslcontext.load_cert_chain(certfile, keyfile) - return sslcontext - - def _make_ssl_server(self, factory, certfile, keyfile=None): - sslcontext = self._create_ssl_context(certfile, keyfile) - - f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) - server = self.loop.run_until_complete(f) - - sock = server.sockets[0] - host, port = sock.getsockname() - self.assertEqual(host, '127.0.0.1') - return server, host, port - - def _make_ssl_unix_server(self, factory, certfile, keyfile=None): - sslcontext = self._create_ssl_context(certfile, keyfile) - return self._make_unix_server(factory, ssl=sslcontext) - - @unittest.skipIf(ssl is None, 'No ssl module') - def test_create_server_ssl(self): - proto = MyProto(loop=self.loop) - server, host, port = self._make_ssl_server( - lambda: proto, ONLYCERT, ONLYKEY) - - f_c = self.loop.create_connection(MyBaseProto, host, port, - ssl=test_utils.dummy_ssl_context()) - client, pr = self.loop.run_until_complete(f_c) - - client.write(b'xxx') - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - - test_utils.run_until(self.loop, lambda: proto.nbytes > 0) - self.assertEqual(3, proto.nbytes) - - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('sockname')) - self.assertEqual('127.0.0.1', - proto.transport.get_extra_info('peername')[0]) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - self.assertEqual('CLOSED', proto.state) - - # the client socket must be closed after to avoid ECONNRESET upon - # recv()/send() on the serving socket - client.close() - - # stop serving - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_unix_server_ssl(self): - proto = MyProto(loop=self.loop) - server, path = self._make_ssl_unix_server( - lambda: proto, ONLYCERT, ONLYKEY) - - f_c = self.loop.create_unix_connection( - MyBaseProto, path, ssl=test_utils.dummy_ssl_context(), - server_hostname='') - - client, pr = self.loop.run_until_complete(f_c) - - client.write(b'xxx') - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - test_utils.run_until(self.loop, lambda: proto.nbytes > 0) - self.assertEqual(3, proto.nbytes) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - self.assertEqual('CLOSED', proto.state) - - # the client socket must be closed after to avoid ECONNRESET upon - # recv()/send() on the serving socket - client.close() - - # stop serving - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') - def test_create_server_ssl_verify_failed(self): - proto = MyProto(loop=self.loop) - server, host, port = self._make_ssl_server( - lambda: proto, SIGNED_CERTFILE) - - sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - if hasattr(sslcontext_client, 'check_hostname'): - sslcontext_client.check_hostname = True - - # no CA loaded - f_c = self.loop.create_connection(MyProto, host, port, - ssl=sslcontext_client) - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) - - # close connection - self.assertIsNone(proto.transport) - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_unix_server_ssl_verify_failed(self): - proto = MyProto(loop=self.loop) - server, path = self._make_ssl_unix_server( - lambda: proto, SIGNED_CERTFILE) - - sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - if hasattr(sslcontext_client, 'check_hostname'): - sslcontext_client.check_hostname = True - - # no CA loaded - f_c = self.loop.create_unix_connection(MyProto, path, - ssl=sslcontext_client, - server_hostname='invalid') - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) - - # close connection - self.assertIsNone(proto.transport) - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') - def test_create_server_ssl_match_failed(self): - proto = MyProto(loop=self.loop) - server, host, port = self._make_ssl_server( - lambda: proto, SIGNED_CERTFILE) - - sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations( - cafile=SIGNING_CA) - if hasattr(sslcontext_client, 'check_hostname'): - sslcontext_client.check_hostname = True - - # incorrect server_hostname - f_c = self.loop.create_connection(MyProto, host, port, - ssl=sslcontext_client) - with self.assertRaisesRegex( - ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): - self.loop.run_until_complete(f_c) - - # close connection - proto.transport.close() - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_unix_server_ssl_verified(self): - proto = MyProto(loop=self.loop) - server, path = self._make_ssl_unix_server( - lambda: proto, SIGNED_CERTFILE) - - sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations(cafile=SIGNING_CA) - if hasattr(sslcontext_client, 'check_hostname'): - sslcontext_client.check_hostname = True - - # Connection succeeds with correct CA and server hostname. - f_c = self.loop.create_unix_connection(MyProto, path, - ssl=sslcontext_client, - server_hostname='localhost') - client, pr = self.loop.run_until_complete(f_c) - - # close connection - proto.transport.close() - client.close() - server.close() - - @unittest.skipIf(ssl is None, 'No ssl module') - @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module') - def test_create_server_ssl_verified(self): - proto = MyProto(loop=self.loop) - server, host, port = self._make_ssl_server( - lambda: proto, SIGNED_CERTFILE) - - sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations(cafile=SIGNING_CA) - if hasattr(sslcontext_client, 'check_hostname'): - sslcontext_client.check_hostname = True - - # Connection succeeds with correct CA and server hostname. - f_c = self.loop.create_connection(MyProto, host, port, - ssl=sslcontext_client, - server_hostname='localhost') - client, pr = self.loop.run_until_complete(f_c) - - # close connection - proto.transport.close() - client.close() - server.close() - - def test_create_server_sock(self): - proto = asyncio.Future(loop=self.loop) - - class TestMyProto(MyProto): - def connection_made(self, transport): - super().connection_made(transport) - proto.set_result(self) - - sock_ob = socket.socket(type=socket.SOCK_STREAM) - sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock_ob.bind(('0.0.0.0', 0)) - - f = self.loop.create_server(TestMyProto, sock=sock_ob) - server = self.loop.run_until_complete(f) - sock = server.sockets[0] - self.assertIs(sock, sock_ob) - - host, port = sock.getsockname() - self.assertEqual(host, '0.0.0.0') - client = socket.socket() - client.connect(('127.0.0.1', port)) - client.send(b'xxx') - client.close() - server.close() - - def test_create_server_addr_in_use(self): - sock_ob = socket.socket(type=socket.SOCK_STREAM) - sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock_ob.bind(('0.0.0.0', 0)) - - f = self.loop.create_server(MyProto, sock=sock_ob) - server = self.loop.run_until_complete(f) - sock = server.sockets[0] - host, port = sock.getsockname() - - f = self.loop.create_server(MyProto, host=host, port=port) - with self.assertRaises(OSError) as cm: - self.loop.run_until_complete(f) - self.assertEqual(cm.exception.errno, errno.EADDRINUSE) - - server.close() - - @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled') - def test_create_server_dual_stack(self): - f_proto = asyncio.Future(loop=self.loop) - - class TestMyProto(MyProto): - def connection_made(self, transport): - super().connection_made(transport) - f_proto.set_result(self) - - try_count = 0 - while True: - try: - port = find_unused_port() - f = self.loop.create_server(TestMyProto, host=None, port=port) - server = self.loop.run_until_complete(f) - except OSError as ex: - if ex.errno == errno.EADDRINUSE: - try_count += 1 - self.assertGreaterEqual(5, try_count) - continue - else: - raise - else: - break - client = socket.socket() - client.connect(('127.0.0.1', port)) - client.send(b'xxx') - proto = self.loop.run_until_complete(f_proto) - proto.transport.close() - client.close() - - f_proto = asyncio.Future(loop=self.loop) - client = socket.socket(socket.AF_INET6) - client.connect(('::1', port)) - client.send(b'xxx') - proto = self.loop.run_until_complete(f_proto) - proto.transport.close() - client.close() - - server.close() - - def test_server_close(self): - f = self.loop.create_server(MyProto, '0.0.0.0', 0) - server = self.loop.run_until_complete(f) - sock = server.sockets[0] - host, port = sock.getsockname() - - client = socket.socket() - client.connect(('127.0.0.1', port)) - client.send(b'xxx') - client.close() - - server.close() - - client = socket.socket() - self.assertRaises( - ConnectionRefusedError, client.connect, ('127.0.0.1', port)) - client.close() - - def test_create_datagram_endpoint(self): - class TestMyDatagramProto(MyDatagramProto): - def __init__(inner_self): - super().__init__(loop=self.loop) - - def datagram_received(self, data, addr): - super().datagram_received(data, addr) - self.transport.sendto(b'resp:'+data, addr) - - coro = self.loop.create_datagram_endpoint( - TestMyDatagramProto, local_addr=('127.0.0.1', 0)) - s_transport, server = self.loop.run_until_complete(coro) - host, port = s_transport.get_extra_info('sockname') - - coro = self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(loop=self.loop), - remote_addr=(host, port)) - transport, client = self.loop.run_until_complete(coro) - - self.assertEqual('INITIALIZED', client.state) - transport.sendto(b'xxx') - test_utils.run_until(self.loop, lambda: server.nbytes) - self.assertEqual(3, server.nbytes) - test_utils.run_until(self.loop, lambda: client.nbytes) - - # received - self.assertEqual(8, client.nbytes) - - # extra info is available - self.assertIsNotNone(transport.get_extra_info('sockname')) - - # close connection - transport.close() - self.loop.run_until_complete(client.done) - self.assertEqual('CLOSED', client.state) - server.transport.close() - - def test_internal_fds(self): - loop = self.create_event_loop() - if not isinstance(loop, selector_events.BaseSelectorEventLoop): - self.skipTest('loop is not a BaseSelectorEventLoop') - - self.assertEqual(1, loop._internal_fds) - loop.close() - self.assertEqual(0, loop._internal_fds) - self.assertIsNone(loop._csock) - self.assertIsNone(loop._ssock) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_read_pipe(self): - proto = MyReadPipeProto(loop=self.loop) - - rpipe, wpipe = os.pipe() - pipeobj = io.open(rpipe, 'rb', 1024) - - @asyncio.coroutine - def connect(): - t, p = yield from self.loop.connect_read_pipe( - lambda: proto, pipeobj) - self.assertIs(p, proto) - self.assertIs(t, proto.transport) - self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) - self.assertEqual(0, proto.nbytes) - - self.loop.run_until_complete(connect()) - - os.write(wpipe, b'1') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) - self.assertEqual(1, proto.nbytes) - - os.write(wpipe, b'2345') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) - self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) - self.assertEqual(5, proto.nbytes) - - os.close(wpipe) - self.loop.run_until_complete(proto.done) - self.assertEqual( - ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('pipe')) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - # select, poll and kqueue don't support character devices (PTY) on Mac OS X - # older than 10.6 (Snow Leopard) - @requires_mac_ver(10, 6) - # Issue #20495: The test hangs on FreeBSD 7.2 but pass on FreeBSD 9 - @requires_freebsd_version(8) - def test_read_pty_output(self): - proto = MyReadPipeProto(loop=self.loop) - - master, slave = os.openpty() - master_read_obj = io.open(master, 'rb', 0) - - @asyncio.coroutine - def connect(): - t, p = yield from self.loop.connect_read_pipe(lambda: proto, - master_read_obj) - self.assertIs(p, proto) - self.assertIs(t, proto.transport) - self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) - self.assertEqual(0, proto.nbytes) - - self.loop.run_until_complete(connect()) - - os.write(slave, b'1') - test_utils.run_until(self.loop, lambda: proto.nbytes) - self.assertEqual(1, proto.nbytes) - - os.write(slave, b'2345') - test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) - self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) - self.assertEqual(5, proto.nbytes) - - os.close(slave) - self.loop.run_until_complete(proto.done) - self.assertEqual( - ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('pipe')) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_write_pipe(self): - rpipe, wpipe = os.pipe() - pipeobj = io.open(wpipe, 'wb', 1024) - - proto = MyWritePipeProto(loop=self.loop) - connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) - transport, p = self.loop.run_until_complete(connect) - self.assertIs(p, proto) - self.assertIs(transport, proto.transport) - self.assertEqual('CONNECTED', proto.state) - - transport.write(b'1') - - data = bytearray() - - def reader(data): - chunk = os.read(rpipe, 1024) - data += chunk - return len(data) - - test_utils.run_until(self.loop, lambda: reader(data) >= 1) - self.assertEqual(b'1', data) - - transport.write(b'2345') - test_utils.run_until(self.loop, lambda: reader(data) >= 5) - self.assertEqual(b'12345', data) - self.assertEqual('CONNECTED', proto.state) - - os.close(rpipe) - - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('pipe')) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - self.assertEqual('CLOSED', proto.state) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_write_pipe_disconnect_on_close(self): - rsock, wsock = test_utils.socketpair() - pipeobj = io.open(wsock.detach(), 'wb', 1024) - - proto = MyWritePipeProto(loop=self.loop) - connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) - transport, p = self.loop.run_until_complete(connect) - self.assertIs(p, proto) - self.assertIs(transport, proto.transport) - self.assertEqual('CONNECTED', proto.state) - - transport.write(b'1') - data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) - self.assertEqual(b'1', data) - - rsock.close() - - self.loop.run_until_complete(proto.done) - self.assertEqual('CLOSED', proto.state) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - # select, poll and kqueue don't support character devices (PTY) on Mac OS X - # older than 10.6 (Snow Leopard) - @requires_mac_ver(10, 6) - def test_write_pty(self): - master, slave = os.openpty() - slave_write_obj = io.open(slave, 'wb', 0) - - proto = MyWritePipeProto(loop=self.loop) - connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) - transport, p = self.loop.run_until_complete(connect) - self.assertIs(p, proto) - self.assertIs(transport, proto.transport) - self.assertEqual('CONNECTED', proto.state) - - transport.write(b'1') - - data = bytearray() - - def reader(data): - chunk = os.read(master, 1024) - data += chunk - return len(data) - - test_utils.run_until(self.loop, lambda: reader(data) >= 1, - timeout=10) - self.assertEqual(b'1', data) - - transport.write(b'2345') - test_utils.run_until(self.loop, lambda: reader(data) >= 5, - timeout=10) - self.assertEqual(b'12345', data) - self.assertEqual('CONNECTED', proto.state) - - os.close(master) - - # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('pipe')) - - # close connection - proto.transport.close() - self.loop.run_until_complete(proto.done) - self.assertEqual('CLOSED', proto.state) - - def test_prompt_cancellation(self): - r, w = test_utils.socketpair() - r.setblocking(False) - f = self.loop.sock_recv(r, 1) - ov = getattr(f, 'ov', None) - if ov is not None: - self.assertTrue(ov.pending) - - @asyncio.coroutine - def main(): - try: - self.loop.call_soon(f.cancel) - yield from f - except asyncio.CancelledError: - res = 'cancelled' - else: - res = None - finally: - self.loop.stop() - return res - - start = time.monotonic() - t = asyncio.Task(main(), loop=self.loop) - self.loop.run_forever() - elapsed = time.monotonic() - start - - self.assertLess(elapsed, 0.1) - self.assertEqual(t.result(), 'cancelled') - self.assertRaises(asyncio.CancelledError, f.result) - if ov is not None: - self.assertFalse(ov.pending) - self.loop._stop_serving(r) - - r.close() - w.close() - - def test_timeout_rounding(self): - def _run_once(): - self.loop._run_once_counter += 1 - orig_run_once() - - orig_run_once = self.loop._run_once - self.loop._run_once_counter = 0 - self.loop._run_once = _run_once - - @asyncio.coroutine - def wait(): - loop = self.loop - yield from asyncio.sleep(1e-2, loop=loop) - yield from asyncio.sleep(1e-4, loop=loop) - yield from asyncio.sleep(1e-6, loop=loop) - yield from asyncio.sleep(1e-8, loop=loop) - yield from asyncio.sleep(1e-10, loop=loop) - - self.loop.run_until_complete(wait()) - # The ideal number of call is 12, but on some platforms, the selector - # may sleep at little bit less than timeout depending on the resolution - # of the clock used by the kernel. Tolerate a few useless calls on - # these platforms. - self.assertLessEqual( - self.loop._run_once_counter, 20, - {'clock_resolution': self.loop._clock_resolution, - 'selector': self.loop._selector.__class__.__name__}) - - def test_sock_connect_address(self): - addresses = [(socket.AF_INET, ('www.python.org', 80))] - if IPV6_ENABLED: - addresses.extend(( - (socket.AF_INET6, ('www.python.org', 80)), - (socket.AF_INET6, ('www.python.org', 80, 0, 0)), - )) - - for family, address in addresses: - for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM): - sock = socket.socket(family, sock_type) - with sock: - connect = self.loop.sock_connect(sock, address) - with self.assertRaises(ValueError) as cm: - self.loop.run_until_complete(connect) - self.assertIn('address must be resolved', - str(cm.exception)) - - def xtest_remove_fds_after_closing(self): - loop = self.create_event_loop() - - def callback(): - pass - - r, w = test_utils.socketpair() - self.addCleanup(r.close) - self.addCleanup(w.close) - loop.add_reader(r, callback) - loop.add_writer(w, callback) - loop.close() - self.assertFalse(loop.remove_reader(r)) - self.assertFalse(loop.remove_writer(w)) - - def xtest_add_fds_after_closing(self): - loop = self.create_event_loop() - - def callback(): - pass - - r, w = test_utils.socketpair() - self.addCleanup(r.close) - self.addCleanup(w.close) - loop.close() - with self.assertRaises(RuntimeError): - loop.add_reader(r, callback) - with self.assertRaises(RuntimeError): - loop.add_writer(w, callback) - - -class SubprocessTestsMixin: - - def check_terminated(self, returncode): - if sys.platform == 'win32': - self.assertIsInstance(returncode, int) - # expect 1 but sometimes get 0 - else: - self.assertEqual(-signal.SIGTERM, returncode) - - def check_killed(self, returncode): - if sys.platform == 'win32': - self.assertIsInstance(returncode, int) - # expect 1 but sometimes get 0 - else: - self.assertEqual(-signal.SIGKILL, returncode) - - def test_subprocess_exec(self): - prog = os.path.join(os.path.dirname(__file__), 'echo.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) - self.assertEqual(b'Python The Winner', proto.data[1]) - - def test_subprocess_interactive(self): - prog = os.path.join(os.path.dirname(__file__), 'echo.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - self.assertEqual('CONNECTED', proto.state) - - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) - - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: - transp.close() - - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) - - def test_subprocess_shell(self): - connect = self.loop.subprocess_shell( - functools.partial(MySubprocessProtocol, self.loop), - 'echo Python') - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - transp.get_pipe_transport(0).close() - self.loop.run_until_complete(proto.completed) - self.assertEqual(0, proto.returncode) - self.assertTrue(all(f.done() for f in proto.disconnects.values())) - self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') - self.assertEqual(proto.data[2], b'') - - def test_subprocess_exitcode(self): - connect = self.loop.subprocess_shell( - functools.partial(MySubprocessProtocol, self.loop), - 'exit 7', stdin=None, stdout=None, stderr=None) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.completed) - self.assertEqual(7, proto.returncode) - - def test_subprocess_close_after_finish(self): - connect = self.loop.subprocess_shell( - functools.partial(MySubprocessProtocol, self.loop), - 'exit 7', stdin=None, stdout=None, stderr=None) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.assertIsNone(transp.get_pipe_transport(0)) - self.assertIsNone(transp.get_pipe_transport(1)) - self.assertIsNone(transp.get_pipe_transport(2)) - self.loop.run_until_complete(proto.completed) - self.assertEqual(7, proto.returncode) - self.assertIsNone(transp.close()) - - def test_subprocess_kill(self): - prog = os.path.join(os.path.dirname(__file__), 'echo.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - transp.kill() - self.loop.run_until_complete(proto.completed) - self.check_killed(proto.returncode) - - def test_subprocess_terminate(self): - prog = os.path.join(os.path.dirname(__file__), 'echo.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - transp.terminate() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) - - @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") - def test_subprocess_send_signal(self): - prog = os.path.join(os.path.dirname(__file__), 'echo.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - transp.send_signal(signal.SIGHUP) - self.loop.run_until_complete(proto.completed) - self.assertEqual(-signal.SIGHUP, proto.returncode) - - def test_subprocess_stderr(self): - prog = os.path.join(os.path.dirname(__file__), 'echo2.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - stdin = transp.get_pipe_transport(0) - stdin.write(b'test') - - self.loop.run_until_complete(proto.completed) - - transp.close() - self.assertEqual(b'OUT:test', proto.data[1]) - self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) - self.assertEqual(0, proto.returncode) - - def test_subprocess_stderr_redirect_to_stdout(self): - prog = os.path.join(os.path.dirname(__file__), 'echo2.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog, stderr=subprocess.STDOUT) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - stdin = transp.get_pipe_transport(0) - self.assertIsNotNone(transp.get_pipe_transport(1)) - self.assertIsNone(transp.get_pipe_transport(2)) - - stdin.write(b'test') - self.loop.run_until_complete(proto.completed) - self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), - proto.data[1]) - self.assertEqual(b'', proto.data[2]) - - transp.close() - self.assertEqual(0, proto.returncode) - - def test_subprocess_close_client_stream(self): - prog = os.path.join(os.path.dirname(__file__), 'echo3.py') - - connect = self.loop.subprocess_exec( - functools.partial(MySubprocessProtocol, self.loop), - sys.executable, prog) - transp, proto = self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.connected) - - stdin = transp.get_pipe_transport(0) - stdout = transp.get_pipe_transport(1) - stdin.write(b'test') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'OUT:test', proto.data[1]) - - stdout.close() - self.loop.run_until_complete(proto.disconnects[1]) - stdin.write(b'xxx') - self.loop.run_until_complete(proto.got_data[2].wait()) - if sys.platform != 'win32': - self.assertEqual(b'ERR:BrokenPipeError', proto.data[2]) - else: - # After closing the read-end of a pipe, writing to the - # write-end using os.write() fails with errno==EINVAL and - # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using - # WriteFile() we get ERROR_BROKEN_PIPE as expected.) - self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() - self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) - - def test_subprocess_wait_no_same_group(self): - # start the new process in a new session - connect = self.loop.subprocess_shell( - functools.partial(MySubprocessProtocol, self.loop), - 'exit 7', stdin=None, stdout=None, stderr=None, - start_new_session=True) - _, proto = yield self.loop.run_until_complete(connect) - self.assertIsInstance(proto, MySubprocessProtocol) - self.loop.run_until_complete(proto.completed) - self.assertEqual(7, proto.returncode) - - def test_subprocess_exec_invalid_args(self): - @asyncio.coroutine - def connect(**kwds): - yield from self.loop.subprocess_exec( - asyncio.SubprocessProtocol, - 'pwd', **kwds) - - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(universal_newlines=True)) - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(bufsize=4096)) - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(shell=True)) - - def test_subprocess_shell_invalid_args(self): - @asyncio.coroutine - def connect(cmd=None, **kwds): - if not cmd: - cmd = 'pwd' - yield from self.loop.subprocess_shell( - asyncio.SubprocessProtocol, - cmd, **kwds) - - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(['ls', '-l'])) - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(universal_newlines=True)) - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(bufsize=4096)) - with self.assertRaises(ValueError): - self.loop.run_until_complete(connect(shell=False)) - - -class ZmqSelectorEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - - def create_event_loop(self): - return aiozmq.ZmqEventLoop()