Permalink
Browse files

Rename tcp.py to aio.py

  • Loading branch information...
1 parent b12dfd1 commit 46a4c5319c617910d9a09d6330e515bf46bd6b66 Ben Weaver committed Feb 4, 2010
Showing with 55 additions and 28 deletions.
  1. +1 −1 xmpp/__init__.py
  2. +22 −13 xmpp/{tcp.py → aio.py}
  3. +0 −1 xmpp/core.py
  4. +27 −8 xmpp/readstream.py
  5. +5 −5 xmpp/xmppstream.py
View
@@ -1,6 +1,6 @@
from __future__ import absolute_import
from .interfaces import *
-from .tcp import *
+from .aio import *
from .readstream import *
from .xmppstream import *
from .state import *
@@ -1,15 +1,16 @@
## Copyright (c) 2010, Coptix, Inc. All rights reserved.
## See the LICENSE file for license terms and warranty disclaimer.
-"""tcpserver -- a simple tcp server"""
+"""aio -- asynchronous IO"""
from __future__ import absolute_import
import socket, ssl, select, errno, logging, fcntl
from tornado import ioloop
__all__ = (
- 'TCPServer', 'TCPClient', 'starttls', 'is_ssl',
- 'event_loop', 'start', 'IOLoop'
+ 'TCPServer', 'TCPClient', 'SocketError', 'would_block', 'in_progress',
+ 'starttls', 'is_ssl',
+ 'loop', 'start', 'IOLoop'
)
class TCPServer(object):
@@ -41,7 +42,7 @@ def loop():
def __init__(self, handler, io=None):
self.handler = handler
- self.io = io or event_loop()
+ self.io = io or loop()
self.socket = None
def stop(self):
@@ -79,8 +80,8 @@ def _accept(self, fd, events):
while True:
try:
conn, addr = self.socket.accept()
- except socket.error as exc:
- if exc[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
+ except SocketError as exc:
+ if not would_block(exc):
raise
return
try:
@@ -114,7 +115,7 @@ def handle(data):
"""
def __init__(self, handler, io=None):
self.handler = handler
- self.io = io or event_loop()
+ self.io = io or loop()
self.socket = None
self.address = None
@@ -131,8 +132,8 @@ def connect(self, addr, port):
try:
self.address = (addr, int(port))
sock.connect(self.address)
- except socket.error as exc:
- if exc[0] != errno.EINPROGRESS:
+ except SocketError as exc:
+ if not in_progress(exc):
raise
self.socket = sock
@@ -152,6 +153,14 @@ def _ready(self, fd, events):
)
self.stop()
+SocketError = socket.error
+
+def would_block(exc):
+ return exc[0] in (errno.EWOULDBLOCK, errno.EAGAIN)
+
+def in_progress(exc):
+ return exc[0] == errno.EINPROGRESS
+
### TLS
@@ -259,7 +268,7 @@ def send(self, data, flags=0):
return self.write(data)
except ssl.SSLError as exc:
if exc.args[0] in (ssl.SSL_ERROR_WANT_WRITE, ssl.SSL_ERROR_WANT_READ):
- raise socket.error(errno.EAGAIN)
+ raise SocketError(errno.EAGAIN)
raise
def recv(self, buflen=1024, flags=0):
@@ -274,20 +283,20 @@ def recv(self, buflen=1024, flags=0):
return self.read(buflen)
except ssl.SSLError as exc:
if exc.args[0] == ssl.SSL_ERROR_WANT_READ:
- raise socket.error(errno.EAGAIN)
+ raise SocketError(errno.EAGAIN)
raise
### IO Loop
-def event_loop():
+def loop():
return IOLoop.instance()
def start(services=(), io=None):
"""Start an event loop. If services are given, start them before
starting the loop and stop them before stopping the loop."""
- io = io or event_loop()
+ io = io or loop()
for svc in services:
svc.start()
View
@@ -314,7 +314,6 @@ def _close(self):
## self.parser.close()
try:
self.state.clear()
- #self.stream.write('', self.stream.close)
self.stream.shutdown()
finally:
self.stream = None
View
@@ -4,8 +4,7 @@
"""readstream -- non-blocking unbuffered reads / buffered writes"""
from __future__ import absolute_import
-import socket, errno
-from . import tcp
+from . import aio
from .prelude import *
__all__ = ('ReadStream', )
@@ -16,7 +15,7 @@ class ReadStream(object):
def __init__(self, socket, io=None, read_chunk_size=4096):
self.socket = socket
- self.io = io or tcp.event_loop()
+ self.io = io or aio.loop()
self._state = io.ERROR
self._read_chunk_size = read_chunk_size
@@ -29,18 +28,29 @@ def __init__(self, socket, io=None, read_chunk_size=4096):
self.io.add_handler(socket.fileno(), self._handle, self._state)
def read(self, reader):
+ """Add a reader to this stream. There can only be one reader
+ at a time; it is called with each chunk received from the
+ socket."""
+
assert not self._reader, "There's already a reader installed."
self._reader = reader
self._add_io_state(self.io.READ)
return self
def write(self, data, callback=None):
+ """Write data to the stream. The data is sent immediately;
+ any data that cannot be sent is buffered. Once the write
+ buffer is emptied, the optional callback is called."""
+
self._wb += data
self._write_callback = callback
self._wb and self._write()
return self
def shutdown(self, callback=None):
+ """Close this stream once the write buffer is emptied and
+ optionally run callback."""
+
self._close_callback = callback
self._reader = None
if self._wb:
@@ -50,17 +60,26 @@ def shutdown(self, callback=None):
return self
def close(self):
+ """Immediately close the stream."""
+
if self.socket:
self.io.remove_handler(self.socket.fileno())
self.socket.close()
self.socket = None
+ self._close_callback and self._close_callback()
return self
def on_close(self, callback):
+ """Register a callback that is run after the stream is closed."""
+
self._close_callback = callback
return self
def starttls(self, callback=None, **options):
+ """Begin TLS negotiation; options are passed through to
+ do_handshake(). If callback is given, it is called after a
+ successful negotiation."""
+
## Delay starttls until the write-buffer is emptied.
if self._wb:
self._write_callback = partial(self.starttls, callback, **options)
@@ -76,7 +95,7 @@ def failure(socket):
## Wrap the socket; give startttls() control until the
## handshake is finished.
- tcp.starttls(
+ aio.starttls(
self.socket, self._handle, self._state, self.io,
success=success,
failure=failure,
@@ -113,8 +132,8 @@ def _handle(self, fd, events):
def _read(self):
try:
chunk = self.socket.recv(self._read_chunk_size)
- except socket.error as exc:
- if exc[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
+ except aio.SocketError as exc:
+ if aio.would_block(exc):
return
else:
self.close()
@@ -131,8 +150,8 @@ def _write(self):
try:
sent = self.socket.send(self._wb)
self._wb = self._wb[sent:]
- except socket.error as exc:
- if exc[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
+ except aio.SocketError as exc:
+ if aio.would_block(exc):
break
else:
self.close()
View
@@ -81,11 +81,11 @@ def start(self, name, attrs, nsmap):
if self.stack:
## A <stream:stream> has already been received. This is
## the beginning of a stanza or part of a stanza.
- # if len(self.stack) == 1 and not self.core.is_stanza(name):
- # raise StreamError(
- # 'unsupported-stanza-type',
- # 'Unrecognized stanza %r.' % name
- # )
+ if len(self.stack) == 1 and not self.core.is_stanza(name):
+ raise StreamError(
+ 'unsupported-stanza-type',
+ 'Unrecognized stanza %r.' % name
+ )
parent = self.stack[-1]
self.stack.append(xml.SubElement(parent, name, attrs, nsmap))
elif name == self.STREAM:

0 comments on commit 46a4c53

Please sign in to comment.