Skip to content

Commit

Permalink
Tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jun 3, 2012
1 parent 3d3495c commit 184bda8
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 55 deletions.
5 changes: 4 additions & 1 deletion celery/apps/worker.py
Expand Up @@ -198,7 +198,10 @@ def startup_info(self):
appr += " (%s)" % loader
if self.autoscale:
concurrency = "{min=%s, max=%s}" % tuple(self.autoscale)
concurrency += " (%s)" % self.pool_cls.__module__.split('.')[-1]
pool = self.pool_cls
if not isinstance(pool, basestring):
pool = pool.__module__
concurrency += " (%s)" % pool.split('.')[-1]
events = "ON"
if not self.send_events:
events = "OFF (enable -E to monitor this worker)"
Expand Down
9 changes: 5 additions & 4 deletions celery/backends/amqp.py
Expand Up @@ -94,9 +94,7 @@ def _create_binding(self, task_id):
def revive(self, channel):
pass

def _store_result(self, task_id, result, status, traceback=None,
max_retries=20, interval_start=0, interval_step=1,
interval_max=1):
def _store_result(self, task_id, result, status, traceback=None):
"""Send task return value and status."""
with self.mutex:
with self.app.amqp.producer_pool.acquire(block=True) as pub:
Expand Down Expand Up @@ -157,6 +155,7 @@ def get_task_meta(self, task_id, backlog_limit=1000):
except KeyError:
# result probably pending.
return {"status": states.PENDING, "result": None}
poll = get_task_meta # XXX compat

def drain_events(self, connection, consumer, timeout=None, now=time.time):
wait = connection.drain_events
Expand Down Expand Up @@ -189,6 +188,7 @@ def consume(self, task_id, timeout=None):
def get_many(self, task_ids, timeout=None, **kwargs):
with self.app.pool.acquire_channel(block=True) as (conn, channel):
ids = set(task_ids)
cached_ids = set()
for task_id in ids:
try:
cached = self._cache[task_id]
Expand All @@ -197,7 +197,8 @@ def get_many(self, task_ids, timeout=None, **kwargs):
else:
if cached["status"] in states.READY_STATES:
yield task_id, cached
ids.discard(task_id)
cached_ids.add(task_id)
ids ^= cached_ids

bindings = [self._create_binding(task_id) for task_id in task_ids]
with self.Consumer(channel, bindings, no_ack=True) as consumer:
Expand Down
2 changes: 2 additions & 0 deletions celery/bin/celery.py
Expand Up @@ -64,6 +64,7 @@ def __init__(self, app=None, no_color=False, stdout=sys.stdout,
self.colored = term.colored(enabled=not no_color)
self.stdout = stdout
self.stderr = stderr
self.quiet = False

def __call__(self, *args, **kwargs):
try:
Expand Down Expand Up @@ -298,6 +299,7 @@ def run(self, task_name, limit, **kwargs):
self.say_remote_command_reply(reply)
rate_limit = command(rate_limit)


class inspect(Command):
choices = {"active": 1.0,
"active_queues": 1.0,
Expand Down
1 change: 1 addition & 0 deletions celery/concurrency/processes/__init__.py
Expand Up @@ -33,6 +33,7 @@

def process_initializer(app, hostname):
"""Initializes the process so it can be used to process tasks."""
app.set_current()
set_default_app(app)
trace._tasks = app._tasks # make sure this optimization is set.
platforms.signals.reset(*WORKER_SIGRESET)
Expand Down
8 changes: 3 additions & 5 deletions celery/tests/app/test_app.py
Expand Up @@ -203,15 +203,13 @@ def test_amqp_get_broker_info(self):
"userid": "guest",
"password": "guest",
"virtual_host": "/"},
self.app.broker_connection(
transport="amqplib").info())
self.app.broker_connection("amqplib://").info())
self.app.conf.BROKER_PORT = 1978
self.app.conf.BROKER_VHOST = "foo"
self.assertDictContainsSubset({"port": 1978,
"virtual_host": "foo"},
self.app.broker_connection(
transport="amqplib").info())
conn = self.app.broker_connection(virtual_host="/value")
self.app.broker_connection("amqplib://:1978/foo").info())
conn = self.app.broker_connection("amqplib:////value")
self.assertDictContainsSubset({"virtual_host": "/value"},
conn.info())

Expand Down
10 changes: 7 additions & 3 deletions celery/tests/app/test_builtins.py
Expand Up @@ -63,9 +63,13 @@ def test_apply_async(self):
def test_apply_async_with_parent(self):
_task_stack.push(add)
try:
x = group([add.s(4, 4), add.s(8, 8)])
x.apply_async()
self.assertTrue(add.request.children)
add.push_request(called_directly=False)
try:
x = group([add.s(4, 4), add.s(8, 8)])
x.apply_async()
self.assertTrue(add.request.children)
finally:
add.pop_request()
finally:
_task_stack.pop()

Expand Down
1 change: 1 addition & 0 deletions celery/tests/app/test_loaders.py
Expand Up @@ -183,6 +183,7 @@ def test_read_configuration_py_in_name(self, find_module):

@patch("celery.loaders.default.find_module")
def test_read_configuration_importerror(self, find_module):
default.C_WNOCONF = True
find_module.side_effect = ImportError()
l = default.Loader()
with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):
Expand Down
54 changes: 27 additions & 27 deletions celery/tests/backends/test_amqp.py
Expand Up @@ -14,7 +14,7 @@
from celery.exceptions import TimeoutError
from celery.utils import uuid

from celery.tests.utils import Case, sleepdeprived
from celery.tests.utils import AppCase, sleepdeprived


class SomeClass(object):
Expand All @@ -23,7 +23,7 @@ def __init__(self, data):
self.data = data


class test_AMQPBackend(Case):
class test_AMQPBackend(AppCase):

def create_backend(self, **opts):
opts = dict(dict(serializer="pickle", persistent=False), **opts)
Expand Down Expand Up @@ -101,35 +101,35 @@ def test_expires_is_timedelta(self):

@sleepdeprived()
def test_store_result_retries(self):
iterations = [0]
stop_raising_at = [5]

class _Producer(object):
iterations = 0
stop_raising_at = 5

def __init__(self, *args, **kwargs):
pass

def publish(self, msg, *args, **kwargs):
if self.iterations > self.stop_raising_at:
return
raise KeyError("foo")

class Backend(AMQPBackend):
Producer = _Producer
def publish(*args, **kwargs):
if iterations[0] > stop_raising_at[0]:
return
iterations[0] += 1
raise KeyError("foo")

backend = Backend()
with self.assertRaises(KeyError):
backend.store_result("foo", "bar", "STARTED", max_retries=None)
backend = AMQPBackend()
from celery.app.amqp import TaskProducer
prod, TaskProducer.publish = TaskProducer.publish, publish
try:
with self.assertRaises(KeyError):
backend.retry_policy["max_retries"] = None
backend.store_result("foo", "bar", "STARTED")

with self.assertRaises(KeyError):
backend.store_result("foo", "bar", "STARTED", max_retries=10)
with self.assertRaises(KeyError):
backend.retry_policy["max_retries"] = 10
backend.store_result("foo", "bar", "STARTED")
finally:
TaskProducer.publish = prod

def assertState(self, retval, state):
self.assertEqual(retval["status"], state)

def test_poll_no_messages(self):
b = self.create_backend()
self.assertState(b.poll(uuid()), states.PENDING)
self.assertState(b.get_task_meta(uuid()), states.PENDING)

def test_poll_result(self):

Expand Down Expand Up @@ -167,22 +167,22 @@ class MockBackend(AMQPBackend):
results.put(Message(status=states.RECEIVED, seq=1))
results.put(Message(status=states.STARTED, seq=2))
results.put(Message(status=states.FAILURE, seq=3))
r1 = backend.poll(uuid())
r1 = backend.get_task_meta(uuid())
self.assertDictContainsSubset({"status": states.FAILURE,
"seq": 3}, r1,
"FFWDs to the last state")

# Caches last known state.
results.put(Message())
tid = uuid()
backend.poll(tid)
backend.get_task_meta(tid)
self.assertIn(tid, backend._cache, "Caches last known state")

# Returns cache if no new states.
results.queue.clear()
assert not results.qsize()
backend._cache[tid] = "hello"
self.assertEqual(backend.poll(tid), "hello",
self.assertEqual(backend.get_task_meta(tid), "hello",
"Returns cache if no new states")

def test_wait_for(self):
Expand Down Expand Up @@ -217,7 +217,7 @@ def drain_events(self, timeout=None):
b = self.create_backend()
with current_app.pool.acquire_channel(block=False) as (_, channel):
binding = b._create_binding(uuid())
consumer = b._create_consumer(binding, channel)
consumer = b.Consumer(channel, binding, no_ack=True)
with self.assertRaises(socket.timeout):
b.drain_events(Connection(), consumer, timeout=0.1)

Expand Down Expand Up @@ -249,7 +249,7 @@ def test_test_get_many_raises_outer_block(self):

class Backend(AMQPBackend):

def _create_consumer(self, *args, **kwargs):
def Consumer(*args, **kwargs):
raise KeyError("foo")

b = Backend()
Expand Down
21 changes: 13 additions & 8 deletions celery/tests/bin/test_base.py
Expand Up @@ -105,14 +105,19 @@ def test_with_custom_app(self):

def test_with_cmdline_config(self):
cmd = MockCommand()
cmd.enable_config_from_cmdline = True
cmd.namespace = "celeryd"
rest = cmd.setup_app_from_commandline(argv=[
"--loglevel=INFO", "--", "broker.host=broker.example.com",
".prefetch_multiplier=100"])
self.assertEqual(cmd.app.conf.BROKER_HOST, "broker.example.com")
self.assertEqual(cmd.app.conf.CELERYD_PREFETCH_MULTIPLIER, 100)
self.assertListEqual(rest, ["--loglevel=INFO"])
try:
cmd.enable_config_from_cmdline = True
cmd.namespace = "celeryd"
rest = cmd.setup_app_from_commandline(argv=[
"--loglevel=INFO", "--",
"broker.url=amqp://broker.example.com",
".prefetch_multiplier=100"])
self.assertEqual(cmd.app.conf.BROKER_URL,
"amqp://broker.example.com")
self.assertEqual(cmd.app.conf.CELERYD_PREFETCH_MULTIPLIER, 100)
self.assertListEqual(rest, ["--loglevel=INFO"])
finally:
cmd.app.conf.BROKER_URL = "memory://"

def test_parse_preload_options_shortopt(self):
cmd = Command()
Expand Down
3 changes: 3 additions & 0 deletions celery/tests/config.py
Expand Up @@ -6,6 +6,9 @@

BROKER_URL = "memory://"

#: warn if config module not found
os.environ["C_WNOCONF"] = "yes"

#: Don't want log output when running suite.
CELERYD_HIJACK_ROOT_LOGGER = False

Expand Down
6 changes: 3 additions & 3 deletions celery/tests/worker/test_control.py
Expand Up @@ -409,7 +409,7 @@ def reply(self, data, exchange, routing_key, **kwargs):

def test_pool_restart(self):
consumer = Consumer()
consumer.controller = _WC()
consumer.controller = _WC(app=current_app)
consumer.controller.pool.restart = Mock()
panel = self.create_panel(consumer=consumer)
panel.app = self.app
Expand All @@ -423,7 +423,7 @@ def test_pool_restart(self):

def test_pool_restart_import_modules(self):
consumer = Consumer()
consumer.controller = _WC()
consumer.controller = _WC(app=current_app)
consumer.controller.pool.restart = Mock()
panel = self.create_panel(consumer=consumer)
panel.app = self.app
Expand All @@ -440,7 +440,7 @@ def test_pool_restart_import_modules(self):

def test_pool_restart_relaod_modules(self):
consumer = Consumer()
consumer.controller = _WC()
consumer.controller = _WC(app=current_app)
consumer.controller.pool.restart = Mock()
panel = self.create_panel(consumer=consumer)
panel.app = self.app
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/worker/test_request.py
Expand Up @@ -716,7 +716,7 @@ def apply_async(self, target, args=None, kwargs=None,
p = MockPool()
tw.execute_using_pool(p)
self.assertTrue(p.target)
self.assertEqual(p.args[0], mytask)
self.assertEqual(p.args[0], mytask.name)
self.assertEqual(p.args[1], tid)
self.assertEqual(p.args[2], [4])
self.assertIn("f", p.args[3])
Expand Down
2 changes: 1 addition & 1 deletion celery/tests/worker/test_worker.py
Expand Up @@ -755,7 +755,7 @@ def teardown(self):
worker.logger = self._logger

def create_worker(self, **kw):
worker = WorkController(concurrency=1, loglevel=0, **kw)
worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
worker._shutdown_complete.set()
return worker

Expand Down
3 changes: 1 addition & 2 deletions celery/worker/__init__.py
Expand Up @@ -298,15 +298,14 @@ class WorkController(configurated):
def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
queues=None, app=None, pidfile=None, **kwargs):
self.app = app_or_default(app or self.app)

# all new threads start without a current app, so if an app is not
# passed on to the thread it will fall back to the "default app",
# which then could be the wrong app. So for the worker
# we set this to always return our app. This is a hack,
# and means that only a single app can be used for workers
# running in the same process.
set_default_app(self.app)
app.finalize()
self.app.finalize()
trace._tasks = self.app._tasks

self._shutdown_complete = Event()
Expand Down

0 comments on commit 184bda8

Please sign in to comment.