Skip to content

Commit

Permalink
WIP: Allow debugging pytest-celery dev worker with vscode during test…
Browse files Browse the repository at this point in the history
…, also, run with: tox -e "3.12-smoke" -- -k test_blm_348 --reruns 0
  • Loading branch information
Nusnus committed Apr 3, 2024
1 parent ae7fb68 commit 3e5824a
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 67 deletions.
9 changes: 0 additions & 9 deletions celery/bootsteps.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,19 +362,10 @@ class StartStopStep(Step):

def start(self, parent):
if self.obj:
# if isinstance(self.obj, list):
# from threading import Thread
# y = Thread(target=self.obj[1].start)
# x = Thread(target=self.obj[0].start)
# x.start()
# y.start()
# return [x,y]
return self.obj.start()

def stop(self, parent):
if self.obj:
if isinstance(self.obj, list):
return [o.stop() for o in self.obj]
return self.obj.stop()

def close(self, parent):
Expand Down
31 changes: 6 additions & 25 deletions celery/worker/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from celery.exceptions import ImproperlyConfigured
from celery.platforms import IS_WINDOWS
from celery.utils.log import worker_logger as logger
from celery.utils.threads import bgThread

__all__ = ('Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer')

Expand Down Expand Up @@ -259,29 +258,11 @@ def create(self, w):

def start(self, parent):
if self.obj:
# self.obj[0].start()
# self.obj[1].start()
# return True
if isinstance(self.obj, list):
# from threading import Thread
# y = Thread(target=self.obj[1].start)
# x = Thread(target=self.obj[0].start)
# x.start()
# y.start()
# return [x,y]
broker0 = StartConsumerThread(name='broker0', obj=self.obj[0])
broker1 = StartConsumerThread(name='broker1', obj=self.obj[1])
broker0.start()
broker1.start()
return [broker0, broker1]
from threading import Thread
y = Thread(target=self.obj[0].start)
x = Thread(target=self.obj[1].start)
x.start()
y.start()
return [x, y]
return self.obj.start()



class StartConsumerThread(bgThread):
def __init__(self, name=None, obj=None, **kwargs):
super().__init__(name, **kwargs)
self.obj = obj

def body(self):
self.obj.start()
140 changes: 108 additions & 32 deletions t/smoke/tests/test_blm-347.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

import pytest
from pytest_celery import (RABBITMQ_CONTAINER_TIMEOUT, RABBITMQ_PORTS, RESULT_TIMEOUT, CeleryBackendCluster,
CeleryBrokerCluster, CeleryTestSetup, RabbitMQContainer, RabbitMQTestBroker,
RedisTestBackend)
CeleryBrokerCluster, CeleryTestSetup, MemcachedTestBackend, RabbitMQContainer,
RabbitMQTestBroker)
from pytest_docker_tools import build, container, fxtr

from celery import Celery
from celery.canvas import Signature # noqa
from celery.result import AsyncResult # noqa
from celery.canvas import Signature, group
from celery.result import AsyncResult
from t.integration.tasks import identity
from t.smoke.workers.dev import SmokeWorkerContainer

###############################################################################
# RabbitMQ Management Broker
# Broker
###############################################################################


Expand Down Expand Up @@ -78,26 +78,21 @@ def celery_broker_cluster(


###############################################################################
# Redis Result Backend
# Result Backend
###############################################################################


@pytest.fixture
def celery_backend_cluster(
celery_redis_backend: RedisTestBackend,
celery_memcached_backend: MemcachedTestBackend,
) -> CeleryBackendCluster:
cluster = CeleryBackendCluster(celery_redis_backend)
cluster = CeleryBackendCluster(celery_memcached_backend)
yield cluster
cluster.teardown()


@pytest.fixture
def default_redis_backend_image() -> str:
return "redis:latest"


###############################################################################
# Worker Configuration
# Worker
###############################################################################


Expand All @@ -112,13 +107,33 @@ def worker_queue(cls) -> str:

@classmethod
def command(cls, *args: str) -> list[str]:
return super().command(
return [
"python",
"-m",
"debugpy",
"--listen",
"0.0.0.0:5678",
# "--wait-for-client",
"-m",
] + super().command(
"-Ofair",
"--without-gossip",
"--without-mingle",
"--without-heartbeat",
# "-P",
# "gevent",
"-P",
"gevent",
)

@classmethod
def initial_env(
cls, celery_worker_cluster_config: dict, initial: dict | None = None
) -> dict:
return super().initial_env(
celery_worker_cluster_config,
{
"GEVENT_SUPPORT": True,
"PYDEVD_DISABLE_FILE_VALIDATION": 1,
},
)


Expand Down Expand Up @@ -168,21 +183,18 @@ def default_worker_app(default_worker_app: Celery) -> Celery:

app = default_worker_app
app.conf.update(
# Serialization formats
accept_content=["json"],
task_serializer="json",
result_serializer="json",
event_serializer="json",
# Broker settings
# broker_url=get_broker_url(ssl=False),
broker_use_ssl=False,
broker_transport_options={"confirm_publish": True},
broker_login_method="PLAIN",
# Result backend and worker settings
# result_backend="dbcache+bas://dlj2backendsvc-1.4",
worker_prefetch_multiplier=1,
task_acks_late=True,
# Task retry, expiration, and other settings
task_reject_on_worker_lost=True,
task_result_expires=60 * 60 * 24, # 24 hours
task_publish_retry_policy={
# "errback": kombu_error_handler,
Expand All @@ -194,6 +206,7 @@ def default_worker_app(default_worker_app: Celery) -> Celery:
},
task_default_delivery_mode=1,
worker_enable_remote_control=False,
worker_cancel_long_running_tasks_on_connection_loss=True,
)

return app
Expand All @@ -204,21 +217,84 @@ def default_worker_app(default_worker_app: Celery) -> Celery:
###############################################################################


def test_blm_348(
@pytest.fixture
def publish_to_broker():
def _publish_to_broker(
broker: RabbitMQManagementTestBroker,
sig: Signature,
) -> AsyncResult:
app = Celery(sig.app.main)
app.conf = sig.app.conf
app.conf["broker_url"] = broker.config()["host_url"]
sig._app = app
if isinstance(sig, group):
for t in sig.tasks:
t._app = app
return sig.apply_async()

return _publish_to_broker


def test_blm_348_publish_to_two_brokers(
celery_setup: CeleryTestSetup,
broker1: RabbitMQManagementTestBroker,
broker2: RabbitMQManagementTestBroker,
publish_to_broker,
):
sig: Signature = identity.s("test_blm_348")

# Publish to broker1
app = Celery(celery_setup.app.main)
app.conf = celery_setup.app.conf
app.conf["broker_url"] = broker1.config()["host_url"]
assert identity.s("test_blm_348").apply_async(app=app).get(timeout=RESULT_TIMEOUT) == "test_blm_348"
print(celery_setup.worker.logs())
res: AsyncResult = publish_to_broker(broker1, sig)
assert res.get(timeout=RESULT_TIMEOUT) == "test_blm_348"

# Publish to broker2
app = Celery(celery_setup.app.main)
app.conf = celery_setup.app.conf
app.conf["broker_url"] = broker2.config()["host_url"]
assert identity.s("test_blm_348").apply_async(app=app).get(timeout=RESULT_TIMEOUT) == "test_blm_348"
print(celery_setup.worker.logs())
res: AsyncResult = publish_to_broker(broker2, sig)
assert res.get(timeout=RESULT_TIMEOUT) == "test_blm_348"

print("Done\n" + celery_setup.worker.logs())


def test_blm_348_large_traffic(
celery_setup: CeleryTestSetup,
broker1: RabbitMQManagementTestBroker,
broker2: RabbitMQManagementTestBroker,
publish_to_broker,
):
RESULT_TIMEOUT = 60 * 3
count = 100
sig = group([identity.s(i) for i in range(count)])

# Publish large traffic to broker1
res: AsyncResult = publish_to_broker(broker1, sig)
assert res.get(timeout=RESULT_TIMEOUT) == list(range(count))

# Publish large traffic to broker2
res: AsyncResult = publish_to_broker(broker2, sig)
assert res.get(timeout=RESULT_TIMEOUT) == list(range(count))

# Publish to both brokers
sig1 = group(identity.s(i) for i in range(count // 2))
sig2 = group(identity.s(i) for i in range(count // 2, count))
res1: AsyncResult = publish_to_broker(broker1, sig1)
res2: AsyncResult = publish_to_broker(broker2, sig2)
assert res1.get(timeout=RESULT_TIMEOUT) == list(range(count // 2))
assert res2.get(timeout=RESULT_TIMEOUT) == list(range(count // 2, count))

print("Done\n" + celery_setup.worker.logs())


def test_blm_348_broker2_only(
celery_setup: CeleryTestSetup,
broker1: RabbitMQManagementTestBroker,
broker2: RabbitMQManagementTestBroker,
publish_to_broker,
):
count = 10
sig = group([identity.s(i) for i in range(count)])

# Publish large traffic to broker2
broker1.kill()
res: AsyncResult = publish_to_broker(broker2, sig)
assert res.get(timeout=RESULT_TIMEOUT) == list(range(count))

print("Done\n" + celery_setup.worker.logs())
1 change: 1 addition & 0 deletions t/smoke/workers/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def worker_queue(cls) -> str:

default_worker_container = container(
image="{celery_dev_worker_image.id}",
ports={"5678/tcp": "5678"},
environment=fxtr("default_worker_env"),
network="{default_pytest_celery_network.name}",
volumes={
Expand Down
5 changes: 4 additions & 1 deletion t/smoke/workers/docker/dev
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

EXPOSE 5678

# Install celery from source
WORKDIR /celery

COPY --chown=test_user:test_user . /celery
RUN pip install --no-cache-dir --upgrade \
pip \
-e /celery[redis,pymemcache,gevent] \
pytest-celery==1.0.0rc2
pytest-celery==1.0.0rc2 \
debugpy

# The workdir must be /app
WORKDIR /app
Expand Down

0 comments on commit 3e5824a

Please sign in to comment.