Skip to content

Commit

Permalink
93% total coverage.
Browse files Browse the repository at this point in the history
celery.bin.celeryd and celery.concurrency.processes now fully covered.
  • Loading branch information
Ask Solem committed Jun 24, 2010
1 parent a085889 commit 1e45852
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 22 deletions.
15 changes: 3 additions & 12 deletions celery/bin/celeryd.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def dump_version(*args):


class Worker(object):
WorkController = WorkController

def __init__(self, concurrency=conf.CELERYD_CONCURRENCY,
loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
Expand Down Expand Up @@ -195,15 +196,6 @@ def run(self):
print("celery@%s v%s is starting." % (self.hostname,
celery.__version__))


if conf.RESULT_BACKEND == "database" \
and self.settings.DATABASE_ENGINE == "sqlite3" and \
self.concurrency > 1:
warnings.warn("The sqlite3 database engine doesn't handle "
"concurrency well. Will use a single process only.",
UserWarning)
self.concurrency = 1

if getattr(self.settings, "DEBUG", False):
warnings.warn("Using settings.DEBUG leads to a memory leak, "
"never use this setting in a production environment!")
Expand Down Expand Up @@ -232,7 +224,6 @@ def init_queues(self):
if queue not in conf.QUEUES:
if conf.CREATE_MISSING_QUEUES:
Router(queues=conf.QUEUES).add_queue(queue)
print("QUEUES: %s" % conf.QUEUES)
else:
raise ImproperlyConfigured(
"Queue '%s' not defined in CELERY_QUEUES" % queue)
Expand Down Expand Up @@ -293,7 +284,7 @@ def startup_info(self):
}

def run_worker(self):
worker = WorkController(concurrency=self.concurrency,
worker = self.WorkController(concurrency=self.concurrency,
loglevel=self.loglevel,
logfile=self.logfile,
hostname=self.hostname,
Expand Down Expand Up @@ -381,7 +372,7 @@ def set_process_status(info):
arg_start = "manage" in sys.argv[0] and 2 or 1
if sys.argv[arg_start:]:
info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
platform.set_mp_process_title("celeryd", info=info)
return platform.set_mp_process_title("celeryd", info=info)


def run_worker(**options):
Expand Down
13 changes: 7 additions & 6 deletions celery/concurrency/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TaskPool(object):
The logger used for debugging.
"""
Pool = Pool

def __init__(self, limit, logger=None, initializer=None,
maxtasksperchild=None, timeout=None, soft_timeout=None,
Expand All @@ -46,11 +47,11 @@ def start(self):
Will pre-fork all workers so they're ready to accept tasks.
"""
self._pool = Pool(processes=self.limit,
initializer=self.initializer,
timeout=self.timeout,
soft_timeout=self.soft_timeout,
maxtasksperchild=self.maxtasksperchild)
self._pool = self.Pool(processes=self.limit,
initializer=self.initializer,
timeout=self.timeout,
soft_timeout=self.soft_timeout,
maxtasksperchild=self.maxtasksperchild)

def stop(self):
"""Gracefully stop the pool."""
Expand Down Expand Up @@ -96,7 +97,7 @@ def on_ready(self, callbacks, errbacks, ret_value):

if isinstance(ret_value, ExceptionInfo):
if isinstance(ret_value.exception, (
SystemExit, KeyboardInterrupt)): # pragma: no cover
SystemExit, KeyboardInterrupt)):
raise ret_value.exception
[errback(ret_value) for errback in errbacks]
else:
Expand Down
5 changes: 3 additions & 2 deletions celery/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ def set_process_title(progname, info=None):
Only works if :mod`setproctitle` is installed.
"""
proctitle = "[%s]" % progname
proctitle = info and "%s %s" % (proctitle, info) or proctitle
if _setproctitle:
proctitle = "[%s]" % progname
proctitle = info and "%s %s" % (proctitle, info) or proctitle
_setproctitle(proctitle)
return proctitle


def set_mp_process_title(progname, info=None):
Expand Down
Empty file.
Loading

0 comments on commit 1e45852

Please sign in to comment.