Skip to content

Commit

Permalink
inherit PIPE and fix bug in reap_workers
Browse files Browse the repository at this point in the history
  • Loading branch information
benoitc committed Jan 29, 2010
1 parent 6dff13c commit 6d4ef85
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 50 deletions.
37 changes: 17 additions & 20 deletions gunicorn/arbiter.py
Expand Up @@ -17,6 +17,7 @@
import time

from gunicorn.worker import Worker
from gunicorn import util

class Arbiter(object):

Expand All @@ -34,9 +35,7 @@ class Arbiter(object):
(getattr(signal, name), name[3:].lower()) for name in dir(signal)
if name[:3] == "SIG" and name[3] != "_"
)




def __init__(self, address, num_workers, modname,
**kwargs):
self.address = address
Expand Down Expand Up @@ -96,8 +95,11 @@ def unlink_pidfile(self, path):
def valid_pidfile(self, path):
try:
with open(path, "r") as f:
pid = int(f.read() or 0)
if pid <= 0: return
try:
pid = int(f.read())
except:
return None
if pid <= 0: return None

try:
os.kill(pid, 0)
Expand All @@ -116,14 +118,10 @@ def init_signals(self):
if self.PIPE:
map(lambda p: p.close(), self.PIPE)
self.PIPE = pair = os.pipe()
map(self.set_non_blocking, pair)
map(lambda p: fcntl.fcntl(p, fcntl.F_SETFD, fcntl.FD_CLOEXEC), pair)
map(util.set_non_blocking, pair)
map(util.close_on_exec, pair)
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
signal.signal(signal.SIGCHLD, self.handle_chld)

def set_non_blocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)

def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
Expand All @@ -146,6 +144,7 @@ def listen(self, addr):
self.log.error("should be a non GUNICORN environnement")
else:
raise


for i in range(5):
try:
Expand Down Expand Up @@ -292,7 +291,7 @@ def stop(self, graceful=True):
if not graceful:
sig = signal.SIGTERM
limit = time.time() + self.timeout
while len(self.WORKERS) and time.time() < limit:
while self.WORKERS or time.time() > limit:
self.kill_workers(sig)
time.sleep(0.1)
self.reap_workers()
Expand Down Expand Up @@ -343,43 +342,41 @@ def spawn_workers(self):
continue

worker = Worker(i, self.pid, self.LISTENER, self.modname,
self.timeout, self.debug)
self.timeout, self.PIPE, self.debug)
pid = os.fork()
if pid != 0:
self.WORKERS[pid] = worker
continue

# Process Child
worker_pid = os.getpid()
try:
self.log.info("Worker %s booting" % worker_pid)
worker.run()
sys.exit(0)
except SystemExit:

raise
except:
self.log.exception("Exception in worker process.")
sys.exit(-1)
finally:
worker.tmp.close()
self.log.info("Worker %s exiting." % worker_pid)
os._exit(127)

def kill_workers(self, sig):
for pid in self.WORKERS.keys():
self.kill_worker(pid, sig)

def kill_worker(self, pid, sig):
worker = self.WORKERS.pop(pid)

try:
os.kill(pid, sig)
kpid, stat = os.waitpid(pid, os.WNOHANG)
if kpid:
self.log.warning("Problem killing process: %s" % pid)
except OSError, e:
if e.errno == errno.ESRCH:
worker = self.WORKERS.pop(pid)
try:
worker.tmp.close()
except:
pass

raise
15 changes: 3 additions & 12 deletions gunicorn/main.py
Expand Up @@ -7,10 +7,10 @@
import logging
import optparse as op
import os
import resource
import sys

from gunicorn.arbiter import Arbiter
from gunicorn import util

LOG_LEVELS = {
"critical": logging.CRITICAL,
Expand All @@ -21,12 +21,6 @@
}

UMASK = 0
MAXFD = 1024
if (hasattr(os, "devnull")):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = "/dev/null"


def options():
return [
Expand Down Expand Up @@ -75,9 +69,7 @@ def daemonize(logger):
else:
os._exit(0)

maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = 1024
maxfd = util.get_maxfd()

# Iterate through and close all file descriptors.
for fd in range(0, maxfd):
Expand All @@ -86,8 +78,7 @@ def daemonize(logger):
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass


os.open(REDIRECT_TO, os.O_RDWR)
os.open(util.REDIRECT_TO, os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)

Expand Down
19 changes: 19 additions & 0 deletions gunicorn/util.py
Expand Up @@ -5,10 +5,19 @@

import errno
import fcntl
import os
import resource
import select
import socket
import time

MAXFD = 1024
if (hasattr(os, "devnull")):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = "/dev/null"



timeout_default = object()

Expand All @@ -20,10 +29,20 @@
monthname = [None,
'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

def get_maxfd():
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = MAXFD
return maxfd

def close_on_exec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFL, flags)

def set_non_blocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)

def close(sock):
try:
Expand Down
53 changes: 35 additions & 18 deletions gunicorn/worker.py
Expand Up @@ -24,39 +24,52 @@ class Worker(object):
lambda x: getattr(signal, "SIG%s" % x),
"HUP QUIT INT TERM TTIN TTOU USR1".split()
)

PIPE = []

def __init__(self, workerid, ppid, socket, app, timeout,
debug=False):
pipe, debug=False):
self.nr = 0
self.id = workerid
self.ppid = ppid
self.debug = debug
self.socket = socket
self.timeout = timeout / 2.0
fd, tmpname = tempfile.mkstemp()
self.tmp = os.fdopen(fd, "r+b")
self.tmpname = tmpname
self.app = app
self.alive = True
self.log = logging.getLogger(__name__)


# init pipe
self.PIPE = pipe
map(util.set_non_blocking, pipe)
map(util.close_on_exec, pipe)

# prevent inherientence
self.socket = socket
util.close_on_exec(self.socket)
self.socket.setblocking(0)

util.close_on_exec(fd)

self.address = self.socket.getsockname()

self.app = app
self.alive = True
self.log = logging.getLogger(__name__)


self.address = self.socket.getsockname()

def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_exit)
signal.signal(signal.SIGUSR1, self.handle_quit)

def handle_quit(self, sig, frame):

def handle_usr1(self, *args):
self.nr = -65536;
try:
map(lambda p: p.close(), self.PIPE)
except:
pass

def handle_quit(self, *args):
self.alive = False

def handle_exit(self, sig, frame):
Expand All @@ -71,14 +84,16 @@ def _fchmod(self, mode):
def run(self):
self.init_signals()
spinner = 0
self.nr = 0
while self.alive:

nr = 0
self.nr = 0
# Accept until we hit EAGAIN. We're betting that when we're
# processing clients that more clients are waiting. When
# there's no more clients waiting we go back to the select()
# loop and wait for some lovin.
while self.alive:
self.nr = 0
try:
client, addr = self.socket.accept()

Expand All @@ -89,19 +104,21 @@ def run(self):
# to signal that this worker process is alive.
spinner = (spinner+1) % 2
self._fchmod(spinner)
nr += 1
self.nr += 1
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.ECONNABORTED):
break # Uh oh!

raise
if nr == 0: break
if self.nr == 0: break

if self.ppid != os.getppid():
break

while self.alive:
spinner = (spinner+1) % 2
self._fchmod(spinner)
try:
ret = select.select([self.socket], [], [],
ret = select.select([self.socket], [], self.PIPE,
self.timeout)
if ret[0]:
break
Expand Down

0 comments on commit 6d4ef85

Please sign in to comment.