Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Some tests cleanups

  • Loading branch information...
commit 20f945e869df0df0b3aef8bd5dee5ef7fc9f69fa 1 parent 469ab3b
Alvaro authored
View
233 tests/test_api.py
@@ -1,233 +0,0 @@
-#
-# Evy - a concurrent networking library for Python
-#
-# Unless otherwise noted, the files in Evy are under the following MIT license:
-#
-# Copyright (c) 2012, Alvaro Saurin
-# Copyright (c) 2008-2010, Eventlet Contributors (see AUTHORS)
-# Copyright (c) 2007-2010, Linden Research, Inc.
-# Copyright (c) 2005-2006, Bob Ippolito
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-
-
-import os
-import os.path
-import socket
-from unittest import TestCase, main
-import warnings
-
-import evy
-
-warnings.simplefilter('ignore', DeprecationWarning)
-from evy import api
-
-warnings.simplefilter('default', DeprecationWarning)
-from evy import greenio, util, hubs, greenthread, spawn
-
-from tests import skip_if_no_ssl
-
-
-def check_hub ():
- # Clear through the descriptor queue
- api.sleep(0)
- api.sleep(0)
- hub = hubs.get_hub()
- for nm in 'get_readers', 'get_writers':
- dct = getattr(hub, nm)()
- assert not dct, "hub.%s not empty: %s" % (nm, dct)
- # Stop the runloop (unless it's twistedhub which does not support that)
- if not getattr(hub, 'uses_twisted_reactor', None):
- hub.abort(True)
- assert not hub.running
-
-
-class TestApi(TestCase):
- certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
- private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
-
- def test_tcp_listener (self):
- socket = evy.listen(('0.0.0.0', 0))
- assert socket.getsockname()[0] == '0.0.0.0'
- socket.close()
-
- check_hub()
-
- def test_connect_tcp (self):
- def accept_once (listenfd):
- try:
- conn, addr = listenfd.accept()
- fd = conn.makefile(mode = 'w')
- conn.close()
- fd.write('hello\n')
- fd.close()
- finally:
- listenfd.close()
-
- server = evy.listen(('0.0.0.0', 0))
- api.spawn(accept_once, server)
-
- client = evy.connect(('127.0.0.1', server.getsockname()[1]))
- fd = client.makefile()
- client.close()
- assert fd.readline() == 'hello\n'
-
- assert fd.read() == ''
- fd.close()
-
- check_hub()
-
- @skip_if_no_ssl
- def test_connect_ssl (self):
- def accept_once (listenfd):
- try:
- conn, addr = listenfd.accept()
- conn.write('hello\r\n')
- greenio.shutdown_safe(conn)
- conn.close()
- finally:
- greenio.shutdown_safe(listenfd)
- listenfd.close()
-
- server = api.ssl_listener(('0.0.0.0', 0),
- self.certificate_file,
- self.private_key_file)
- api.spawn(accept_once, server)
-
- raw_client = evy.connect(('127.0.0.1', server.getsockname()[1]))
- client = util.wrap_ssl(raw_client)
- fd = socket._fileobject(client, 'rb', 8192)
-
- assert fd.readline() == 'hello\r\n'
- try:
- self.assertEquals('', fd.read(10))
- except greenio.SSL.ZeroReturnError:
- # if it's a GreenSSL object it'll do this
- pass
- greenio.shutdown_safe(client)
- client.close()
-
- check_hub()
-
- def test_001_trampoline_timeout (self):
- from evy import coros
-
- server_sock = evy.listen(('127.0.0.1', 0))
- bound_port = server_sock.getsockname()[1]
-
- def server (sock):
- client, addr = sock.accept()
- api.sleep(0.1)
-
- server_evt = spawn(server, server_sock)
- api.sleep(0)
- try:
- desc = evy.connect(('127.0.0.1', bound_port))
- api.trampoline(desc, read = True, write = False, timeout = 0.001)
- except api.TimeoutError:
- pass # test passed
- else:
- assert False, "Didn't timeout"
-
- server_evt.wait()
- check_hub()
-
- def test_timeout_cancel (self):
- server = evy.listen(('0.0.0.0', 0))
- bound_port = server.getsockname()[1]
-
- done = [False]
-
- def client_closer (sock):
- while True:
- (conn, addr) = sock.accept()
- conn.close()
-
- def go ():
- desc = evy.connect(('127.0.0.1', bound_port))
- try:
- api.trampoline(desc, read = True, timeout = 0.1)
- except api.TimeoutError:
- assert False, "Timed out"
-
- server.close()
- desc.close()
- done[0] = True
-
- greenthread.spawn_after_local(0, go)
-
- server_coro = api.spawn(client_closer, server)
- while not done[0]:
- api.sleep(0)
- api.kill(server_coro)
-
- check_hub()
-
- def test_named (self):
- named_foo = api.named('tests.api_test.Foo')
- self.assertEquals(
- named_foo.__name__,
- "Foo")
-
- def test_naming_missing_class (self):
- self.assertRaises(
- ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
-
-
- def test_killing_dormant (self):
- DELAY = 0.1
- state = []
-
- def test ():
- try:
- state.append('start')
- api.sleep(DELAY)
- except:
- state.append('except')
- # catching GreenletExit
- pass
- # when switching to hub, hub makes itself the parent of this greenlet,
- # thus after the function's done, the control will go to the parent
- api.sleep(0)
- state.append('finished')
-
- g = api.spawn(test)
- api.sleep(DELAY / 2)
- self.assertEquals(state, ['start'])
- api.kill(g)
- # will not get there, unless switching is explicitly scheduled by kill
- self.assertEquals(state, ['start', 'except'])
- api.sleep(DELAY)
- self.assertEquals(state, ['start', 'except', 'finished'])
-
- def test_nested_with_timeout (self):
- def func ():
- return api.with_timeout(0.2, api.sleep, 2, timeout_value = 1)
-
- self.assertRaises(api.TimeoutError, api.with_timeout, 0.1, func)
-
-
-class Foo(object):
- pass
-
-
-if __name__ == '__main__':
- main()
-
View
177 tests/test_greenthread.py
@@ -28,9 +28,24 @@
#
+from unittest import TestCase, main
+import warnings
+
from tests import LimitedTestCase
+from tests import skip_if_no_ssl
+
+import evy
from evy import greenthread
from evy.support import greenlets as greenlet
+from evy import greenio, util, hubs, greenthread, spawn
+
+
+
+
+warnings.simplefilter('ignore', DeprecationWarning)
+warnings.simplefilter('default', DeprecationWarning)
+
+
_g_results = []
@@ -43,6 +58,21 @@ def waiter (a):
greenthread.sleep(0.1)
return a
+def check_hub ():
+ # Clear through the descriptor queue
+ greenthread.sleep(0)
+ greenthread.sleep(0)
+ hub = hubs.get_hub()
+ for nm in 'get_readers', 'get_writers':
+ dct = getattr(hub, nm)()
+ assert not dct, "hub.%s not empty: %s" % (nm, dct)
+ # Stop the runloop (unless it's twistedhub which does not support that)
+ if not getattr(hub, 'uses_twisted_reactor', None):
+ hub.abort(True)
+ assert not hub.running
+
+
+
class Asserts(object):
def assert_dead (self, gt):
@@ -52,10 +82,10 @@ def assert_dead (self, gt):
self.assert_(not gt)
-class Spawn(LimitedTestCase, Asserts):
+class TestSpawn(LimitedTestCase, Asserts):
def tearDown (self):
global _g_results
- super(Spawn, self).tearDown()
+ super(TestSpawn, self).tearDown()
_g_results = []
def test_simple (self):
@@ -124,7 +154,7 @@ def link_func (g, *a, **kw):
self.assertEquals(results, [gt, (4,), {'b': 5}])
-class SpawnAfter(LimitedTestCase, Asserts):
+class TestSpawnAfter(LimitedTestCase, Asserts):
def test_basic (self):
gt = greenthread.spawn_after(0.1, passthru, 20)
self.assertEquals(gt.wait(), ((20,), {}))
@@ -147,9 +177,9 @@ def test_kill_already_started (self):
self.assert_dead(gt)
-class SpawnAfterLocal(LimitedTestCase, Asserts):
+class TestSpawnAfterLocal(LimitedTestCase, Asserts):
def setUp (self):
- super(SpawnAfterLocal, self).setUp()
+ super(TestSpawnAfterLocal, self).setUp()
self.lst = [1]
def test_timer_fired (self):
@@ -179,3 +209,140 @@ def func ():
greenthread.spawn(func)
greenthread.sleep(0.1)
assert self.lst == [], self.lst
+
+
+
+class TestGreenHub(TestCase):
+
+
+ def test_tcp_listener (self):
+ socket = evy.listen(('0.0.0.0', 0))
+ assert socket.getsockname()[0] == '0.0.0.0'
+ socket.close()
+
+ check_hub()
+
+ def test_connect_tcp (self):
+ def accept_once (listenfd):
+ try:
+ conn, addr = listenfd.accept()
+ fd = conn.makefile(mode = 'w')
+ conn.close()
+ fd.write('hello\n')
+ fd.close()
+ finally:
+ listenfd.close()
+
+ server = evy.listen(('0.0.0.0', 0))
+ greenthread.spawn(accept_once, server)
+
+ client = evy.connect(('127.0.0.1', server.getsockname()[1]))
+ fd = client.makefile()
+ client.close()
+ assert fd.readline() == 'hello\n'
+
+ assert fd.read() == ''
+ fd.close()
+
+ check_hub()
+
+ def test_001_trampoline_timeout (self):
+ from evy import coros
+
+ server_sock = evy.listen(('127.0.0.1', 0))
+ bound_port = server_sock.getsockname()[1]
+
+ def server (sock):
+ client, addr = sock.accept()
+ greenthread.sleep(0.1)
+
+ server_evt = spawn(server, server_sock)
+ greenthread.sleep(0)
+ try:
+ desc = evy.connect(('127.0.0.1', bound_port))
+ hubs.trampoline(desc, read = True, write = False, timeout = 0.001)
+ except greenthread.TimeoutError:
+ pass # test passed
+ else:
+ assert False, "Didn't timeout"
+
+ server_evt.wait()
+ check_hub()
+
+ def test_timeout_cancel (self):
+ server = evy.listen(('0.0.0.0', 0))
+ bound_port = server.getsockname()[1]
+
+ done = [False]
+
+ def client_closer (sock):
+ while True:
+ (conn, addr) = sock.accept()
+ conn.close()
+
+ def go ():
+ desc = evy.connect(('127.0.0.1', bound_port))
+ try:
+ hubs.trampoline(desc, read = True, timeout = 0.1)
+ except greenthread.TimeoutError:
+ assert False, "Timed out"
+
+ server.close()
+ desc.close()
+ done[0] = True
+
+ greenthread.spawn_after_local(0, go)
+
+ server_coro = greenthread.spawn(client_closer, server)
+ while not done[0]:
+ greenthread.sleep(0)
+ greenthread.kill(server_coro)
+
+ check_hub()
+
+ def test_named (self):
+ named_foo = greenthread.named('tests.api_test.Foo')
+ self.assertEquals(
+ named_foo.__name__,
+ "Foo")
+
+ def test_naming_missing_class (self):
+ self.assertRaises(
+ ImportError, greenthread.named, 'this_name_should_hopefully_not_exist.Foo')
+
+
+ def test_killing_dormant (self):
+ DELAY = 0.1
+ state = []
+
+ def test ():
+ try:
+ state.append('start')
+ greenthread.sleep(DELAY)
+ except:
+ state.append('except')
+ # catching GreenletExit
+ pass
+ # when switching to hub, hub makes itself the parent of this greenlet,
+ # thus after the function's done, the control will go to the parent
+ greenthread.sleep(0)
+ state.append('finished')
+
+ g = greenthread.spawn(test)
+ greenthread.sleep(DELAY / 2)
+ self.assertEquals(state, ['start'])
+ greenthread.kill(g)
+ # will not get there, unless switching is explicitly scheduled by kill
+ self.assertEquals(state, ['start', 'except'])
+ greenthread.sleep(DELAY)
+ self.assertEquals(state, ['start', 'except', 'finished'])
+
+ def test_nested_with_timeout (self):
+ def func ():
+ return greenthread.with_timeout(0.2, greenthread.sleep, 2, timeout_value = 1)
+
+ self.assertRaises(greenthread.TimeoutError, greenthread.with_timeout, 0.1, func)
+
+
+class Foo(object):
+ pass
View
9 tests/test_mysqldb.py
@@ -64,7 +64,8 @@ def mysql_requirement (_f):
return False
-class MySQLdbTester(LimitedTestCase):
+class TestMySQLdb(LimitedTestCase):
+
def setUp (self):
self._auth = get_database_auth()['MySQLdb']
self.create_db()
@@ -247,9 +248,13 @@ def test_visibility_from_other_connections (self):
curs.execute("delete from gargleblatz where a=314159")
conn.commit()
+
+
from tests import test_patcher
-class MonkeyPatchTester(test_patcher.ProcessBase):
+
+class TestMySQLMonkeyPatch(test_patcher.ProcessBase):
+
@skip_unless(mysql_requirement)
def test_monkey_patching (self):
output, lines = self.run_script("""
View
21 tests/test_patcher.py
@@ -96,7 +96,7 @@ def run_script (self, contents, modname = None):
return self.launch_subprocess(modname)
-class ImportPatched(ProcessBase):
+class TestImportPatched(ProcessBase):
def test_patch_a_module (self):
self.write_to_tempfile("base", base_module_contents)
self.write_to_tempfile("patching", patching_module_contents)
@@ -126,7 +126,9 @@ def test_import_patched_defaults (self):
self.assert_('GreenSocket' in lines[1], repr(output))
-class MonkeyPatch(ProcessBase):
+
+class TestMonkeyPatch(ProcessBase):
+
def test_patched_modules (self):
new_mod = """
from evy import patcher
@@ -264,10 +266,9 @@ def do_sleep():
tpool.killall()
"""
-class Tpool(ProcessBase):
+class TestTpool(ProcessBase):
TEST_TIMEOUT = 3
- @skip_with_pyevent
def test_simple (self):
new_mod = """
import evy
@@ -285,7 +286,6 @@ def test_simple (self):
self.assert_('2' in lines[0], repr(output))
self.assert_('3' in lines[1], repr(output))
- @skip_with_pyevent
def test_unpatched_thread (self):
new_mod = """import evy
evy.monkey_patch(time=False, thread=False)
@@ -298,7 +298,6 @@ def test_unpatched_thread (self):
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 2, lines)
- @skip_with_pyevent
def test_patched_thread (self):
new_mod = """import evy
evy.monkey_patch(time=False, thread=True)
@@ -312,7 +311,7 @@ def test_patched_thread (self):
self.assertEqual(len(lines), 2, "\n".join(lines))
-class Subprocess(ProcessBase):
+class TestSubprocess(ProcessBase):
def test_monkeypatched_subprocess (self):
new_mod = """import evy
evy.monkey_patch()
@@ -326,7 +325,10 @@ def test_monkeypatched_subprocess (self):
self.assertEqual(output, "done\n", output)
-class Threading(ProcessBase):
+
+
+class TestThreading(ProcessBase):
+
def test_orig_thread (self):
new_mod = """import evy
evy.monkey_patch()
@@ -425,7 +427,8 @@ def test_keyerror (self):
self.assertEqual(len(lines), 1, "\n".join(lines))
-class GreenThreadWrapper(ProcessBase):
+class TestGreenThreadWrapper(ProcessBase):
+
prologue = """import evy
evy.monkey_patch()
import threading
View
3  tests/test_patcher_psycopg.py
@@ -65,7 +65,8 @@ def fetch(num, secs):
print "done"
"""
-class PatchingPsycopg(patcher_test.ProcessBase):
+class TestPatchingPsycopg(patcher_test.ProcessBase):
+
@skip_unless(postgres_requirement)
def test_psycopg_patched (self):
if 'PSYCOPG_TEST_DSN' not in os.environ:
View
2  tests/test_pool.py
@@ -292,7 +292,7 @@ def resume ():
self.assertEquals(tp.get(), 'wakeup')
-class PoolBasicTests(LimitedTestCase):
+class TestPoolBasicTests(LimitedTestCase):
klass = pool.Pool
def test_execute_async (self):
View
1  tests/test_proc.py
@@ -33,7 +33,6 @@
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
-from evy import proc
warnings.simplefilter('default', DeprecationWarning)
from evy import coros
View
2  tests/test_saranwrap.py
@@ -53,6 +53,8 @@ def list_maker ():
two = 2
three = 3
+
+
class CoroutineCallingClass(object):
def __init__ (self):
self._my_dict = {}
View
47 tests/test_ssl.py
@@ -28,13 +28,20 @@
#
+import socket
+import os
+
from tests import LimitedTestCase, certificate_file, private_key_file
from tests import skip_if_no_ssl
from unittest import main
+
import evy
from evy import util, coros, greenio
-import socket
-import os
+from evy import greenthread
+
+
+
+
def listen_ssl_socket (address = ('127.0.0.1', 0)):
sock = util.wrap_ssl(socket.socket(), certificate_file,
@@ -46,6 +53,10 @@ def listen_ssl_socket (address = ('127.0.0.1', 0)):
class SSLTest(LimitedTestCase):
+
+ certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
+ private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
+
@skip_if_no_ssl
def test_duplex_response (self):
def serve (listener):
@@ -101,6 +112,36 @@ def serve (listener):
server_coro.wait()
@skip_if_no_ssl
+ def test_ssl_connect2 (self):
+ def accept_once (listenfd):
+ try:
+ conn, addr = listenfd.accept()
+ conn.write('hello\r\n')
+ greenio.shutdown_safe(conn)
+ conn.close()
+ finally:
+ greenio.shutdown_safe(listenfd)
+ listenfd.close()
+
+ server = api.ssl_listener(('0.0.0.0', 0),
+ self.certificate_file,
+ self.private_key_file)
+ greenthread.spawn(accept_once, server)
+
+ raw_client = evy.connect(('127.0.0.1', server.getsockname()[1]))
+ client = util.wrap_ssl(raw_client)
+ fd = socket._fileobject(client, 'rb', 8192)
+
+ assert fd.readline() == 'hello\r\n'
+ try:
+ self.assertEquals('', fd.read(10))
+ except greenio.SSL.ZeroReturnError:
+ # if it's a GreenSSL object it'll do this
+ pass
+ greenio.shutdown_safe(client)
+ client.close()
+
+ @skip_if_no_ssl
def test_ssl_unwrap (self):
def serve ():
sock, addr = listener.accept()
@@ -126,6 +167,8 @@ def serve ():
class SocketSSLTest(LimitedTestCase):
+
+
@skip_if_no_ssl
def test_greensslobject (self):
import warnings
Please sign in to comment.
Something went wrong with that request. Please try again.