Skip to content

Commit

Permalink
dramatically improve the gevent worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
benoitc committed Aug 12, 2010
1 parent d7d6fa3 commit d2c10a9
Showing 1 changed file with 28 additions and 78 deletions.
106 changes: 28 additions & 78 deletions gunicorn/workers/ggevent.py
Expand Up @@ -13,9 +13,8 @@
import gevent
from gevent import monkey
monkey.noisy = False
from gevent import core
from gevent import greenlet
from gevent.pool import Pool
from gevent.server import StreamServer
from gevent import pywsgi, wsgi

import gunicorn
Expand All @@ -34,17 +33,24 @@
'wsgi.run_once': False
}


class GGeventServer(StreamServer):
def __init__(self, listener, handle, spawn='defaul', worker=None):
StreamServer.__init__(self, listener, spawn=spawn)
self.handle_func = handle
self.worker = worker

def stop(self):
super(GGeventServer, self).stop()
self.worker.alive = False

def handle(self, sock, addr):
self.handle_func(sock, addr)

class GeventWorker(AsyncWorker):

min_delay = 0.01
max_delay = 1

def __init__(self, *args, **kwargs):
super(GeventWorker, self).__init__(*args, **kwargs)
self.pool = None
self._accept_event = None
self._acceptor_timer = None
self.delay = self.min_delay

@classmethod
def setup(cls):
Expand All @@ -53,90 +59,34 @@ def setup(cls):

def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)

def acceptor(self):
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)


def _acceptor(self, event):
if self._accept_event is None:
if not self.alive:
return

# create a read event
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)

def _do_accept(self, event, _evtype):
try:
try:
conn, addr = self.socket.accept()
except socket.error, e:
if e[0] == errno.EAGAIN:
sys.exc_clear()
return
raise

self.delay = self.min_delay
self.pool.spawn(self.handle, conn, addr)
except socket.error, e:
if e[0] not in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK):
self.alive = False
return
except:
self.log.exception("Unexpected error in acceptor. Sepuku.")
self.stop()
return

if self.delay >= 0:
self.stop_acceptor()
self._start_accepting_timer = core.timer(self.delay,
self.acceptor)
self.delay = min(self.max_delay, self.delay*2)

def stop_acceptor(self):
if self._accept_event is not None:
self._accept_event.cancel()
self._accept_event = None

if self._acceptor_timer is not None:
self._acceptor_timer.cancel()
self.acceptor_timer = None

def stop(self):
self.stop_acceptor()
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
self.alive = False

def run(self):
self.socket.setblocking(1)

# start to accept
self.acceptor()

# enter in the main loop
pool = Pool(self.worker_connections)
server = GGeventServer(self.socket, self.handle, spawn=pool,
worker=self)

server.start()

try:
while self.alive:
gevent.spawn(self.notify)
self.notify()
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
break
gevent.sleep(self.timeout)
except:
pass
self.stop()

with gevent.Timeout(self.timeout, False):
gevent.spawn(server.stop).join()

def init_process(self):
#gevent doesn't reinitialize dns for us after forking
#here's the workaround
gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init()

# init the pool
self.pool = Pool(self.worker_connections)
self.pool._semaphore.rawlink(self._acceptor)

super(GeventWorker, self).init_process()

class GeventBaseWorker(Worker):
Expand Down Expand Up @@ -176,11 +126,11 @@ def run(self):
server.stop()
break
gevent.sleep(0.1)
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
except KeyboardInterrupt:
pass


with gevent.Timeout(self.timeout, False):
gevent.spawn(server.stop).join()

class WSGIHandler(wsgi.WSGIHandler):
def log_request(self, *args):
Expand Down

0 comments on commit d2c10a9

Please sign in to comment.