Merge pull request #962 from benoitc/fix/908
stop accepting more requests once the maximum number of accepted connections is reached
benoitc committed Jan 21, 2015
2 parents cc1ddf1 + b1bde15 commit fa1b7cc
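The core of the fix: the worker now only polls for new connections while the number of in-flight requests stays under worker_connections, and once the thread pool is saturated it waits on futures.wait(..., return_when=futures.FIRST_COMPLETED) before looking for more work. Below is a minimal, self-contained sketch of that back-pressure pattern; it is not gunicorn code, and MAX_INFLIGHT and POOL_THREADS are illustrative stand-ins for cfg.worker_connections and cfg.threads.

# Sketch only (not gunicorn code): gate new work on an in-flight cap and use
# futures.wait(FIRST_COMPLETED) as back-pressure, mirroring the reworked run()
# loop in the diff below.
import time
from concurrent import futures

MAX_INFLIGHT = 4   # stand-in for cfg.worker_connections
POOL_THREADS = 2   # stand-in for cfg.threads


def handle(item):
    time.sleep(0.01)   # pretend to serve a request
    return item


def run(items):
    items = iter(items)
    pending = set()
    with futures.ThreadPoolExecutor(max_workers=POOL_THREADS) as pool:
        while True:
            # only "accept" new work while we are under the connection cap
            if len(pending) < MAX_INFLIGHT:
                try:
                    pending.add(pool.submit(handle, next(items)))
                    continue
                except StopIteration:
                    pass

            if not pending:
                break

            # back-pressure: wait until at least one in-flight job finishes
            done, pending = futures.wait(
                pending, return_when=futures.FIRST_COMPLETED)
            for fut in done:
                print("finished", fut.result())


if __name__ == "__main__":
    run(range(10))

In the diff itself the same idea shows up twice: poller.select() only runs while self.nr < self.worker_connections, and run() waits for FIRST_COMPLETED whenever len(self.futures) reaches cfg.threads.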
Showing 1 changed file with 91 additions and 82 deletions.
173 changes: 91 additions & 82 deletions gunicorn/workers/gthread.py
@@ -18,6 +18,7 @@
import socket
import ssl
import sys
from threading import RLock
import time

from .. import http
@@ -65,13 +66,14 @@ def init(self):

# initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock)
return True
return False

def set_timeout(self):
# set the timeout
self.timeout = time.time() + self.cfg.keepalive

def close(self):
util.close(self.sock)

def __lt__(self, other):
return self.timeout < other.timeout

@@ -83,68 +85,94 @@ class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs):
super(ThreadWorker, self).__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads

# initialise the pool
self.tpool = None
self.poller = None
self._lock = None
self.futures = deque()
self._keep = deque()

def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector()
self._lock = RLock()
super(ThreadWorker, self).init_process()

def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.append(fs)
fs.add_done_callback(self.finish_request)

def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process()
def enqueue_req(self, conn):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)

def accept(self, listener):
if not self.alive:
return

try:
client, addr = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, listener, client, addr)

# wait for the read event to handle the connection
self.poller.register(client, selectors.EVENT_READ,
partial(self.handle_client, conn))

self.nr += 1
# enqueue the job
self.enqueue_req(conn)
except socket.error as e:
if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK):
raise

def handle_client(self, conn, client):
# unregister the client from the poller
self.poller.unregister(client)
def reuse_connection(self, conn, client):
with self._lock:
# unregister the client from the poller
self.poller.unregister(client)
# remove the connection from keepalive
try:
self._keep.remove(conn)
except ValueError:
# race condition
return

# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
self.enqueue_req(conn)

def murder_keepalived(self):
now = time.time()
while True:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break
with self._lock:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break

delta = conn.timeout - now
if delta > 0:
# add the connection back to the queue
self._keep.appendleft(conn)
with self._lock:
self._keep.appendleft(conn)
break
else:
self.nr -= 1
# remove the socket from the poller
self.poller.unregister(conn.sock)
with self._lock:
try:
self.poller.unregister(conn.sock)
except socket.error as e:
if e.args[0] != errno.EBADF:
raise

# close the socket
util.close(conn.sock)
conn.close()

def is_parent_alive(self):
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return False
return True

def run(self):
# init listeners, add them to the event loop
@@ -155,55 +183,46 @@ def run(self):
timeout = self.cfg.timeout or 0.5

while self.alive:
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return

# notify the arbiter we are alive
self.notify()

events = self.poller.select(0.2)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# can we accept more connections?
if self.nr < self.worker_connections:
# wait for an event
events = self.poller.select(0.02)
for key, mask in events:
callback = key.data
callback(key.fileobj)

if not self.is_parent_alive():
break

# handle keepalive timeouts
self.murder_keepalived()

# if we have more connections than the max number of connections
# accepted on a worker, wait until some complete or exit.
if len(self.futures) >= self.worker_connections:
res = futures.wait(self.futures, timeout=timeout)
if not res:
self.alive = False
self.log.info("max requests achieved")
break

# shutdown the pool
self.poller.close()
self.tpool.shutdown(False)
# if the number of connections is less than the max we can handle at
# the same time, there is no need to wait for one
if len(self.futures) < self.cfg.threads:
continue

# wait for the workers
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
result = futures.wait(self.futures, timeout=timeout,
return_when=futures.FIRST_COMPLETED)

# if we still have futures running, try to close them
while True:
try:
fs = self.futures.popleft()
except IndexError:
if not result.done:
break

sock = fs.conn.sock
else:
[self.futures.remove(f) for f in result.done]

# the future is not running, cancel it
if not fs.done() and not fs.running():
fs.cancel()
self.tpool.shutdown(False)
self.poller.close()

# make sure we close the sockets after the graceful timeout
util.close(sock)

def finish_request(self, fs):
if fs.cancelled():
fs.conn.close()
return

try:
(keepalive, conn) = fs.result()
# if the connection should be kept alive, add it
@@ -214,32 +233,22 @@ def finish_request(self, fs):

# register the connection
conn.set_timeout()
self._keep.append(conn)
with self._lock:
self._keep.append(conn)

# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.handle_client, conn))
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
else:
util.close(conn.sock)
self.nr -= 1
conn.close()
except:
# an exception happened, make sure to close the
# socket.
util.close(fs.conn.sock)
finally:
# remove the future from our list
try:
self.futures.remove(fs)
except ValueError:
pass
self.nr -= 1
fs.conn.close()

def handle(self, conn):
if not conn.init():
# connection kept alive
try:
self._keep.remove(conn)
except ValueError:
pass

keepalive = False
req = None
try:
@@ -287,15 +296,15 @@ def handle_request(self, req, conn):
conn.listener.getsockname(), self.cfg)
environ["wsgi.multithread"] = True

self.nr += 1

if self.alive and self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
resp.force_close()
self.alive = False

if not self.cfg.keepalive:
resp.force_close()
elif len(self._keep) >= self.max_keepalived:
resp.force_close()

respiter = self.wsgi(environ, resp.start_response)
try:
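A second thread running through the diff is that the _keep deque, and the matching poller.unregister calls, are now guarded by an RLock, because the keepalive bookkeeping is touched both from the main loop (reuse_connection, murder_keepalived) and from future callbacks (finish_request). A rough, self-contained sketch of that lock-protected park/reuse/reap pattern follows; the KeepaliveQueue class and the KEEPALIVE value are purely illustrative, not the worker's actual API.

# Sketch only (not gunicorn code): a lock-protected keepalive queue in the
# spirit of the _keep deque plus _lock used above.
import time
from collections import deque
from threading import RLock

KEEPALIVE = 2.0  # seconds an idle connection may stay parked (illustrative)


class KeepaliveQueue:
    def __init__(self):
        self._lock = RLock()
        self._keep = deque()  # (deadline, conn) pairs, oldest first

    def park(self, conn):
        # connections are appended in arrival order, so the head is the oldest
        with self._lock:
            self._keep.append((time.time() + KEEPALIVE, conn))

    def reuse(self, conn):
        # pull a connection back out when it becomes readable again; losing
        # the race against the reaper is not an error, just report a miss
        with self._lock:
            for entry in list(self._keep):
                if entry[1] is conn:
                    self._keep.remove(entry)
                    return True
        return False

    def reap_expired(self):
        # drop idle connections whose keepalive deadline has passed
        now = time.time()
        expired = []
        with self._lock:
            while self._keep and self._keep[0][0] <= now:
                expired.append(self._keep.popleft()[1])
        return expired


if __name__ == "__main__":
    q = KeepaliveQueue()
    conn = object()            # stand-in for a parked client connection
    q.park(conn)
    print(q.reuse(conn))       # True: the connection was still parked
    print(q.reap_expired())    # []: nothing has timed out yet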
