Skip to content

Commit

Permalink
add Dial feature.
Browse files Browse the repository at this point in the history
A Dial is a generic client for stream-oriented protocols.

    Example::

        conn, err = Dial("tcp", ('127.0.0.1', 8000))
        conn.write("hello")
        print(conn.read()
  • Loading branch information
benoitc committed Sep 11, 2012
1 parent be1b0da commit 95ac73b
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 53 deletions.
20 changes: 20 additions & 0 deletions examples/echo_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from flower import tasklet, run, schedule
from flower.net import Dial


# connect to the remote server
conn = Dial("tcp", ('127.0.0.1', 8000))

# start to handle the connection
# we send a string to the server and fetch the
# response back

for i in range(3):
msg = "hello"
print("sent %s" % msg)
resp = conn.write(msg)
ret = conn.read()
print("echo: %s" % ret)

conn.close()
run()
2 changes: 1 addition & 1 deletion examples/echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ def handle_connection(conn):
data = conn.read()
if not data:
break

conn.write(data)


# Listen on tcp port 8000 on localhost
l = Listen(('127.0.0.1', 8000), "tcp")

try:
while True:
try:
Expand Down
56 changes: 37 additions & 19 deletions flower/net/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,41 @@
#
# This file is part of flower. See the NOTICE for more information.

from flower.net.base import Listen
from flower.net.tcp import TCPListen
from flower.net.udp import UDPListen
from flower.net.pipe import PIPEListen
from flower.net.base import IListen
from flower.net.tcp import TCPListen, dial_tcp
from flower.net.udp import UDPListen, dial_udp
from flower.net.pipe import PipeListen, dial_pipe

UV_HANDLERS = dict(
LISTEN_HANDLERS = dict(
tcp = TCPListen,
udp = UDPListen,
pipe = PIPEListen)
pipe = PipeListen)

class Listen(Listen):
DIAL_HANDLERS = dict(
tcp = dial_tcp,
udp = dial_udp,
pipe = dial_pipe)


def Dial(proto, *args):
""" A Dial is a generic client for stream-oriented protocols.
Example::
conn, err = Dial("tcp", ('127.0.0.1', 8000))
conn.write("hello")
print(conn.read())
"""

try:
dial_func = DIAL_HANDLERS[proto]
except KeyError:
raise ValueError("type should be tcp, udp or unix")
return dial_func(*args)

dial = Dial # for pep8 lovers

def Listen(addr=('0.0.0.0', 0), proto="tcp", *args):
"""A Listener is a generic network listener for stream-oriented protocols.
Multiple tasks may invoke methods on a Listener simultaneously.
Expand Down Expand Up @@ -40,17 +64,11 @@ def handle_connection(conn):
run()
"""

try:
listen_class = LISTEN_HANDLERS[proto]
except KeyError:
raise ValueError("type should be tcp, udp or unix")

def __init__(self, addr=('0.0.0.0', 0), proto="tcp", *args):
try:
self.listen_class = UV_HANDLERS[proto]
except KeyError:
raise ValueError("type should be tcp, udp or unix")

self.listen_handle = self.listen_class(addr, *args)

def accept(self):
return self.listen_handle.accept()
return listen_class(addr, *args)

def close(self):
return self.listen_handle.close()
listen = Listen # for pep8 lovers
23 changes: 21 additions & 2 deletions flower/net/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def loop(self):
return self.uv.loop


class Conn(object):
class IConn(object):
""" connection interface """

def read(self):
Expand All @@ -41,7 +41,22 @@ def local_addr(self):
def remote_addr(self):
""" return the remote address """

class Listen(object):
@property
def status(self):
""" return current status """
if self.client.closed:
return "closed"
elif self.client.readable and self.client.writable:
return "open"
elif self.client.readable and not self.client.writable:
return "readonly"
elif not self.client.readable and self.client.writable:
return "writeonly"
else:
return "closed"


class IListen(object):

def accept(self):
""" accept a connection. Return a Conn instance. It always
Expand All @@ -52,3 +67,7 @@ def close(self):

def addr(self):
" return the bound address """

class IDial(object):

""" base interface for Dial class """
24 changes: 18 additions & 6 deletions flower/net/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@

from flower.net.tcp import TCPListen, TCPConn

class PIPEConn(TCPConn):
""" A PIPE connection """
class PipeConn(TCPConn):
""" A Pipe connection """


class PIPEListen(TCPListen):
""" A PIPE listener """
class PipeListen(TCPListen):
""" A Pipe listener """

CONN_CLASS = PIPEConn
HANDLER_CLASS = pyuv.PIPE
CONN_CLASS = PipeConn
HANDLER_CLASS = pyuv.Pipe

def dial_pipe(addr):
uv = uv_server()
h = pyuv.Pipe(uv.loop)

c = channel()
def _on_connect(handle, error):
c.send((handle, error))

h.connect(addr, _on_connect)
h1, error = c.receive()
return (PipeConn(h1), error)
66 changes: 56 additions & 10 deletions flower/net/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,40 @@
#
# This file is part of flower. See the NOTICE for more information.

import os
from collections import deque
import threading

import pyuv


from flower.core.uv import uv_server
from flower.core.util import nanotime
from flower.core import (channel, schedule, schedule_remove, getcurrent,
get_scheduler, tasklet)
from flower.net.base import Conn, Listener, Listen, NoMoreListener
get_scheduler, tasklet, bomb)
from flower.net.base import IConn, Listener, IListen, NoMoreListener

class TCPConn(Conn):
class TCPConn(IConn):

def __init__(self, client):
self.client = client
self.reading = False
self.cr = channel()
self._read_task = None
self.queue = deque()

def read(self):
if not self.reading:
self.client.start_read(self._on_read)
self.reading = True
self.client.start_read(self._on_read)

self.client.loop.update_time()
try:
retval = self.queue.popleft()
if isinstance(retval, bomb):
retval.raise_()
return retval
except IndexError:
pass

return self.cr.receive()

def write(self, data):
Expand All @@ -32,6 +44,15 @@ def write(self, data):
def writelines(self, seq):
return self.client.writelines(seq)

def _wait_write(self, func, data):
c = channel()
def _wait_cb(handle, err):
c.send(True)

func(data, _wait_cb)
c.receive()


def local_address(self):
return self.client.getsockame()

Expand All @@ -43,12 +64,21 @@ def close(self):

def _on_read(self, handle, data, error):
if error:
self.cr.send_exception(IOError(error))
if error == 1: # EOF
msg = ""
else:
msg = bomb(IOError, IOError("uv error: %s" % error))
else:
self.cr.send(data)
schedule()
msg = data

class TCPListen(Listen):
# append the message to the queue
self.queue.append(msg)

if self.cr.balance < 0:
# someone is waiting, return last message
self.cr.send(self.queue.popleft())

class TCPListen(IListen):
""" A TCP listener """

CONN_CLASS = TCPConn # connection object returned
Expand Down Expand Up @@ -92,3 +122,19 @@ def on_connection(self, server, error):
else:
# we should probably do something there to drop connections
self.task.throw(NoMoreListener)


def dial_tcp(addr):
uv = uv_server()
h = pyuv.TCP(uv.loop)

c = channel()
def _on_connect(handle, error):
if error:
c.send_exception(OError, "uv error: %s" % error)
else:
c.send(handle)

h.connect(addr, _on_connect)
h1 = c.receive()
return TCPConn(h1)
50 changes: 35 additions & 15 deletions flower/net/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

from flower.core import channel, schedule, getcurrent
from flower.core.uv import uv_server
from flower.net.base import Listener, Conn, Listen, NoMoreListener
from flower.net.base import Listener, IConn, IListen, IDial, NoMoreListener

class UDPConn(Conn):
class UDPConn(IConn):

def __init__(self, addr, bind_addr, client=None):
def __init__(self, addr, raddr, client=None):
self.uv = uv_server()
if client is None:
self.client = pyuv.UDP(self.uv.loop)
self.client.bind(bind_addr)
self.client.bind(raddr)
else:
self.client = client
self.reading = true
self.cr = channel
self._bind_addr = bind_addr
self._raddr = raddr
self.addr = addr

def read(self):
Expand All @@ -37,7 +37,7 @@ def local_addr(self):
def remote_addr(self):
return self.remote_addr

class UDPListen(Listen):
class UDPListen(IListen):

def __init__(self, addr=('0.0.0.0', 0)):
# listeners are all couroutines waiting for a connections
Expand All @@ -63,23 +63,43 @@ def on_recv(self, handler, addr, data, err):
with self._lock:
if addr in self.conns:
conn = self.conns[addr]
if conn.cr.balance < 0:
conn.cr.send(data, err)

if error:
if error == 1:
msg = ""
else:
msg = bomb(IOError, IOError("uv error: %s" % error))
else:
tasklet(conn.cr.send)(data, err)
msg = data

# emit last message
conn.queue.append(msg)
if conn.cr.balance < 0:
# someone is waiting, return last message
conn.cr.send(self.queue.popleft())

elif len(self.listeners):
listener = self.listeners.popleft()
conn = UDPConn(addr)
self.conns[addr] = conn

# send the result async waiting someone eventually read
# the connection. Eventually the listener
tasklet(conn.cr.send)(data, err)
if error:
listener.c.send_exception(IOError, "uv error: %s" % error)
else:
conn = UDPConn(addr)
conn.queue.append(data)
self.conns[addr] = conn
listener.c.send(conn, error)
else:
# we should probably do something there to drop connections
self.task.throw(NoMoreListener)


schedule()

def addr(self):
return self.handler.getsockname()

def dial_udp(laddr, raddr):
uv = uv_server()
h = pyuv.UDP(uv.loop)
h.bind(laddr)

return (UDPConn(laddr, raddr, h), None)

0 comments on commit 95ac73b

Please sign in to comment.