From 43a93a938b25f33e51a2c773b86e04146b6a758f Mon Sep 17 00:00:00 2001 From: hseo36 Date: Mon, 6 Apr 2026 11:06:26 -0400 Subject: [PATCH] fix/celery-ssl-skip-empty-key-cert --- providers/celery/provider.yaml | 13 +- .../celery/executors/default_celery.py | 60 +++++--- .../providers/celery/get_provider_info.py | 11 +- .../celery/executors/test_celery_executor.py | 129 ++++++++++++++++++ scripts/ci/prek/known_airflow_exceptions.txt | 2 +- 5 files changed, 189 insertions(+), 26 deletions(-) diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index b89ea4e16ae50..6b6c1007ad331 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -265,16 +265,25 @@ config: type: string example: ~ default: "False" + ssl_mutual_tls: + description: | + Whether to require mutual TLS (client certificate authentication). + When True (default), SSL_KEY and SSL_CERT must be set. + Set to False for one-way TLS (server verification only). + version_added: ~ + type: boolean + example: ~ + default: "True" ssl_key: description: | - Path to the client key. + Path to the client key. Required when SSL_MUTUAL_TLS is True. version_added: ~ type: string example: ~ default: "" ssl_cert: description: | - Path to the client certificate. + Path to the client certificate. Required when SSL_MUTUAL_TLS is True. version_added: ~ type: string example: ~ diff --git a/providers/celery/src/airflow/providers/celery/executors/default_celery.py b/providers/celery/src/airflow/providers/celery/executors/default_celery.py index f0ef8185d1ecd..9dd94036734e0 100644 --- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py @@ -141,36 +141,54 @@ def get_default_celery_config(team_conf) -> dict[str, Any]: try: if celery_ssl_active: + ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", fallback=True) + ssl_key = team_conf.get("celery", "SSL_KEY") + ssl_cert = team_conf.get("celery", "SSL_CERT") + ssl_cacert = team_conf.get("celery", "SSL_CACERT") + + if ssl_mutual_tls and (not ssl_key or not ssl_cert): + raise ValueError( + "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or SSL_CERT are not set. " + "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for one-way TLS." + ) + + if not ssl_cacert: + log.info("SSL_CACERT is not set. Using system CA certificates for server verification.") + + if not ssl_mutual_tls and (ssl_key or ssl_cert): + log.warning( + "SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are configured. " + "Client certificates will not be used. " + "Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS." + ) + if broker_url and re.search(r"amqps?://", broker_url): - broker_use_ssl = { - "keyfile": team_conf.get("celery", "SSL_KEY"), - "certfile": team_conf.get("celery", "SSL_CERT"), - "ca_certs": team_conf.get("celery", "SSL_CACERT"), - "cert_reqs": ssl.CERT_REQUIRED, - } + broker_use_ssl = {"cert_reqs": ssl.CERT_REQUIRED} + if ssl_cacert: + broker_use_ssl["ca_certs"] = ssl_cacert + if ssl_mutual_tls: + broker_use_ssl["keyfile"] = ssl_key + broker_use_ssl["certfile"] = ssl_cert elif broker_url and re.search("rediss?://|sentinel://", broker_url): - broker_use_ssl = { - "ssl_keyfile": team_conf.get("celery", "SSL_KEY"), - "ssl_certfile": team_conf.get("celery", "SSL_CERT"), - "ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"), - "ssl_cert_reqs": ssl.CERT_REQUIRED, - } + broker_use_ssl = {"ssl_cert_reqs": ssl.CERT_REQUIRED} + if ssl_cacert: + broker_use_ssl["ssl_ca_certs"] = ssl_cacert + if ssl_mutual_tls: + broker_use_ssl["ssl_keyfile"] = ssl_key + broker_use_ssl["ssl_certfile"] = ssl_cert else: - raise AirflowException( + raise ValueError( "The broker you configured does not support SSL_ACTIVE to be True. " "Please use RabbitMQ or Redis if you would like to use SSL for broker." ) config["broker_use_ssl"] = broker_use_ssl - except AirflowConfigException: - raise AirflowException( - "AirflowConfigException: SSL_ACTIVE is True, please ensure SSL_KEY, SSL_CERT and SSL_CACERT are set" - ) + except ValueError: + raise except Exception as e: - raise AirflowException( - f"Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have " - f"all necessary certs and key ({e})." - ) + raise RuntimeError( + f"Unknown Celery SSL error. Please ensure you want to use SSL and have all necessary certs and key ({e})." + ) from e # Warning for not recommended backends match_not_recommended_backend = re.search("rediss?://|amqp://|rpc://", result_backend) diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py index 071071133bd6d..ce59a55918f11 100644 --- a/providers/celery/src/airflow/providers/celery/get_provider_info.py +++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py @@ -169,15 +169,22 @@ def get_provider_info(): "example": None, "default": "False", }, + "ssl_mutual_tls": { + "description": "Whether to require mutual TLS (client certificate authentication).\nWhen True (default), SSL_KEY and SSL_CERT must be set.\nSet to False for one-way TLS (server verification only).\n", + "version_added": None, + "type": "boolean", + "example": None, + "default": "True", + }, "ssl_key": { - "description": "Path to the client key.\n", + "description": "Path to the client key. Required when SSL_MUTUAL_TLS is True.\n", "version_added": None, "type": "string", "example": None, "default": "", }, "ssl_cert": { - "description": "Path to the client certificate.\n", + "description": "Path to the client certificate. Required when SSL_MUTUAL_TLS is True.\n", "version_added": None, "type": "string", "example": None, diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index d69df9ec5e7e6..b1fee33cb755d 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -872,6 +872,64 @@ def test_amqp_broker_url_still_builds_ssl_config(self): assert broker_ssl["keyfile"] == "/path/to/key.pem" assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED + @conf_vars( + { + ("celery", "BROKER_URL"): "rediss://redis:6380//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_KEY"): "/path/to/key.pem", + ("celery", "SSL_CERT"): "/path/to/cert.pem", + ("celery", "SSL_CACERT"): "/path/to/ca.pem", + } + ) + def test_redis_mutual_tls_builds_ssl_config(self): + """Test mutual TLS: all three SSL keys produce correct broker_use_ssl for Redis.""" + import importlib + import ssl + + importlib.reload(default_celery) + + config = default_celery.DEFAULT_CELERY_CONFIG + assert "broker_use_ssl" in config + broker_ssl = config["broker_use_ssl"] + assert broker_ssl["ssl_keyfile"] == "/path/to/key.pem" + assert broker_ssl["ssl_certfile"] == "/path/to/cert.pem" + assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem" + assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED + + @conf_vars( + { + ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_CACERT"): "/path/to/ca.pem", + } + ) + def test_amqps_mutual_tls_missing_key_cert_raises(self): + """Test that mutual TLS (default) raises error when SSL_KEY/SSL_CERT are missing.""" + import importlib + + with pytest.raises(ValueError, match="SSL_MUTUAL_TLS is True.*but SSL_KEY and/or SSL_CERT"): + importlib.reload(default_celery) + + @conf_vars( + { + ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_KEY"): "/path/to/key", + ("celery", "SSL_CERT"): "/path/to/cert", + ("celery", "SSL_CACERT"): "", + } + ) + def test_ssl_active_without_cacert_uses_system_cas(self): + """Test that empty SSL_CACERT falls back to system CAs (ca_certs omitted from config).""" + import importlib + import ssl + + importlib.reload(default_celery) + broker_ssl = default_celery.DEFAULT_CELERY_CONFIG["broker_use_ssl"] + + assert "ca_certs" not in broker_ssl + assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED + @conf_vars( { ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//", @@ -887,6 +945,77 @@ def test_amqps_broker_url_no_ssl_when_inactive(self): config = default_celery.DEFAULT_CELERY_CONFIG assert "broker_use_ssl" not in config + @conf_vars( + { + ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_MUTUAL_TLS"): "False", + ("celery", "SSL_CACERT"): "/path/to/ca.pem", + } + ) + def test_amqps_one_way_tls(self): + """Test one-way TLS for AMQP: only ca_certs, no keyfile/certfile.""" + import importlib + import ssl + + importlib.reload(default_celery) + + config = default_celery.DEFAULT_CELERY_CONFIG + assert "broker_use_ssl" in config + broker_ssl = config["broker_use_ssl"] + assert broker_ssl["ca_certs"] == "/path/to/ca.pem" + assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED + assert "keyfile" not in broker_ssl + assert "certfile" not in broker_ssl + + @conf_vars( + { + ("celery", "BROKER_URL"): "rediss://redis:6380//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_MUTUAL_TLS"): "False", + ("celery", "SSL_CACERT"): "/path/to/ca.pem", + } + ) + def test_redis_one_way_tls(self): + """Test one-way TLS for Redis: only ssl_ca_certs, no ssl_keyfile/ssl_certfile.""" + import importlib + import ssl + + importlib.reload(default_celery) + + config = default_celery.DEFAULT_CELERY_CONFIG + assert "broker_use_ssl" in config + broker_ssl = config["broker_use_ssl"] + assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem" + assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED + assert "ssl_keyfile" not in broker_ssl + assert "ssl_certfile" not in broker_ssl + + @conf_vars( + { + ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//", + ("celery", "SSL_ACTIVE"): "True", + ("celery", "SSL_MUTUAL_TLS"): "False", + ("celery", "SSL_KEY"): "/path/to/key.pem", + ("celery", "SSL_CERT"): "/path/to/cert.pem", + ("celery", "SSL_CACERT"): "/path/to/ca.pem", + } + ) + def test_one_way_tls_ignores_key_cert(self): + """Test that SSL_KEY/SSL_CERT are ignored when SSL_MUTUAL_TLS is False.""" + import importlib + import ssl + + importlib.reload(default_celery) + + config = default_celery.DEFAULT_CELERY_CONFIG + assert "broker_use_ssl" in config + broker_ssl = config["broker_use_ssl"] + assert broker_ssl["ca_certs"] == "/path/to/ca.pem" + assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED + assert "keyfile" not in broker_ssl + assert "certfile" not in broker_ssl + class TestCreateCeleryAppTeamIsolation: """Tests for create_celery_app() multi-team config isolation.""" diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt index 9cd3d52cd1f6b..1a870c169d602 100644 --- a/scripts/ci/prek/known_airflow_exceptions.txt +++ b/scripts/ci/prek/known_airflow_exceptions.txt @@ -150,7 +150,7 @@ providers/arangodb/src/airflow/providers/arangodb/hooks/arangodb.py::9 providers/arangodb/src/airflow/providers/arangodb/operators/arangodb.py::1 providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py::1 providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py::2 -providers/celery/src/airflow/providers/celery/executors/default_celery.py::5 +providers/celery/src/airflow/providers/celery/executors/default_celery.py::2 providers/celery/tests/integration/celery/test_celery_executor.py::2 providers/cloudant/src/airflow/providers/cloudant/hooks/cloudant.py::2 providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py::3