Skip to content
This repository has been archived by the owner on Dec 10, 2018. It is now read-only.

Commit

Permalink
Add a mostly cythonized version of the framed transport.
Browse files Browse the repository at this point in the history
Use the TCyBufferedTransport to wrap the framed transport. This gets most of the performance since there's ~ 1 call to read/write to the framed transport anyway.
Also simplify TFramedTransport - don't buffer writes; just wrap the transport in a buffered transport.
Also don't rewrite sendall.
  • Loading branch information
Mike Kaplinskiy committed Dec 1, 2014
1 parent 4ab64d5 commit 0cf1024
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
13 changes: 11 additions & 2 deletions tests/test_framed_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import thriftpy
from thriftpy.tornado import make_server
from thriftpy.rpc import make_client
from thriftpy.transport import TFramedTransportFactory
from thriftpy.transport import (
TFramedTransportFactory,
TCyFramedTransportFactory,
)

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -47,6 +50,8 @@ def get(self, name):


class FramedTransportTestCase(TestCase):
TRANSPORT_FACTORY = TFramedTransportFactory()

def mk_server(self):
self.io_loop = ioloop.IOLoop()
server = make_server(addressbook.AddressBookService,
Expand All @@ -70,7 +75,7 @@ def listen(self):
def mk_client(self):
return make_client(addressbook.AddressBookService,
'127.0.0.1', self.port,
trans_factory=TFramedTransportFactory())
trans_factory=self.TRANSPORT_FACTORY)

def setUp(self):
self.mk_server()
Expand All @@ -90,3 +95,7 @@ def test_zero_length_string(self):
assert success
success = self.client.get(name='')
assert success


class CyFramedTransportTestCase(FramedTransportTestCase):
TRANSPORT_FACTORY = TCyFramedTransportFactory()
7 changes: 6 additions & 1 deletion thriftpy/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@

from thriftpy._compat import PYPY
if not PYPY:
from .cytransport import TCyBufferedTransport, TCyBufferedTransportFactory
from .cytransport import (
TCyBufferedTransport,
TCyBufferedTransportFactory,
TCyFramedTransportFactory,
)
else:
TCyBufferedTransport = TBufferedTransport
TCyBufferedTransportFactory = TBufferedTransportFactory
TCyFramedTransportFactory = TFramedTransportFactory
11 changes: 8 additions & 3 deletions thriftpy/transport/cytransport.pyx
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memmove

from ..transport import TTransportException
from ..transport import TFramedTransport, TTransportException

DEF DEFAULT_BUFFER = 4096
DEF MIN_BUFFER_SZIE = 1024
DEF MIN_BUFFER_SIZE = 1024


cdef class TCyBuffer(object):
Expand Down Expand Up @@ -50,7 +50,7 @@ cdef class TCyBufferedTransport(object):
"""binary reader/writer"""

def __init__(self, trans, int buf_size=DEFAULT_BUFFER):
if buf_size < MIN_BUFFER_SZIE:
if buf_size < MIN_BUFFER_SIZE:
raise Exception("buffer too small")

self.trans = trans
Expand Down Expand Up @@ -136,6 +136,7 @@ cdef class TCyBufferedTransport(object):
if self.wbuf.data_size > 0:
data = self.wbuf.buf[:self.wbuf.data_size]
self.trans.write(data)
self.trans.flush()
self.wbuf.clean()

def getvalue(self):
Expand All @@ -145,3 +146,7 @@ cdef class TCyBufferedTransport(object):
class TCyBufferedTransportFactory(object):
def get_transport(self, trans):
return TCyBufferedTransport(trans)

class TCyFramedTransportFactory(object):
def get_transport(self, trans):
return TCyBufferedTransport(TFramedTransport(trans))
10 changes: 1 addition & 9 deletions thriftpy/transport/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,7 @@ def write(self, buff):
if not self.handle:
raise TTransportException(type=TTransportException.NOT_OPEN,
message='Transport not open')
sent = 0
have = len(buff)
while sent < have:
plus = self.handle.send(buff)
if plus == 0:
raise TTransportException(type=TTransportException.END_OF_FILE,
message='TSocket sent 0 bytes')
sent += plus
buff = buff[plus:]
self.handle.sendall(buff)

def flush(self):
pass
Expand Down
18 changes: 7 additions & 11 deletions thriftpy/transport/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ class TFramedTransport(TTransportBase):
def __init__(self, trans):
self.__trans = trans
self.__rbuf = BytesIO()
self.__wbuf = BytesIO()

def is_open(self):
return self.__trans.is_open()
Expand Down Expand Up @@ -174,23 +173,20 @@ def read_frame(self):
self.__rbuf = BytesIO(self.__trans.read(sz))

def write(self, buf):
self.__wbuf.write(buf)

def flush(self):
wout = self.__wbuf.getvalue()
wsz = len(wout)
# reset wbuf before write/flush to preserve state on underlying failure
self.__wbuf = BytesIO()
wsz = len(buf)
# N.B.: Doing this string concatenation is WAY cheaper than making
# two separate calls to the underlying socket object. Socket writes in
# Python turn out to be REALLY expensive, but it seems to do a pretty
# good job of managing string buffer operations without excessive
# copies
buf = struct.pack("!i", wsz) + wout
self.__trans.write(buf)
towrite = struct.pack("!i", wsz) + buf
self.__trans.write(towrite)
self.__trans.flush()

def flush(self):
pass


class TFramedTransportFactory(object):
def get_transport(self, trans):
return TFramedTransport(trans)
return TBufferedTransport(TFramedTransport(trans))

0 comments on commit 0cf1024

Please sign in to comment.