Skip to content

Commit cfa9521

Browse files
committed
Fix bugs in Future; add benchmarks
1 parent f7129fc commit cfa9521

File tree

8 files changed

+130
-10
lines changed

8 files changed

+130
-10
lines changed

examples/bench/client.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copied with minimal modifications from curio
2+
# https://github.com/dabeaz/curio
3+
4+
from concurrent.futures import ProcessPoolExecutor
5+
6+
from socket import *
7+
import time
8+
import sys
9+
10+
if len(sys.argv) > 1:
11+
MSGSIZE = int(sys.argv[1])
12+
else:
13+
MSGSIZE = 1000
14+
15+
msg = b'x'*MSGSIZE
16+
17+
def run_test(n):
18+
print('Sending', NMESSAGES, 'messages')
19+
sock = socket(AF_INET, SOCK_STREAM)
20+
sock.connect(('localhost', 25000))
21+
while n > 0:
22+
sock.sendall(msg)
23+
nrecv = 0
24+
while nrecv < MSGSIZE:
25+
resp = sock.recv(MSGSIZE)
26+
if not resp:
27+
raise SystemExit()
28+
nrecv += len(resp)
29+
n -= 1
30+
31+
N = 3
32+
NMESSAGES = 200000
33+
start = time.time()
34+
with ProcessPoolExecutor(max_workers=N) as e:
35+
for _ in range(N):
36+
e.submit(run_test, NMESSAGES)
37+
end = time.time()
38+
duration = end-start
39+
print(NMESSAGES*N,'in', duration)
40+
print(NMESSAGES*N/duration, 'requests/sec')

examples/bench/server.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import argparse
2+
import asyncio
3+
import uvloop
4+
5+
from socket import *
6+
7+
8+
async def echo_server(loop, address):
9+
sock = socket(AF_INET, SOCK_STREAM)
10+
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
11+
sock.bind(address)
12+
sock.listen(5)
13+
sock.setblocking(False)
14+
print('Server listening at', address)
15+
with sock:
16+
while True:
17+
client, addr = await loop.sock_accept(sock)
18+
print('Connection from', addr)
19+
loop.create_task(echo_client(loop, client))
20+
21+
22+
async def echo_client(loop, client):
23+
with client:
24+
while True:
25+
data = await loop.sock_recv(client, 10000)
26+
if not data:
27+
break
28+
await loop.sock_sendall(client, data)
29+
print('Connection closed')
30+
31+
32+
if __name__ == '__main__':
33+
parser = argparse.ArgumentParser()
34+
parser.add_argument('--uvloop', default=False, action='store_true')
35+
args = parser.parse_args()
36+
37+
if args.uvloop:
38+
loop = uvloop.Loop()
39+
print('using UVLoop')
40+
else:
41+
loop = asyncio.new_event_loop()
42+
print('using asyncio loop')
43+
44+
asyncio.set_event_loop(loop)
45+
loop.set_debug(False)
46+
47+
loop.create_task(echo_server(loop, ('', 25000)))
48+
try:
49+
loop.run_forever()
50+
finally:
51+
loop.close()

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ def build_extensions(self):
5151
"uvloop.futures",
5252
sources = [
5353
"uvloop/futures.c",
54-
]
54+
],
55+
extra_compile_args=['-O2']
5556
),
5657

5758
Extension(

tests/test_futures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def test_cancel(self):
2424
self.assertFalse(f.cancel())
2525

2626
def test_initial_state(self):
27-
f = CFuture(self.loop)
27+
f = CFuture(loop=self.loop)
2828
self.assertFalse(f.cancelled())
2929
self.assertFalse(f.done())
3030
f.cancel()

uvloop/consts.pxi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
DEF DEBUG = 0
22

3+
# XXX: To use Future implemented in C, you need a modified
4+
# version of asyncio.
5+
DEF USE_C_FUTURE = 0
6+
37
DEF UV_STREAM_RECV_BUF_SIZE = 65536
48

59
DEF FLOW_CONTROL_HIGH_WATER = 65536

uvloop/futures.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "Python.h"
44
#include "structmember.h"
55

6-
static struct Locals { // XXX
6+
static struct Locals { // XXX!!!
77
PyObject* is_error;
88
PyObject* ce_error;
99
} locals;
@@ -67,10 +67,12 @@ _schedule_callbacks(FutureObj *fut) {
6767
static PyObject *
6868
FutureObj_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
6969
{
70+
static char *kwlist[] = {"loop", NULL};
71+
7072
PyObject *loop;
7173
FutureObj *fut;
7274

73-
if (!PyArg_UnpackTuple(args, "Future", 1, 1, &loop))
75+
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", kwlist, &loop))
7476
return NULL;
7577

7678
fut = PyObject_GC_New(FutureObj, type);
@@ -255,6 +257,13 @@ FutureObj_iternext(FutureObj *fut)
255257
return NULL;
256258
}
257259

260+
static PyObject *
261+
FutureObj_send(FutureObj *fut, PyObject *res) {
262+
PyErr_Format(PyExc_RuntimeError,
263+
"future.send() was called; unpatched asyncio");
264+
return NULL;
265+
}
266+
258267
static PyObject *
259268
FutureObj_add_done_callback(FutureObj *fut, PyObject *arg)
260269
{
@@ -345,6 +354,8 @@ static PyMethodDef FutureType_methods[] = {
345354
{"done", (PyCFunction)FutureObj_done, METH_NOARGS, NULL},
346355
{"result", (PyCFunction)FutureObj_result, METH_NOARGS, NULL},
347356
{"exception", (PyCFunction)FutureObj_exception, METH_NOARGS, NULL},
357+
358+
{"send", (PyCFunction)FutureObj_send, METH_O, NULL}, // XXX
348359
{NULL, NULL} /* Sentinel */
349360
};
350361

@@ -425,6 +436,7 @@ futures_exec(PyObject *module) {
425436
return -1;
426437
}
427438

439+
// XXX!!!
428440
PyObject *asyncio = PyImport_ImportModule("asyncio");
429441
if (asyncio == NULL) {
430442
return -1;

uvloop/loop.pyx

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ include "consts.pxi"
2424
include "stdlib.pxi"
2525

2626

27+
cdef Future
28+
IF USE_C_FUTURE:
29+
Future = c_Future
30+
ELSE:
31+
Future = aio_Future
32+
33+
2734
class LoopError(Exception):
2835
pass
2936

@@ -279,7 +286,7 @@ cdef class Loop:
279286
int proto, int flags,
280287
int unpack):
281288

282-
fut = aio_Future(loop=self)
289+
fut = Future(loop=self)
283290

284291
def callback(result):
285292
if AddrInfo.isinstance(result):
@@ -655,27 +662,27 @@ cdef class Loop:
655662
return result
656663

657664
def sock_recv(self, sock, n):
658-
fut = CFuture(self)
665+
fut = Future(loop=self)
659666
self._sock_recv(fut, False, sock, n)
660667
return fut
661668

662669
def sock_sendall(self, sock, data):
663-
fut = CFuture(self)
670+
fut = Future(loop=self)
664671
if data:
665672
self._sock_sendall(fut, False, sock, data)
666673
else:
667674
fut.set_result(None)
668675
return fut
669676

670677
def sock_accept(self, sock):
671-
fut = aio_Future(loop=self)
678+
fut = Future(loop=self)
672679
self._sock_accept(fut, False, sock)
673680
return fut
674681

675682
def sock_connect(self, sock, address):
676683
if self._debug and sock.gettimeout() != 0:
677684
raise ValueError("the socket must be non-blocking")
678-
fut = aio_Future(loop=self)
685+
fut = Future(loop=self)
679686
try:
680687
if self._debug:
681688
aio__check_resolved_address(sock, address)

uvloop/stdlib.pxi

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import socket
77
import sys
88

99

10+
from . import futures
11+
# asyncio.Future.register(futures.Future)
12+
cdef c_Future = futures.Future
13+
14+
1015
cdef aio_CancelledError = asyncio.CancelledError
1116
cdef aio_TimeoutError = asyncio.TimeoutError
1217
cdef aio_Future = asyncio.Future
@@ -36,4 +41,4 @@ cdef str sys_platform = sys.platform
3641

3742

3843
# Cython doesn't clean-up imported objects properly in Py3 mode.
39-
del asyncio, concurrent, collections, functools, socket, os, sys
44+
del asyncio, concurrent, collections, futures, functools, socket, os, sys

0 commit comments

Comments
 (0)