diff --git a/docker-compose.yml b/docker-compose.yml index 8190217ef46..bbb3718fecb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -135,9 +135,6 @@ services: FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} FIDES__LOGGING__COLORIZE: "True" FIDES__USER__ANALYTICS_OPT_OUT: "True" - FIDES__CELERY__HEALTHCHECK_PORT: "9001" - expose: - - 9001 volumes: - type: bind source: ./ @@ -146,16 +143,52 @@ services: - /fides/src/fides.egg-info worker-privacy-preferences: - extends: - service: worker-other + image: ethyca/fides:local command: fides worker --queues=fides.privacy_preferences,fides.privacy_request_exports,fides.privacy_request_ingestion + depends_on: + redis: + condition: service_started + restart: always + healthcheck: + test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"] + start_period: 60s + interval: 20s + timeout: 5s + retries: 10 + environment: + FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} + FIDES__LOGGING__COLORIZE: "True" + FIDES__USER__ANALYTICS_OPT_OUT: "True" + volumes: + - type: bind + source: ./ + target: /fides + read_only: False + - /fides/src/fides.egg-info worker-dsr: - extends: - service: worker-other - healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:9001/"] + image: ethyca/fides:local command: fides worker --queues=fides.dsr + depends_on: + redis: + condition: service_started + restart: always + healthcheck: + test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"] + start_period: 60s + interval: 20s + timeout: 5s + retries: 10 + environment: + FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml} + FIDES__LOGGING__COLORIZE: "True" + FIDES__USER__ANALYTICS_OPT_OUT: "True" + volumes: + - type: bind + source: ./ + target: /fides + read_only: False + - /fides/src/fides.egg-info redis: image: "redis:8.0-alpine" diff --git a/src/fides/api/tasks/__init__.py b/src/fides/api/tasks/__init__.py index 08b0fcf4cd9..63b992673da 100644 --- a/src/fides/api/tasks/__init__.py +++ b/src/fides/api/tasks/__init__.py @@ -14,7 +14,6 @@ ) from fides.api.db.session import get_db_engine, get_db_session -from fides.api.tasks import celery_healthcheck from fides.api.util.logger import setup as setup_logging from fides.config import CONFIG, FidesConfig @@ -103,7 +102,6 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: ) app = Celery(__name__) - celery_healthcheck.register(app) # type: ignore celery_config: Dict[str, Any] = { # Defaults for the celery config @@ -114,8 +112,6 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery: # Ops requires this to route emails to separate queues "task_create_missing_queues": True, "task_default_queue": "fides", - "healthcheck_port": config.celery.healthcheck_port, - "healthcheck_ping_timeout": config.celery.healthcheck_ping_timeout, } celery_config.update(config.celery) diff --git a/src/fides/api/tasks/celery_healthcheck/README.md b/src/fides/api/tasks/celery_healthcheck/README.md deleted file mode 100644 index 659ae7421b9..00000000000 --- a/src/fides/api/tasks/celery_healthcheck/README.md +++ /dev/null @@ -1,4 +0,0 @@ -This is a copy of celery-healthcheck - it's added here manually because our current -pinned dependencies are not compatible with the package's pinned dependencies (but -they do work with the code). Once we upgrade fastapi and a few other dependencies, -we can remove this (if we want - it's MIT licensed so this is reasonable). \ No newline at end of file diff --git a/src/fides/api/tasks/celery_healthcheck/__init__.py b/src/fides/api/tasks/celery_healthcheck/__init__.py deleted file mode 100644 index b2c85f80985..00000000000 --- a/src/fides/api/tasks/celery_healthcheck/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -# fmt: off -# type: ignore -# pylint: skip-file -# isort:off - - -from .server import HealthCheckServer - - -def register(celery_app): - celery_app.steps["worker"].add(HealthCheckServer) diff --git a/src/fides/api/tasks/celery_healthcheck/server.py b/src/fides/api/tasks/celery_healthcheck/server.py deleted file mode 100644 index f1692e03634..00000000000 --- a/src/fides/api/tasks/celery_healthcheck/server.py +++ /dev/null @@ -1,73 +0,0 @@ -# fmt: off -# type: ignore -# pylint: skip-file -# isort:off - -import logging -import threading - -import uvicorn -from celery import bootsteps -from celery.worker import WorkController -from fastapi import FastAPI, status -from fastapi.responses import JSONResponse - -app = FastAPI() -logger = logging.getLogger("celery.ext.healthcheck") - -HEALTHCHECK_DEFAULT_PORT = 9000 -HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0 - - -class HealthCheckServer(bootsteps.StartStopStep): - def __init__(self, parent: WorkController, **kwargs): - self.thread = None - - # facilitates testing - self.app = app - - # config - self.healthcheck_port = int( - getattr(parent.app.conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT) - ) - self.healthcheck_ping_timeout = float( - getattr( - parent.app.conf, - "healthcheck_ping_timeout", - HEALTHCHECK_DEFAULT_PING_TIMEOUT, - ) - ) - - def start(self, parent: WorkController): - @self.app.get("/") - async def celery_ping(): - insp = parent.app.control.inspect( - destination=[parent.hostname], timeout=self.healthcheck_ping_timeout - ) - result = insp.ping() - - if result: - return JSONResponse( - content={"status": "ok", "result": result}, - status_code=status.HTTP_200_OK, - ) - else: - return JSONResponse( - content={"status": "error", "result": result}, - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - ) - - def run_server(): - uvicorn.run( - self.app, - host="0.0.0.0", - port=self.healthcheck_port, - ) - - self.thread = threading.Thread(target=run_server, daemon=True) - self.thread.start() - - logger.info(f"Health check server started on port {self.healthcheck_port}") - - def stop(self, parent: WorkController): - pass diff --git a/src/fides/config/celery_settings.py b/src/fides/config/celery_settings.py index 841ee60dbf2..ea185e862a4 100644 --- a/src/fides/config/celery_settings.py +++ b/src/fides/config/celery_settings.py @@ -27,12 +27,6 @@ class CelerySettings(FidesSettings): description="If true, tasks are executed locally instead of being sent to the queue. " "If False, tasks are sent to the queue.", ) - healthcheck_port: int = Field( - default=9000, description="The port to use for the health check endpoint" - ) - healthcheck_ping_timeout: float = Field( - default=2.0, description="The timeout in seconds for the health check ping" - ) model_config = SettingsConfigDict(env_prefix=ENV_PREFIX)