workers/gthread: Remove locks + one event queue + general cleanup #3157

Open · wants to merge 1 commit into base: master
Changes from all commits
258 changes: 111 additions & 147 deletions gunicorn/workers/gthread.py
@@ -14,6 +14,7 @@
from concurrent import futures
import errno
import os
import queue
import selectors
import socket
import ssl
@@ -22,7 +23,6 @@
from collections import deque
from datetime import datetime
from functools import partial
from threading import RLock

from . import base
from .. import http
@@ -41,44 +41,64 @@ def __init__(self, cfg, sock, client, server):

self.timeout = None
self.parser = None
self.initialized = False

# set the socket to non blocking
self.sock.setblocking(False)

def init(self):
self.initialized = True
self.sock.setblocking(True)

if self.parser is None:
# wrap the socket if needed
if self.cfg.is_ssl:
self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)

# initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock, self.client)

def set_timeout(self):
# set the timeout
self.timeout = time.time() + self.cfg.keepalive
def is_initialized(self):
return bool(self.parser)

def set_keepalive_timeout(self):
self.timeout = time.monotonic() + self.cfg.keepalive

def close(self):
util.close(self.sock)


class PollableMethodQueue(object):

def __init__(self):
self.fds = []
self.method_queue = None

def init(self):
self.fds = os.pipe()
self.method_queue = queue.SimpleQueue()

def close(self):
for fd in self.fds:
os.close(fd)

def get_fd(self):
return self.fds[0]

def defer(self, callback, *args):
self.method_queue.put(partial(callback, *args))
os.write(self.fds[1], b'0')

def run_callbacks(self, max_callbacks_at_a_time=10):
zeroes = os.read(self.fds[0], max_callbacks_at_a_time)
for _ in range(0, len(zeroes)):
method = self.method_queue.get()
method()


class ThreadWorker(base.Worker):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool
self.tpool = None
self.thread_pool = None
self.poller = None
self._lock = None
self.futures = deque()
self._keep = deque()
self.keepalived_conns = deque()
self.nr_conns = 0
self.method_queue = PollableMethodQueue()

@classmethod
def check_config(cls, cfg, log):
@@ -89,100 +109,65 @@ def check_config(cls, cfg, log):
"Check the number of worker connections and threads.")

def init_process(self):
self.tpool = self.get_thread_pool()
self.thread_pool = self.get_thread_pool()
self.poller = selectors.DefaultSelector()
self._lock = RLock()
self.method_queue.init()
super().init_process()

def get_thread_pool(self):
"""Override this method to customize how the thread pool is created"""
return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)

def handle_quit(self, sig, frame):
def handle_exit(self, sig, frame):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
self.tpool.shutdown(False)
time.sleep(0.1)
sys.exit(0)

def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.append(fs)
fs.add_done_callback(self.finish_request)

def enqueue_req(self, conn):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
self.method_queue.defer(lambda: None) # To wake up poller.select()

def handle_quit(self, sig, frame):
self.thread_pool.shutdown(False)
super().handle_quit(sig, frame)

def set_accept_enabled(self, enabled, register_fd=False):
event = selectors.EVENT_READ if enabled else 0
method = self.poller.register if register_fd else self.poller.modify
for sock in self.sockets:
method(sock, event, self.accept)

def accept(self, server, listener):
def accept(self, listener):
try:
sock, client = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, sock, client, server)

self.nr_conns += 1
# wait until socket is readable
with self._lock:
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
sock.setblocking(True) # Explicitly set behavior since it differs per OS
conn = TConn(self.cfg, sock, client, listener.getsockname())

self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
except EnvironmentError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise

def on_client_socket_readable(self, conn, client):
with self._lock:
# unregister the client from the poller
self.poller.unregister(client)
self.poller.unregister(client)

if conn.initialized:
# remove the connection from keepalive
try:
self._keep.remove(conn)
except ValueError:
# race condition
return
if conn.is_initialized():
self.keepalived_conns.remove(conn)
conn.init()

# submit the connection to a worker
self.enqueue_req(conn)
fs = self.thread_pool.submit(self.handle, conn)
fs.add_done_callback(
lambda fut: self.method_queue.defer(self.finish_request, conn, fut))

def murder_keepalived(self):
now = time.time()
while True:
with self._lock:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break

delta = conn.timeout - now
now = time.monotonic()
while self.keepalived_conns:
delta = self.keepalived_conns[0].timeout - now
if delta > 0:
# add the connection back to the queue
with self._lock:
self._keep.appendleft(conn)
break
else:
self.nr_conns -= 1
# remove the socket from the poller
with self._lock:
try:
self.poller.unregister(conn.sock)
except EnvironmentError as e:
if e.errno != errno.EBADF:
raise
except KeyError:
# already removed by the system, continue
pass
except ValueError:
# already removed by the system, continue
pass

# close the socket
conn.close()

conn = self.keepalived_conns.popleft()
self.poller.unregister(conn.sock)
self.nr_conns -= 1
conn.close()

def is_parent_alive(self):
# If our parent changed then we shut down.
@@ -191,97 +176,76 @@ def is_parent_alive(self):
return False
return True

def wait_for_and_dispatch_events(self, timeout):
for key, _ in self.poller.select(timeout):
callback = key.data
callback(key.fileobj)

def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
self.set_accept_enabled(True, register_fd=True)
self.poller.register(self.method_queue.get_fd(),
selectors.EVENT_READ,
self.method_queue.run_callbacks)

while self.alive:
# notify the arbiter we are alive
self.notify()

# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, _ in events:
callback = key.data
callback(key.fileobj)

# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)

# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
new_connections_accepted = self.nr_conns < self.worker_connections
self.wait_for_and_dispatch_events(timeout=1)

if not self.is_parent_alive():
break

# handle keepalive timeouts
self.murder_keepalived()

self.tpool.shutdown(False)
new_connections_still_accepted = self.nr_conns < self.worker_connections
if new_connections_accepted != new_connections_still_accepted:
self.set_accept_enabled(new_connections_still_accepted)

# Don't accept any new connections, as we're about to shut down
self.set_accept_enabled(False)

# ... but try to handle all already accepted connections within the grace period
graceful_timeout = time.monotonic() + self.cfg.graceful_timeout
while self.nr_conns > 0:
time_remaining = max(graceful_timeout - time.monotonic(), 0)
if time_remaining == 0:
break
self.wait_for_and_dispatch_events(timeout=time_remaining)

self.thread_pool.shutdown(wait=False)
self.poller.close()
self.method_queue.close()

for s in self.sockets:
s.close()

futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

def finish_request(self, fs):
if fs.cancelled():
self.nr_conns -= 1
fs.conn.close()
return

def finish_request(self, conn, fs):
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
keepalive = not fs.cancelled() and fs.result()
if keepalive and self.alive:
# flag the socket as non blocked
conn.sock.setblocking(False)

# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)

# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
conn.set_keepalive_timeout()
self.keepalived_conns.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
else:
self.nr_conns -= 1
conn.close()
except Exception:
# an exception happened, make sure to close the
# socket.
self.nr_conns -= 1
fs.conn.close()
conn.close()

def handle(self, conn):
keepalive = False
req = None
try:
req = next(conn.parser)
if not req:
return (False, conn)
return False

# handle the request
keepalive = self.handle_request(req, conn)
if keepalive:
return (keepalive, conn)
return self.handle_request(req, conn)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)

@@ -308,7 +272,7 @@ def handle(self, conn):
except Exception as e:
self.handle_error(req, conn.sock, conn.client, e)

return (False, conn)
return False

def handle_request(self, req, conn):
environ = {}
@@ -328,7 +292,7 @@ def handle_request(self, req, conn):

if not self.alive or not self.cfg.keepalive:
resp.force_close()
elif len(self._keep) >= self.max_keepalived:
elif len(self.keepalived_conns) >= self.max_keepalived:
resp.force_close()

respiter = self.wsgi(environ, resp.start_response)
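
The core idea of this change is that worker threads never touch the poller or the connection bookkeeping directly. They hand work back to the main loop through PollableMethodQueue: a callback goes onto a queue.SimpleQueue and one byte is written to a pipe whose read end is registered with the selector, so poller.select() wakes up and the main thread runs the callback on its own. Below is a minimal standalone sketch of that pattern; it is illustrative only and not gunicorn code, and every name in it (defer, run_deferred, handle, finish) is invented for the example.

# Sketch of the self-pipe wake-up pattern behind PollableMethodQueue.
# Worker threads defer callbacks to the main loop instead of mutating
# shared state, which is what allows the RLock to be removed.
import os
import queue
import selectors
from concurrent import futures
from functools import partial

wakeup_r, wakeup_w = os.pipe()
deferred = queue.SimpleQueue()
poller = selectors.DefaultSelector()

def defer(callback, *args):
    # Safe to call from any thread: enqueue the callback, then write one
    # byte so poller.select() in the main loop returns immediately.
    deferred.put(partial(callback, *args))
    os.write(wakeup_w, b'0')

def run_deferred(fd, max_callbacks=10):
    # One queued callback per byte read, mirroring run_callbacks().
    for _ in os.read(fd, max_callbacks):
        deferred.get()()

poller.register(wakeup_r, selectors.EVENT_READ, run_deferred)

def handle(request):
    # Runs on a pool thread; it must not touch the poller itself.
    return request.upper()

def finish(request, fut):
    # Runs on the main loop thread, so it could safely re-register
    # sockets or update a keepalive deque here.
    print("finished", request, "->", fut.result())

with futures.ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(handle, "ping")
    fut.add_done_callback(lambda f: defer(finish, "ping", f))

    # One turn of the main loop: wait for an event, dispatch its callback,
    # passing the ready file object as wait_for_and_dispatch_events() does.
    for key, _ in poller.select(timeout=5.0):
        key.data(key.fileobj)

Running the sketch prints "finished ping -> PING"; the point is that finish() always executes on the loop thread, which is why the worker no longer needs the RLock or the separate futures deque.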
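
The keepalive bookkeeping likewise drops its lock by relying on an ordering invariant: every connection is appended to keepalived_conns with the same cfg.keepalive interval, so deadlines are non-decreasing in append order and murder_keepalived() can pop from the left until it reaches the first connection that has not yet expired. A small sketch of that argument, assuming a fixed interval and using KeptConn and KEEPALIVE as stand-ins for TConn and cfg.keepalive:

# Sketch of the expiry loop's ordering invariant: with a fixed keepalive
# interval, append order equals deadline order, so popping from the left
# until the head is unexpired visits exactly the expired connections.
import time
from collections import deque

KEEPALIVE = 0.1  # seconds, stands in for cfg.keepalive

class KeptConn:
    def __init__(self, name):
        self.name = name
        self.timeout = time.monotonic() + KEEPALIVE

def murder_keepalived(conns):
    now = time.monotonic()
    closed = []
    while conns and conns[0].timeout - now <= 0:
        # The real worker also unregisters conn.sock from the poller
        # and decrements nr_conns before closing the connection.
        closed.append(conns.popleft())
    return closed

conns = deque([KeptConn("a"), KeptConn("b")])
time.sleep(0.15)                      # let "a" and "b" expire
conns.append(KeptConn("c"))           # newest deadline stays at the right
expired = murder_keepalived(conns)
print([c.name for c in expired], [c.name for c in conns])

Running it prints ['a', 'b'] ['c']: the two expired connections are removed and the fresh one stays queued for its next request.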