Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

add a listener using the socket module.

While uv watchers are evry fast it may be interresting to use the socket
module as well so we can easily wrap later connections to handle SSL and
also reuse shared TCP sockets on unix. (so we can eventually use this
listener in gunicorn as well.
  • Loading branch information...
commit abeed04a625aa9ea5ad336ffe94515c4b83e1636 1 parent e866026
Benoit Chesneau authored September 11, 2012
35  examples/echo_sockserver.py
... ...
@@ -0,0 +1,35 @@
  1
+# Echo server program
  2
+from flower import tasklet, run
  3
+from flower.net import Listen
  4
+
  5
+
  6
+# handle the connection. It return data to the sender.
  7
+def handle_connection(conn):
  8
+    while True:
  9
+        data = conn.read()
  10
+        if not data:
  11
+            break
  12
+
  13
+        conn.write(data)
  14
+
  15
+
  16
+# Listen on tcp port 8000 on localhost using the python socket module.
  17
+l = Listen(('127.0.0.1', 8000), "socktcp")
  18
+try:
  19
+    while True:
  20
+        try:
  21
+
  22
+            # wait for new connections (it doesn't block other tasks)
  23
+            conn, err = l.accept()
  24
+
  25
+            # Handle the connection in a new task.
  26
+            # The loop then returns to accepting, so that
  27
+            # multiple connections may be served concurrently.
  28
+
  29
+            t = tasklet(handle_connection)(conn)
  30
+        except KeyboardInterrupt:
  31
+            break
  32
+finally:
  33
+    l.close()
  34
+
  35
+run()
30  flower/io.py
@@ -6,6 +6,7 @@
6 6
 from flower.core import channel
7 7
 from flower.core.uv import get_fd, uv_mode, uv_server
8 8
 
  9
+from pyuv import errno
9 10
 
10 11
 class IOChannel(channel):
11 12
     """ channel to wait on IO events for a specific fd. It now use the UV server
@@ -25,12 +26,35 @@ def __init__(self, io, mode=0, label=''):
25 26
         self._poller = pyuv.Poll(uv.loop, fno)
26 27
         self._poller.start(uv_mode(mode), self._tick)
27 28
 
28  
-    def _tick(self, handle, events, errno):
29  
-        if errno:
30  
-            self.send_exception(IOError, errno)
  29
+    def _tick(self, handle, events, error):
  30
+        print("error")
  31
+        if error:
  32
+            if error == errno.EBADF:
  33
+                self.handle.close()
  34
+                self.send(None)
  35
+            else:
  36
+                self.send_exception(IOError, "uv error: %s" % errno)
31 37
         else:
32 38
             self.send(events)
33 39
 
34 40
     def stop(self):
35 41
         self._poller.stop()
36 42
         self.close()
  43
+
  44
+def wait_read(io):
  45
+    """ wrapper around IOChannel to only wait when a device become
  46
+    readable """
  47
+    c = IOChannel(io)
  48
+    try:
  49
+        return c.receive()
  50
+    finally:
  51
+        c.close()
  52
+
  53
+def wait_write(io):
  54
+    """ wrapper around IOChannel to only wait when a device become
  55
+    writanle """
  56
+    c = IOChannel(io, 1)
  57
+    try:
  58
+        return c.receive()
  59
+    finally:
  60
+        c.close()
5  flower/net/__init__.py
@@ -6,11 +6,14 @@
6 6
 from flower.net.tcp import TCPListen, dial_tcp
7 7
 from flower.net.udp import UDPListen, dial_udp
8 8
 from flower.net.pipe import PipeListen, dial_pipe
  9
+from flower.net.sock import TCPSockListen, PipeSockListen
9 10
 
10 11
 LISTEN_HANDLERS = dict(
11 12
         tcp = TCPListen,
12 13
         udp = UDPListen,
13  
-        pipe = PipeListen)
  14
+        pipe = PipeListen,
  15
+        socktcp = TCPSockListen,
  16
+        sockpipe = PipeSockListen)
14 17
 
15 18
 DIAL_HANDLERS = dict(
16 19
         tcp = dial_tcp,
387  flower/net/sock.py
... ...
@@ -0,0 +1,387 @@
  1
+# -*- coding: utf-8 -
  2
+#
  3
+# This file is part of flower. See the NOTICE for more information.
  4
+
  5
+from collections import deque
  6
+from io import DEFAULT_BUFFER_SIZE
  7
+import threading
  8
+
  9
+import socket
  10
+import sys
  11
+
  12
+import pyuv
  13
+
  14
+from flower.core import (channel, schedule, schedule_remove, getcurrent,
  15
+        get_scheduler, tasklet, bomb)
  16
+from flower.core.uv import uv_server
  17
+from flower.io import wait_read, wait_write
  18
+from flower.net.base import IConn, Listener, IListen, NoMoreListener
  19
+from flower.net.util import parse_address, is_ipv6
  20
+
  21
+IS_WINDOW = sys.platform == 'win32'
  22
+
  23
+if IS_WINDOW:
  24
+    from errno import WSAEWOULDBLOCK as EWOULDBLOCK
  25
+    EAGAIN = EWOULDBLOCK
  26
+else:
  27
+    from errno import EINVAL
  28
+    from errno import EWOULDBLOCK
  29
+
  30
+try:
  31
+    from errno import EBADF
  32
+except ImportError:
  33
+    EBADF = 9
  34
+
  35
+# sys.exc_clear was removed in Python3 as the except block of a try/except
  36
+# performs the same task. Add it as a no-op method.
  37
+try:
  38
+    sys.exc_clear
  39
+except AttributeError:
  40
+    def exc_clear():
  41
+        return
  42
+    sys.exc_clear = exc_clear
  43
+
  44
+if sys.version_info < (2, 7, 0, 'final'):
  45
+    # in python 2.6 socket.recv_into doesn't support bytesarray
  46
+    import array
  47
+
  48
+    def recv_into(sock, b):
  49
+        l = max(len(b), DEFAULT_BUFFER_SIZE)
  50
+        buf = sock.recv(l)
  51
+        recved = len(buf)
  52
+        b[0:recved] = buf
  53
+        return recved
  54
+else:
  55
+    def recv_into(sock, b):
  56
+        return sock.recv_into(b)
  57
+
  58
+# from gevent code
  59
+if sys.version_info[:2] < (2, 7):
  60
+    _get_memory = buffer
  61
+elif sys.version_info[:2] < (3, 0):
  62
+    def _get_memory(string, offset):
  63
+        try:
  64
+            return memoryview(string)[offset:]
  65
+        except TypeError:
  66
+            return buffer(string, offset)
  67
+else:
  68
+    def _get_memory(string, offset):
  69
+        return memoryview(string)[offset:]
  70
+
  71
+
  72
+class SockConn(IConn):
  73
+
  74
+    def __init__(self, client, laddr, addr):
  75
+        # set connection info
  76
+        self.client = client
  77
+        self.client.setblocking(0)
  78
+        self.timeout = socket.getdefaulttimeout()
  79
+        self.laddr = laddr
  80
+        self.addr = addr
  81
+
  82
+        # utilies used to fetch & send ata
  83
+        self.cr = channel() # channel keeping readers waiters
  84
+        self.cw = channel() # channel keeping writers waiters
  85
+        self.queue = deque() # queue of readable data
  86
+        self.uv = uv_server()
  87
+        self.rpoller = None
  88
+        self.wpoller = None
  89
+        self._lock = threading.RLock()
  90
+        self.ncr = 0 # reader refcount
  91
+        self.ncw = 0 # writer refcount
  92
+
  93
+        self.closing = False
  94
+
  95
+
  96
+    def read(self):
  97
+        if self.closing:
  98
+            return ""
  99
+
  100
+        while True:
  101
+            try:
  102
+                retval = self.queue.popleft()
  103
+                if self.cr.balance < 0:
  104
+                    self.cr.send(retval)
  105
+
  106
+                if isinstance(retval, bomb):
  107
+                    retval.raise_()
  108
+
  109
+                return retval
  110
+            except IndexError:
  111
+                pass
  112
+
  113
+            msg = None
  114
+            buf = bytearray(DEFAULT_BUFFER_SIZE)
  115
+            try:
  116
+                recvd = recv_into(self.client, buf)
  117
+                msg =  bytes(buf[0:recvd])
  118
+            except socket.error:
  119
+                ex = sys.exc_info()[1]
  120
+                if ex.args[0] == EBADF:
  121
+                    msg = ""
  122
+                    self.closing = True
  123
+                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
  124
+                    msg = bomb(ex, sys.exc_info()[2])
  125
+                    self.closing = True
  126
+                exc_clear()
  127
+
  128
+            if msg is None:
  129
+                res = self._watch_read()
  130
+                if res is not None:
  131
+                    self.queue.append(res)
  132
+
  133
+            else:
  134
+                self.queue.append(msg)
  135
+
  136
+    def write(self, data):
  137
+        data_sent = 0
  138
+        while data_sent < len(data):
  139
+            data_sent += self._send(_get_memory(data, data_sent))
  140
+
  141
+    def writelines(self, data):
  142
+        for s in seq:
  143
+            self.write(data)
  144
+
  145
+    def local_addr(self):
  146
+        return self.laddr
  147
+
  148
+    def remote_addr(self):
  149
+        return self.addr
  150
+
  151
+    def close(self):
  152
+        self.client.close()
  153
+
  154
+        # stop polling
  155
+        if self.wpoller is not None:
  156
+            self.wpoller.stop()
  157
+            self.wpoller = None
  158
+
  159
+        if self.rpoller is not None:
  160
+            self.rpoller.stop()
  161
+            self.rpoller = None
  162
+
  163
+    def _watch_read(self):
  164
+        self._lock.acquire()
  165
+        if not self.rpoller:
  166
+            self.rpoller = pyuv.Poll(self.uv.loop, self.client.fileno())
  167
+            self.rpoller.start(pyuv.UV_READABLE, self._on_read)
  168
+
  169
+        # increase the reader refcount
  170
+        self.ncr += 1
  171
+        self._lock.release()
  172
+        try:
  173
+            self.cr.receive()
  174
+        finally:
  175
+            self._lock.acquire()
  176
+            # decrease the refcount
  177
+            self.ncr -= 1
  178
+            # if no more waiters, close the poller
  179
+            if self.ncr <= 0:
  180
+                self.rpoller.stop()
  181
+                self.rpoller = None
  182
+            self._lock.release()
  183
+
  184
+    def _on_read(self, handle, events, error):
  185
+        if error and error is not None:
  186
+            self.readable = False
  187
+            if errno == 1:
  188
+                self.closing = True
  189
+                msg = ""
  190
+            else:
  191
+                msg = bomb(IOError, IOError("uv error: %s" % error))
  192
+        else:
  193
+            self.readable = True
  194
+
  195
+        self.cr.send(None)
  196
+
  197
+    def _send(self, data):
  198
+        while True:
  199
+            try:
  200
+               return self.client.send(data)
  201
+            except socket.error:
  202
+                ex = sys.exc_info()[1]
  203
+                if ex.args[0] == EBADF:
  204
+                    self.closing = True
  205
+                    return
  206
+                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
  207
+                    raise
  208
+                exc_clear()
  209
+
  210
+            # wait for newt write
  211
+            self._watch_write()
  212
+
  213
+    def _watch_write(self):
  214
+        self._lock.acquire()
  215
+
  216
+        # create a new poller
  217
+        if not self.wpoller:
  218
+            self.wpoller = pyuv.Poll(self.uv.loop, self.client.fileno())
  219
+            self.wpoller.start(pyuv.UV_WRITABLE, self._on_write)
  220
+
  221
+        # increase the writer refcount
  222
+        self.ncw += 1
  223
+
  224
+        self._lock.release()
  225
+
  226
+        try:
  227
+            self.cw.receive()
  228
+        finally:
  229
+            self._lock.acquire()
  230
+            self.ncw -= 1
  231
+            if self.ncw <= 0:
  232
+                self.wpoller.stop()
  233
+                self.wpoller = None
  234
+            self._lock.release()
  235
+
  236
+
  237
+    def _on_write(self, handle, events, errors):
  238
+        if not errors:
  239
+            self.cw.send()
  240
+
  241
+    def _read(self):
  242
+        buf = bytearray(DEFAULT_BUFFER_SIZE)
  243
+        try:
  244
+            recvd = recv_into(self.client, buf)
  245
+            msg =  bytes(buf[0:recvd])
  246
+        except socket.error:
  247
+            ex = sys.exc_info()[1]
  248
+            if ex.args[0] == EBADF:
  249
+                msg =  ""
  250
+            if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
  251
+                msg = bomb(ex, sys.exc_info()[2])
  252
+            exc_clear()
  253
+        return msg
  254
+
  255
+
  256
+class TCPSockListen(IListen):
  257
+
  258
+    def __init__(self, addr, *args, **kwargs):
  259
+
  260
+        sock = None
  261
+        fd = None
  262
+        if "sock" in kwargs:
  263
+            # we passed a socket in the kwargs, just use it
  264
+            sock = kwargs['sock']
  265
+            fd = sock.fileno()
  266
+        elif isinstance(addr, int):
  267
+            # we are reusing a socket here
  268
+            fd = addr
  269
+            if "family" not in kwargs:
  270
+                family = socket.AF_INET
  271
+            else:
  272
+                family = kwargs['family']
  273
+            sock = socket.fromfd(fd, family, socket.SOCK_STREAM)
  274
+        else:
  275
+            # new socket
  276
+            addr = parse_address(addr)
  277
+            if is_ipv6(addr[0]):
  278
+                family = socket.AF_INET6
  279
+            else:
  280
+                family = socket.AF_INET
  281
+
  282
+            sock = socket.socket(family, socket.SOCK_STREAM)
  283
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  284
+
  285
+            nodelay = kwargs.get('nodelay', True)
  286
+            if family == socket.AF_INET and nodelay:
  287
+                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  288
+
  289
+            sock.bind(addr)
  290
+            sock.setblocking(0)
  291
+            fd = sock.fileno()
  292
+
  293
+        self.sock = sock
  294
+        self.fd = fd
  295
+        self.addr = addr
  296
+        self.backlog = kwargs.get('backlog', 128)
  297
+        self.timeout = socket.getdefaulttimeout()
  298
+        self.uv = uv_server()
  299
+        self.poller = None
  300
+        self.listeners = deque()
  301
+        self.task = getcurrent()
  302
+
  303
+        # start to listen
  304
+        self.sock.listen(self.backlog)
  305
+
  306
+    def accept(self):
  307
+        """ start the accept loop. Let the OS handle accepting balancing
  308
+        between listeners """
  309
+
  310
+        if self.poller is None:
  311
+            self.poller = pyuv.Poll(self.uv.loop, self.fd)
  312
+            self.poller.start(pyuv.UV_READABLE, self._on_read)
  313
+
  314
+        listener = Listener()
  315
+        self.listeners.append(listener)
  316
+        return listener.c.receive()
  317
+
  318
+    def addr(self):
  319
+        return self.addr
  320
+
  321
+    def close(self):
  322
+        if self.poller is not None:
  323
+            self.poller.stop()
  324
+        self.sock.close()
  325
+
  326
+    def _on_read(self, handle, events, error):
  327
+        if error:
  328
+            handle.stop()
  329
+            self.poller = None
  330
+        else:
  331
+            res = None
  332
+            try:
  333
+                res = self.sock.accept()
  334
+            except socket.error:
  335
+                exc_info = sys.exc_info()
  336
+                ex = exc_info[1]
  337
+                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
  338
+                    self.task.throw(*exc_info)
  339
+                exc_clear()
  340
+
  341
+            if res is not None:
  342
+                client, addr = res
  343
+                self._on_connection(client, addr)
  344
+
  345
+    def _on_connection(self, client, addr):
  346
+        if len(self.listeners):
  347
+            listener = self.listeners.popleft()
  348
+
  349
+            self.uv.wakeup()
  350
+
  351
+            # return a new connection object to the listener
  352
+            conn =  SockConn(client, self.addr, addr)
  353
+            listener.c.send((conn, None))
  354
+            schedule()
  355
+        else:
  356
+            # we should probably do something there to drop connections
  357
+            self.task.throw(NoMoreListener)
  358
+
  359
+class PipeSockListen(TCPSockListen):
  360
+
  361
+    def __init__(self, addr, *args, **kwargs):
  362
+        fd = kwargs.get('fd')
  363
+        if fd is None:
  364
+            try:
  365
+                os.remove(addr)
  366
+            except OSError:
  367
+                pass
  368
+
  369
+        sock = socket.socket(family, socket.SOCK_STREAM)
  370
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  371
+
  372
+        nodelay = kwargs.get('nodelay', True)
  373
+        if family == socket.AF_INET and nodelay:
  374
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  375
+
  376
+        if fd is None:
  377
+            sock.bind(addr)
  378
+        sock.setblocking(0)
  379
+
  380
+        self.sock = sock
  381
+        self.fd = fd
  382
+        self.addr = addr
  383
+        self.backlog = kwargs.get('backlog', 128)
  384
+        self.timeout = socket.getdefaulttimeout()
  385
+
  386
+        # start to listen
  387
+        self.sock.listen(self.backlog)
3  flower/net/tcp.py
@@ -30,6 +30,9 @@ def read(self):
30 30
         self.client.loop.update_time()
31 31
         try:
32 32
             retval = self.queue.popleft()
  33
+            if self.cr.balance < 0:
  34
+                self.cr.send(retval)
  35
+
33 36
             if isinstance(retval, bomb):
34 37
                 retval.raise_()
35 38
             return retval
15  flower/net/udp.py
@@ -2,6 +2,7 @@
2 2
 #
3 3
 # This file is part of flower. See the NOTICE for more information.
4 4
 
  5
+from collections import deque
5 6
 from threading import Lock
6 7
 
7 8
 from flower.core import channel, schedule, getcurrent
@@ -18,12 +19,24 @@ def __init__(self, addr, raddr, client=None):
18 19
         else:
19 20
             self.client = client
20 21
         self.reading = true
  22
+        self.queue = deque()
21 23
         self.cr = channel
22 24
         self._raddr = raddr
23 25
         self.addr = addr
24 26
 
25 27
     def read(self):
26  
-         return self.cr.receive()
  28
+        try:
  29
+            retval = self.queue.popleft()
  30
+            if self.cr.balance < 0:
  31
+                self.cr.send(retval)
  32
+
  33
+            if isinstance(retval, bomb):
  34
+                retval.raise_()
  35
+            return retval
  36
+        except IndexError:
  37
+            pass
  38
+
  39
+        return self.cr.receive()
27 40
 
28 41
     def write(self, data):
29 42
         self.client.send(self._remote_addr, data)
2  setup.py
@@ -35,7 +35,7 @@
35 35
         ]
36 36
 
37 37
 
38  
-setup(name='pyflower',
  38
+setup(name='tulip',
39 39
       version='0.1.0',
40 40
       description = 'collection of modules to build distributed and reliable concurrent systems',
41 41
       long_description = long_description,

0 notes on commit abeed04

Saúl Ibarra Corretgé

pyuv errnos have UV_ prefix, that would be UV_EBADF

Please sign in to comment.
Something went wrong with that request. Please try again.