Fixes 'no handlers for multiprocessing' warning on Windows and fork+exec
ask committed Feb 7, 2012
1 parent 16c37d8 commit df9eda7
Showing 5 changed files with 58 additions and 12 deletions.
10 changes: 3 additions & 7 deletions celery/apps/worker.py
@@ -166,13 +166,9 @@ def init_loader(self):
         self.loader.import_from_cwd(module)

     def redirect_stdouts_to_logger(self):
-        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
-                                                       logfile=self.logfile)
-        if not handled:
-            logger = self.app.log.get_default_logger()
-            if self.redirect_stdouts:
-                self.app.log.redirect_stdouts_to_logger(logger,
-                        loglevel=self.redirect_stdouts_level)
+        self.app.log.setup(self.loglevel, self.logfile,
+                           self.redirect_stdouts,
+                           self.redirect_stdouts_level)

     def purge_messages(self):
         count = self.app.control.discard_all()
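The worker now hands the whole logging bootstrap to the app's log object in one call. A minimal usage sketch of the consolidated helper outside the worker class; the app_or_default() lookup and the concrete level/file values are illustrative assumptions, not part of this commit:

import logging

from celery.app import app_or_default

app = app_or_default()                    # current (or default) Celery app

# Hypothetical call: one helper now does what setup_logging_subsystem()
# plus redirect_stdouts_to_logger() previously did in two steps.
app.log.setup(loglevel=logging.INFO,      # level for the default logger
              logfile="celeryd.log",      # or None to log to stderr
              redirect_stdouts=True,      # route stdout/stderr through logging
              redirect_level="WARNING")   # level used for redirected writes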
1 change: 1 addition & 0 deletions celery/bin/celeryd.py
@@ -73,6 +73,7 @@
 """
 from __future__ import absolute_import

+import os
 import sys

 try:
5 changes: 5 additions & 0 deletions celery/concurrency/processes/__init__.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import

+import os
 import platform
 import signal as _signal

@@ -40,6 +41,10 @@ def process_initializer(app, hostname):
     # This is for Windows and other platforms not supporting
     # fork(). Note that init_worker makes sure it's only
     # run once per process.
+    app.log.setup(int(os.environ.get("CELERY_LOG_LEVEL", 0)),
+                  os.environ.get("CELERY_LOG_FILE") or None,
+                  bool(os.environ.get("CELERY_LOG_REDIRECT", False)),
+                  str(os.environ.get("CELERY_LOG_REDIRECT_LEVEL")))
     app.loader.init_worker()
     app.loader.init_worker_process()
     signals.worker_process_init.send(sender=None)
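process_initializer() runs in each freshly started pool process, which on Windows (and any fork+exec start) inherits no in-memory logging configuration, so it rebuilds logging from the CELERY_LOG_* variables exported by Logging.setup() in the parent (see celery/log.py below). A stdlib-only sketch of the same handoff pattern; the CHILD_LOG_* names and the basicConfig()-based setup are illustrative, not Celery's:

import logging
import os
from multiprocessing import Process


def initializer():
    # Child side: rebuild logging from whatever the parent exported.
    level = int(os.environ.get("CHILD_LOG_LEVEL", logging.WARNING))
    logfile = os.environ.get("CHILD_LOG_FILE") or None      # "" means stderr
    logging.basicConfig(level=level, filename=logfile)
    logging.getLogger(__name__).info("child logging configured")


if __name__ == "__main__":
    # Parent side: export the settings before starting the child,
    # mirroring what Logging.setup() does with the CELERY_LOG_* keys.
    os.environ["CHILD_LOG_LEVEL"] = str(logging.INFO)
    os.environ["CHILD_LOG_FILE"] = ""
    child = Process(target=initializer)
    child.start()
    child.join()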
25 changes: 22 additions & 3 deletions celery/concurrency/processes/forking.py
@@ -125,6 +125,28 @@ def main():
     current_process()._inheriting = True
     preparation_data = load(from_parent)
     _forking.prepare(preparation_data)
+
+    # Huge hack to make logging before Process.run work.
+    loglevel = os.environ.get("_MP_FORK_LOGLEVEL_")
+    logfile = os.environ.get("_MP_FORK_LOGFILE_") or None
+    format = os.environ.get("_MP_FORK_LOGFORMAT_")
+    if loglevel:
+        from multiprocessing import util
+        import logging
+        logger = util.get_logger()
+        logger.setLevel(int(loglevel))
+        if not logger.handlers:
+            logger._rudimentary_setup = True
+            logfile = logfile or sys.__stderr__
+            if hasattr(logfile, "write"):
+                handler = logging.StreamHandler(logfile)
+            else:
+                handler = logging.FileHandler(logfile)
+            formatter = logging.Formatter(
+                    format or util.DEFAULT_LOGGING_FORMAT)
+            handler.setFormatter(formatter)
+            logger.addHandler(handler)
+
     self = load(from_parent)
     current_process()._inheriting = False

@@ -163,6 +185,3 @@ def reduce_connection(conn):
     _forking.Popen = Popen
 else:
     from multiprocessing.forking import freeze_support
-
-
-
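The block above gives the exec'ed child a stop-gap handler, so anything logged before Process.run() (including multiprocessing's own messages) no longer triggers the "no handlers could be found" warning, and the _rudimentary_setup marker lets the real setup replace that handler later. A condensed stdlib sketch of the idea; the INFO level and the final re-setup step are illustrative assumptions:

import logging
import sys
from multiprocessing import util

logger = util.get_logger()                      # multiprocessing's own logger
if not logger.handlers:
    logger._rudimentary_setup = True            # marker checked by the real setup
    handler = logging.StreamHandler(sys.__stderr__)
    handler.setFormatter(logging.Formatter(util.DEFAULT_LOGGING_FORMAT))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)               # illustrative level

logger.info("logged before Process.run -- no 'no handlers' warning")

# Later, a full configuration can detect and replace the stop-gap handler:
if getattr(logger, "_rudimentary_setup", False):
    logger.handlers[:] = []                     # drop the temporary handler
    del logger._rudimentary_setup
    # ... install the real handlers here ...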
29 changes: 27 additions & 2 deletions celery/log.py
@@ -3,6 +3,7 @@

 import logging
 import threading
+import os
 import sys
 import traceback

@@ -130,10 +131,31 @@ def setup_logging_subsystem(self, loglevel=None, logfile=None,
                 signals.after_setup_logger.send(sender=None, logger=logger,
                                         loglevel=loglevel, logfile=logfile,
                                         format=format, colorize=colorize)
+
+        # This is a hack for multiprocessing's fork+exec, so that
+        # logging before Process.run works.
+        os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
+                          _MP_FORK_LOGFILE_=logfile or "",
+                          _MP_FORK_LOGFORMAT_=format)
         Logging._setup = True

         return receivers

+    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
+              redirect_level="WARNING"):
+        handled = self.setup_logging_subsystem(loglevel=loglevel,
+                                               logfile=logfile)
+        if not handled:
+            logger = self.get_default_logger()
+            if redirect_stdouts:
+                self.redirect_stdouts_to_logger(logger,
+                                                loglevel=redirect_level)
+        os.environ.update(
+            CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
+            CELERY_LOG_FILE=str(logfile) if logfile else "",
+            CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
+            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
+
     def _detect_handler(self, logfile=None):
         """Create log handler with either a filename, an open stream
         or :const:`None` (stderr)."""
@@ -216,10 +238,13 @@ def redirect_stdouts_to_logger(self, logger, loglevel=None,
         sys.stderr = proxy
         return proxy

+    def _is_configured(self, logger):
+        return logger.handlers and not getattr(
+                logger, "_rudimentary_setup", False)
+
     def _setup_logger(self, logger, logfile, format, colorize,
             formatter=ColorFormatter, **kwargs):
-
-        if logger.handlers:  # Logger already configured
+        if self._is_configured(logger):
             return logger

         handler = self._detect_handler(logfile)
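_setup_logger() used to treat any logger with handlers as already configured; with the stop-gap handler installed by the fork+exec hack above, that test would wrongly skip the real setup, hence the new _is_configured() check. A self-contained sketch of the guard's behaviour; the standalone function mirrors the method in the diff and the "demo" logger is illustrative:

import logging


def is_configured(logger):
    # A logger carrying only the rudimentary stop-gap handler is still
    # treated as unconfigured, so the real setup is not skipped.
    return bool(logger.handlers) and not getattr(
            logger, "_rudimentary_setup", False)


demo = logging.getLogger("demo")
print(is_configured(demo))          # False: no handlers at all

demo.addHandler(logging.StreamHandler())
demo._rudimentary_setup = True      # stop-gap handler from the fork+exec hack
print(is_configured(demo))          # False: only the rudimentary handler

del demo._rudimentary_setup         # full setup has replaced the handler
print(is_configured(demo))          # True: properly configured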
