Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ repos:
"black",
]

- repo: https://github.com/PyCQA/docformatter
rev: v1.7.5
hooks:
- id: docformatter
args: [--in-place]

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
Expand Down
5 changes: 1 addition & 4 deletions examples/django/proj/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_RESULT_BACKEND = "db+sqlite:///results.sqlite"
CELERY_TASK_SERIALIZER = "json"


"""
Django settings for proj project.
"""Django settings for proj project.

Generated by 'django-admin startproject' using Django 2.2.1.

Expand Down
4 changes: 1 addition & 3 deletions examples/django/proj/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""
WSGI config for proj project.
"""WSGI config for proj project.

This module contains the WSGI application used by Django's development server
and any production WSGI deployments. It should expose a module-level variable
Expand All @@ -11,7 +10,6 @@
that later delegates to the Django one. For example, you could introduce WSGI
middleware here, or combine a Django application with an application of another
framework.

"""

import os
Expand Down
4 changes: 1 addition & 3 deletions src/pytest_celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
pytest-celery a shim pytest plugin to enable celery.contrib.pytest
"""
"""Pytest-celery a shim pytest plugin to enable celery.contrib.pytest."""

# flake8: noqa

Expand Down
15 changes: 15 additions & 0 deletions src/pytest_celery/api/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@


class CeleryTestBackend(CeleryTestNode):
"""CeleryTestBackend is specialized node type for handling celery backends
nodes. It is used to encapsulate a backend instance.

Responsibility Scope:
Handling backend specific requirements and configuration.
"""

@classmethod
def default_config(cls) -> dict:
return {
Expand All @@ -23,6 +30,14 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None:


class CeleryBackendCluster(CeleryTestCluster):
"""CeleryBackendCluster is a specialized cluster type for handling celery
backends. It is used to define which backend instances are available for
the test.

Responsibility Scope:
Provude useful methods for managing a cluster of celery backends.
"""

def __init__(self, *backends: tuple[CeleryTestBackend | CeleryTestContainer]) -> None:
super().__init__(*backends)

Expand Down
115 changes: 105 additions & 10 deletions src/pytest_celery/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,92 @@


class CeleryTestNode:
"""CeleryTestNode is the logical representation of a container instance. It
is used to provide a common interface for interacting with the container
regardless of the underlying implementation.

Responsibility Scope:
The node's responsibility is to wrap the container and provide
useful methods for interacting with it.
"""

def __init__(self, container: CeleryTestContainer, app: Celery = None) -> None:
"""Setup the base components of a CeleryTestNode.

Args:
container (CeleryTestContainer): Container to use for the node.
app (Celery, optional): Celery app. Defaults to None.
"""
self._container = container
self._app = app

@property
def container(self) -> CeleryTestContainer:
"""Underlying container for the node."""
return self._container

@property
def app(self) -> Celery:
"""Celery app for the node if available."""
return self._app

def __eq__(self, __value: object) -> bool:
if isinstance(__value, CeleryTestNode):
def __eq__(self, other: object) -> bool:
if isinstance(other, CeleryTestNode):
return all(
(
self.container == __value.container,
self.app == __value.app,
self.container == other.container,
self.app == other.app,
)
)
return False

@classmethod
def default_config(cls) -> dict:
"""Default node configurations if not overridden by the user."""
return {}

def ready(self) -> bool:
"""Waits until the node is ready or raise an exception if it fails to
boot up."""
return self.container.ready()

def config(self, *args: tuple, **kwargs: dict) -> dict:
"""Compile the configurations required for Celery from this node."""
return self.container.celeryconfig

def logs(self) -> str:
"""Get the logs of the underlying container."""
return self.container.logs()

def name(self) -> str:
"""Get the name of this node."""
return self.container.name

def hostname(self) -> str:
"""Get the hostname of this node."""
return self.container.id[:12]

def kill(self, signal: str | int = "SIGKILL", reload_container: bool = True) -> None:
"""Kill the underlying container.

Args:
signal (str | int, optional): Signal to send to the container. Defaults to "SIGKILL".
reload_container (bool, optional): Reload the container object after killing it. Defaults to True.
"""
if self.container.status == "running":
self.container.kill(signal=signal)
if reload_container:
self.container.reload()

def restart(self, reload_container: bool = True, force: bool = False) -> None:
"""Restart the underlying container.

Args:
reload_container (bool, optional): Reload the container object after restarting it. Defaults to True.
force (bool, optional): Kill the container before restarting it. Defaults to False.
"""
if force:
# Use SIGTERM to allow the container to gracefully shutdown
self.kill(signal="SIGTERM", reload_container=reload_container)
self.container.restart(timeout=CONTAINER_TIMEOUT)
if reload_container:
Expand All @@ -71,19 +108,41 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None:
self.app.set_current()

def teardown(self) -> None:
"""Teardown the node."""
self.container.teardown()

def wait_for_log(self, log: str, message: str = "", timeout: int = RESULT_TIMEOUT) -> None:
"""Wait for a log to appear in the container.

Args:
log (str): Log to wait for.
message (str, optional): Message to display while waiting. Defaults to "".
timeout (int, optional): Timeout in seconds. Defaults to RESULT_TIMEOUT.
"""
message = message or f"Waiting for worker container '{self.name()}' to log -> {log}"
wait_for_callable(message=message, func=lambda: log in self.logs(), timeout=timeout)

def assert_log_exists(self, log: str, message: str = "", timeout: int = RESULT_TIMEOUT) -> None:
"""Assert that a log exists in the container.

Args:
log (str): Log to assert.
message (str, optional): Message to display while waiting. Defaults to "".
timeout (int, optional): Timeout in seconds. Defaults to RESULT_TIMEOUT.
"""
try:
self.wait_for_log(log, message, timeout)
except pytest_docker_tools.exceptions.TimeoutError:
assert False, f"Worker container '{self.name()}' did not log -> {log} within {timeout} seconds"

def assert_log_does_not_exist(self, log: str, message: str = "", timeout: int = 1) -> None:
"""Assert that a log does not exist in the container.

Args:
log (str): Log to assert.
message (str, optional): Message to display while waiting. Defaults to "".
timeout (int, optional): Timeout in seconds. Defaults to 1.
"""
message = message or f"Waiting for worker container '{self.name()}' to not log -> {log}"
try:
self.wait_for_log(log, message, timeout)
Expand All @@ -93,7 +152,24 @@ def assert_log_does_not_exist(self, log: str, message: str = "", timeout: int =


class CeleryTestCluster:
"""CeleryTestCluster is a collection of CeleryTestNodes. It is used to
collect the test nodes into a single object for easier management.

Responsibility Scope:
The cluster's responsibility is to define which nodes will be used for
the test.
"""

def __init__(self, *nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None:
"""Setup the base components of a CeleryTestCluster.

Args:
*nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster.

Raises:
ValueError: At least one node is required.
TypeError: All nodes must be CeleryTestNode or CeleryTestContainer
"""
if not nodes:
raise ValueError("At least one node is required")
if len(nodes) == 1 and isinstance(nodes[0], list):
Expand All @@ -105,10 +181,16 @@ def __init__(self, *nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None:

@property
def nodes(self) -> tuple[CeleryTestNode]:
"""Get the nodes of the cluster."""
return self._nodes

@nodes.setter
def nodes(self, nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None:
"""Set the nodes of the cluster.

Args:
nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster.
"""
self._nodes = self._set_nodes(*nodes) # type: ignore

def __iter__(self) -> Iterator[CeleryTestNode]:
Expand All @@ -120,15 +202,16 @@ def __getitem__(self, index: Any) -> CeleryTestNode:
def __len__(self) -> int:
return len(self.nodes)

def __eq__(self, __value: object) -> bool:
if isinstance(__value, CeleryTestCluster):
def __eq__(self, other: object) -> bool:
if isinstance(other, CeleryTestCluster):
for node in self:
if node not in __value:
if node not in other:
return False
return False

@classmethod
def default_config(cls) -> dict:
"""Default cluster configurations if not overridden by the user."""
return {}

@abstractmethod
Expand All @@ -137,6 +220,15 @@ def _set_nodes(
*nodes: tuple[CeleryTestNode | CeleryTestContainer],
node_cls: type[CeleryTestNode] = CeleryTestNode,
) -> tuple[CeleryTestNode]:
"""Set the nodes of the cluster.

Args:
*nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster.
node_cls (type[CeleryTestNode], optional): Node class to use. Defaults to CeleryTestNode.

Returns:
tuple[CeleryTestNode]: Nodes to use for the cluster.
"""
return tuple(
node_cls(node)
if isinstance(
Expand All @@ -148,16 +240,19 @@ def _set_nodes(
) # type: ignore

def ready(self) -> bool:
"""Waits until the cluster is ready or raise an exception if any of the
nodes fail to boot up."""
return all(node.ready() for node in self)

def config(self, *args: tuple, **kwargs: dict) -> dict:
"""Compile the configurations required for Celery from this cluster."""
config = [node.container.celeryconfig for node in self]
return {
"urls": [c["url"] for c in config],
"local_urls": [c["local_url"] for c in config],
}

def teardown(self) -> None:
# Do not need to call teardown on the nodes
# but only tear down self
pass
"""Teardown the cluster."""
# Nodes teardown themselves, so we just need to clear the cluster
# if there is any cleanup to do
15 changes: 15 additions & 0 deletions src/pytest_celery/api/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@


class CeleryTestBroker(CeleryTestNode):
"""CeleryTestBroker is specialized node type for handling celery brokers
nodes. It is used to encapsulate a broker instance.

Responsibility Scope:
Handling broker specific requirements and configuration.
"""

@classmethod
def default_config(cls) -> dict:
return {
Expand All @@ -23,6 +30,14 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None:


class CeleryBrokerCluster(CeleryTestCluster):
"""CeleryBrokerCluster is a specialized cluster type for handling celery
brokers. It is used to define which broker instances are available for the
test.

Responsibility Scope:
Provude useful methods for managing a cluster of celery brokers.
"""

def __init__(self, *brokers: tuple[CeleryTestBroker | CeleryTestContainer]) -> None:
super().__init__(*brokers)

Expand Down
Loading