Skip to content

Commit

Permalink
Merge 06490cb into 4d2a4ff
Browse files Browse the repository at this point in the history
  • Loading branch information
wooparadog committed Jan 10, 2019
2 parents 4d2a4ff + 06490cb commit e359185
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 48 deletions.
33 changes: 17 additions & 16 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
language: python
sudo: required
dist: xenial

matrix:
include:
- python: 2.7
env: TOXENV=py27
- python: 3.4
env: TOXENV=py34
- python: 3.5
env: TOXENV=py35
- python: 3.6
env: TOXENV=py36

install:
- pip install tox
env:
- TOXENV=py27
- TOXENV=py34
- TOXENV=py35

script:
- tox

cache:
directories:
- thrift-0.9.3

before_install:
- pyenv global system 3.5
- ls "thrift-0.9.3" | grep "bootstrap.sh" || wget https://github.com/apache/thrift/archive/0.9.3.tar.gz -O thrift.tar.gz
- ls "thrift-0.9.3" | grep "bootstrap.sh" || tar xf thrift.tar.gz
- cd thrift-0.9.3
- ./bootstrap.sh
- ./configure --without-haskell --without-java --without-php --without-nodejs --without-cpp --without-lua --without-perl --without-ruby --without-erlang --without-rust --without-c_glib --without-go
- make
- sudo make install
- sudo apt-get update
- sudo apt-get install -y thrift-compiler

after_success:
- pip install python-coveralls
- coveralls
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
gunicorn_thrift
===============

[![Build Status](https://travis-ci.org/eleme/gunicorn_thrift.svg?branch=master)](https://travis-ci.org/eleme/gunicorn_thrift)
[![Coverage Status](https://coveralls.io/repos/github/eleme/gunicorn_thrift/badge.svg?branch=master)](https://coveralls.io/github/eleme/gunicorn_thrift?branch=master)
[![Build Status](https://travis-ci.org/Thriftpy/gunicorn_thrift.svg?branch=master)](https://travis-ci.org/Thriftpy/gunicorn_thrift)
[![Coverage Status](https://coveralls.io/repos/github/Thriftpy/gunicorn_thrift/badge.svg?branch=master)](https://coveralls.io/github/Thriftpy/gunicorn_thrift?branch=master)

Thrift app and worker for gunicorn! Hence, a multi-process python thrift server!

Expand Down Expand Up @@ -96,6 +96,7 @@ There are 4 types of workers available.
* `thrift_gevent`: gevent worker.
* `thriftpy_sync`: sync worker, adapted for [`thriftpy2`](https://github.com/thriftpy/thriftpy2)
* `thriftpy_gevent`: gevent worker, adapted for [`thriftpy2`](https://github.com/thriftpy/thriftpy2)
* `thriftpy_thread`: thread worker, adapted for [`thriftpy2`](https://github.com/thriftpy/thriftpy2)

note: If you want to use `thriftpy_sync` or `thriftpy_gevent`, make sure the following:

Expand All @@ -107,6 +108,13 @@ note: If you want to use `thriftpy_sync` or `thriftpy_gevent`, make sure the fol
1. `thriftpy2.transport:TCyBufferedTransportFactory` or
1. `thriftpy2.transport:TBufferedTransportFactory`

### Threads

How many threads per worker. Only effective when using `thriftpy_thread` worker.

Parameter: `--threads`
Config file: `threads`
Default: 10

### Transport factory

Expand Down
3 changes: 2 additions & 1 deletion gunicorn_thrift/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

from gunicorn import six
from gunicorn.config import Setting, validate_string, validate_pos_int,\
WorkerClass, validate_callable, validate_bool, validate_dict
WorkerClass, validate_callable, validate_bool, validate_dict, WorkerThreads

from .six import DEFAULT_WORKER, DEFAULT_TRANSPORT, DEFAULT_PROTOCOL


WorkerClass.default = DEFAULT_WORKER
WorkerThreads.default = 10


class ThriftTransportFactoryClass(Setting):
Expand Down
2 changes: 2 additions & 0 deletions gunicorn_thrift/six.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
'thrift_gevent': 'gunicorn_thrift.gevent_worker.GeventThriftWorker',
'thriftpy_sync': 'gunicorn_thrift.thriftpy_sync_worker.SyncThriftPyWorker',
'thriftpy_gevent': 'gunicorn_thrift.thriftpy_gevent_worker.GeventThriftPyWorker',
'thriftpy_thread': 'gunicorn_thrift.thriftpy_thread_worker.ThriftpyThreadWorker',
}
else:
DEFAULT_WORKER = "thriftpy_sync"
Expand All @@ -23,4 +24,5 @@
AVAILABLE_WORKERS = {
'thriftpy_sync': 'gunicorn_thrift.thriftpy_sync_worker.SyncThriftPyWorker',
'thriftpy_gevent': 'gunicorn_thrift.thriftpy_gevent_worker.GeventThriftPyWorker',
'thriftpy_thread': 'gunicorn_thrift.thriftpy_thread_worker.ThriftpyThreadWorker',
}
30 changes: 6 additions & 24 deletions gunicorn_thrift/thriftpy_gevent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
'gevent is not installed')

try:
import thriftpy2
import thriftpy2 # noqa
except ImportError:
raise RuntimeError('`thriftpy_gevent` worker is unavailable because '
'thriftpy2 is not installed')
Expand All @@ -35,10 +35,12 @@
from thriftpy2.protocol.cybin import ProtocolError
from thriftpy2.thrift import TDecodeException

from gunicorn.errors import AppImportError
from gunicorn.workers.ggevent import GeventWorker

from .utils import ProcessorMixin
from .utils import (
ProcessorMixin,
check_protocol_and_transport_for_thriftpy_woker,
)

logger = logging.getLogger(__name__)

Expand All @@ -60,26 +62,6 @@ def __get_real_function(module, name):
_real_get_ident = __get_real_function(thread, 'get_ident')


def check_protocol_and_transport(app):
if not app.cfg.thrift_protocol_factory.startswith('thriftpy'):
raise AppImportError(
'Thriftpy worker can only use protocol from thriftpy,'
'specify `thrift_protocol_factory` as one of the '
'following:'
'`thriftpy2.protocol:TCyBinaryProtocolFactory`, '
'`thriftpy2.protocol:TBinaryProtocolFactory`'
)

if not app.cfg.thrift_transport_factory.startswith('thriftpy'):
raise AppImportError(
'Thriftpy worker can only use transport from thriftpy,'
'specify `thrift_transport_factory` as one of the '
'following:'
'`thriftpy2.transport:TCyBufferedTransportFactory`, '
'`thriftpy2.transport:TBufferedTransportFactory`'
)


class GeventThriftPyWorker(GeventWorker, ProcessorMixin):
def init_process(self):
# Set up a greenlet tracing hook to monitor for event-loop blockage,
Expand Down Expand Up @@ -154,7 +136,7 @@ def _check_greenlet_blocking(self):
self._greenlet_switch_counter = 0

def run(self):
check_protocol_and_transport(self.app)
check_protocol_and_transport_for_thriftpy_woker(self.app.cfg)
super(GeventThriftPyWorker, self).run()

def handle(self, listener, client, addr):
Expand Down
187 changes: 187 additions & 0 deletions gunicorn_thrift/thriftpy_thread_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-

#
# This file is largely copied from gunicorn,
# which is released under the MIT license.
#

# The difference is that there's no default value for client timeout.
# If `thrift_client_timeout` is not provided, connection will be alive forever

import socket
import errno
import time

from functools import partial

from thriftpy2.transport import TSocket
from thriftpy2.transport import TTransportException
from thriftpy2.protocol.exc import TProtocolException
from thriftpy2.protocol.cybin import ProtocolError
from thriftpy2.thrift import TDecodeException

from gunicorn.workers.gthread import ThreadWorker, selectors

from .utils import (
ProcessorMixin,
check_protocol_and_transport_for_thriftpy_woker,
)


class TThreadClient(object):
def __init__(self, app, client, addr, t_processor):
self.app = app

self.sock = client
self.result = TSocket()
self.result.set_handle(client)
self.addr = addr

self.t_processor = t_processor

self.itrans = self.app.tfactory.get_transport(self.result)
self.otrans = self.app.tfactory.get_transport(self.result)
self.iprot = self.app.pfactory.get_protocol(self.itrans)
self.oprot = self.app.pfactory.get_protocol(self.otrans)

self.last_used = 0

self.client_timeout = self.app.cfg.thrift_client_timeout

def init(self):
# flag the socket as blocked
self.sock.setblocking(True)

def close(self):
self.result.close()

def before_put_into_ioloop(self):
# flag the socket as non blocked
self.last_used = time.time()
self.sock.setblocking(False)

def is_expired(self):
if self.client_timeout:
return time.time() > self.client_timeout + self.last_used
return False

def process_single_rpc(self):
keepalive = False
try:
self.t_processor.process(self.iprot, self.oprot)
keepalive = True
except TTransportException:
pass
except (TProtocolException, ProtocolError) as err:
self.log.warning(
"Protocol error, is client(%s) correct? %s", self.addr, err
)
except TDecodeException as err:
self.log.exception('%r: %r', self.addr, err)
self.app.cfg.on_tdecode_exception(err)
except socket.timeout:
self.log.warning('Client timeout: %r', self.addr)
except socket.error as e:
if e.args[0] == errno.ECONNRESET:
self.log.debug('%r: %r', self.addr, e)
elif e.args[0] == errno.EPIPE:
self.log.warning('%r: %r', self.addr, e)
else:
self.log.exception('%r: %r', self.addr, e)
except Exception as e:
self.log.exception('%r: %r', self.addr, e)
finally:
if not keepalive:
self.itrans.close()
self.otrans.close()
self.app.cfg.post_connect_closed(self)
return (keepalive, self)


class ThriftpyThreadWorker(ThreadWorker, ProcessorMixin):
def init_process(self):
self.log.info("Starting thread worker, threads: %s", self.cfg.threads)
return super(ThriftpyThreadWorker, self).init_process()

@classmethod
def check_config(cls, cfg, log):
super(ThriftpyThreadWorker, cls).check_config(cfg, log)
check_protocol_and_transport_for_thriftpy_woker(cfg)

def accept(self, listener):
try:
client, addr = listener.accept()
self.cfg.on_connected(self, addr)
# initialize the connection object
conn = TThreadClient(
self.app, client, addr, self.get_thrift_processor()
)
# enqueue the job
self.enqueue_req(conn)
except socket.error as e:
if e.args[0] not in (
errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
raise

def handle(self, conn):
return conn.process_single_rpc()

def handle_exit(self, sig, frame):
ret = super(ThriftpyThreadWorker, self).handle_exit(sig, frame)
self.cfg.worker_term(self)
return ret

def finish_request(self, fs):
if fs.cancelled():
fs.conn.close()
return

try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
if keepalive:
conn.before_put_into_ioloop()

with self._lock:
self._keep.append(conn)

# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
else:
self.nr -= 1
conn.close()
except Exception:
# an exception happened, make sure to close the
# socket.
self.nr -= 1
fs.conn.close()

def murder_keepalived(self):
while True:
with self._lock:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break

if conn.is_expired():
self.nr -= 1
# remove the socket from the poller
with self._lock:
try:
self.poller.unregister(conn.sock)
except socket.error as e:
if e.args[0] != errno.EBADF:
raise

# close the socket
self.log.warning("Client timedout, closing: %s", conn.addr)
conn.close()
else:
# add the connection back to the queue
with self._lock:
self._keep.appendleft(conn)
break
20 changes: 20 additions & 0 deletions gunicorn_thrift/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,23 @@ def get_thrift_processor(self):
return self.app.thrift_app.get_processor() if \
self.app.cfg.thrift_processor_as_factory else \
self.app.thrift_app


def check_protocol_and_transport_for_thriftpy_woker(cfg):
if not cfg.thrift_protocol_factory.startswith('thriftpy'):
raise AppImportError(
'Thriftpy worker can only use protocol from thriftpy,'
'specify `thrift_protocol_factory` as one of the '
'following:'
'`thriftpy2.protocol:TCyBinaryProtocolFactory`, '
'`thriftpy2.protocol:TBinaryProtocolFactory`'
)

if not cfg.thrift_transport_factory.startswith('thriftpy'):
raise AppImportError(
'Thriftpy worker can only use transport from thriftpy,'
'specify `thrift_transport_factory` as one of the '
'following:'
'`thriftpy2.transport:TCyBufferedTransportFactory`, '
'`thriftpy2.transport:TBufferedTransportFactory`'
)
Loading

0 comments on commit e359185

Please sign in to comment.