Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,25 @@ config:
type: string
example: ~
default: "False"
ssl_mutual_tls:
description: |
Whether to require mutual TLS (client certificate authentication).
When True (default), SSL_KEY and SSL_CERT must be set.
Set to False for one-way TLS (server verification only).
version_added: ~
type: boolean
example: ~
default: "True"
ssl_key:
description: |
Path to the client key.
Path to the client key. Required when SSL_MUTUAL_TLS is True.
version_added: ~
type: string
example: ~
default: ""
ssl_cert:
description: |
Path to the client certificate.
Path to the client certificate. Required when SSL_MUTUAL_TLS is True.
version_added: ~
type: string
example: ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,36 +141,54 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:

try:
if celery_ssl_active:
ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS", fallback=True)
ssl_key = team_conf.get("celery", "SSL_KEY")
ssl_cert = team_conf.get("celery", "SSL_CERT")
ssl_cacert = team_conf.get("celery", "SSL_CACERT")

if ssl_mutual_tls and (not ssl_key or not ssl_cert):
raise ValueError(
"SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or SSL_CERT are not set. "
"Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for one-way TLS."
)

if not ssl_cacert:
log.info("SSL_CACERT is not set. Using system CA certificates for server verification.")

if not ssl_mutual_tls and (ssl_key or ssl_cert):
log.warning(
"SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are configured. "
"Client certificates will not be used. "
"Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS."
)

if broker_url and re.search(r"amqps?://", broker_url):
broker_use_ssl = {
"keyfile": team_conf.get("celery", "SSL_KEY"),
"certfile": team_conf.get("celery", "SSL_CERT"),
"ca_certs": team_conf.get("celery", "SSL_CACERT"),
"cert_reqs": ssl.CERT_REQUIRED,
}
broker_use_ssl = {"cert_reqs": ssl.CERT_REQUIRED}
if ssl_cacert:
broker_use_ssl["ca_certs"] = ssl_cacert
if ssl_mutual_tls:
broker_use_ssl["keyfile"] = ssl_key
broker_use_ssl["certfile"] = ssl_cert
elif broker_url and re.search("rediss?://|sentinel://", broker_url):
broker_use_ssl = {
"ssl_keyfile": team_conf.get("celery", "SSL_KEY"),
"ssl_certfile": team_conf.get("celery", "SSL_CERT"),
"ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"),
"ssl_cert_reqs": ssl.CERT_REQUIRED,
}
broker_use_ssl = {"ssl_cert_reqs": ssl.CERT_REQUIRED}
if ssl_cacert:
broker_use_ssl["ssl_ca_certs"] = ssl_cacert
if ssl_mutual_tls:
broker_use_ssl["ssl_keyfile"] = ssl_key
broker_use_ssl["ssl_certfile"] = ssl_cert
else:
raise AirflowException(
raise ValueError(
"The broker you configured does not support SSL_ACTIVE to be True. "
"Please use RabbitMQ or Redis if you would like to use SSL for broker."
)

config["broker_use_ssl"] = broker_use_ssl
except AirflowConfigException:
raise AirflowException(
"AirflowConfigException: SSL_ACTIVE is True, please ensure SSL_KEY, SSL_CERT and SSL_CACERT are set"
)
except ValueError:
raise
except Exception as e:
raise AirflowException(
f"Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have "
f"all necessary certs and key ({e})."
)
raise RuntimeError(
f"Unknown Celery SSL error. Please ensure you want to use SSL and have all necessary certs and key ({e})."
) from e

# Warning for not recommended backends
match_not_recommended_backend = re.search("rediss?://|amqp://|rpc://", result_backend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,22 @@ def get_provider_info():
"example": None,
"default": "False",
},
"ssl_mutual_tls": {
"description": "Whether to require mutual TLS (client certificate authentication).\nWhen True (default), SSL_KEY and SSL_CERT must be set.\nSet to False for one-way TLS (server verification only).\n",
"version_added": None,
"type": "boolean",
"example": None,
"default": "True",
},
"ssl_key": {
"description": "Path to the client key.\n",
"description": "Path to the client key. Required when SSL_MUTUAL_TLS is True.\n",
"version_added": None,
"type": "string",
"example": None,
"default": "",
},
"ssl_cert": {
"description": "Path to the client certificate.\n",
"description": "Path to the client certificate. Required when SSL_MUTUAL_TLS is True.\n",
"version_added": None,
"type": "string",
"example": None,
Expand Down
129 changes: 129 additions & 0 deletions providers/celery/tests/unit/celery/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,64 @@ def test_amqp_broker_url_still_builds_ssl_config(self):
assert broker_ssl["keyfile"] == "/path/to/key.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED

@conf_vars(
{
("celery", "BROKER_URL"): "rediss://redis:6380//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_KEY"): "/path/to/key.pem",
("celery", "SSL_CERT"): "/path/to/cert.pem",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_redis_mutual_tls_builds_ssl_config(self):
"""Test mutual TLS: all three SSL keys produce correct broker_use_ssl for Redis."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ssl_keyfile"] == "/path/to/key.pem"
assert broker_ssl["ssl_certfile"] == "/path/to/cert.pem"
assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_amqps_mutual_tls_missing_key_cert_raises(self):
"""Test that mutual TLS (default) raises error when SSL_KEY/SSL_CERT are missing."""
import importlib

with pytest.raises(ValueError, match="SSL_MUTUAL_TLS is True.*but SSL_KEY and/or SSL_CERT"):
importlib.reload(default_celery)

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_KEY"): "/path/to/key",
("celery", "SSL_CERT"): "/path/to/cert",
("celery", "SSL_CACERT"): "",
}
)
def test_ssl_active_without_cacert_uses_system_cas(self):
"""Test that empty SSL_CACERT falls back to system CAs (ca_certs omitted from config)."""
import importlib
import ssl

importlib.reload(default_celery)
broker_ssl = default_celery.DEFAULT_CELERY_CONFIG["broker_use_ssl"]

assert "ca_certs" not in broker_ssl
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
Expand All @@ -887,6 +945,77 @@ def test_amqps_broker_url_no_ssl_when_inactive(self):
config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" not in config

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_amqps_one_way_tls(self):
"""Test one-way TLS for AMQP: only ca_certs, no keyfile/certfile."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
assert "keyfile" not in broker_ssl
assert "certfile" not in broker_ssl

@conf_vars(
{
("celery", "BROKER_URL"): "rediss://redis:6380//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_redis_one_way_tls(self):
"""Test one-way TLS for Redis: only ssl_ca_certs, no ssl_keyfile/ssl_certfile."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
assert "ssl_keyfile" not in broker_ssl
assert "ssl_certfile" not in broker_ssl

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_KEY"): "/path/to/key.pem",
("celery", "SSL_CERT"): "/path/to/cert.pem",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_one_way_tls_ignores_key_cert(self):
"""Test that SSL_KEY/SSL_CERT are ignored when SSL_MUTUAL_TLS is False."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
assert "keyfile" not in broker_ssl
assert "certfile" not in broker_ssl


class TestCreateCeleryAppTeamIsolation:
"""Tests for create_celery_app() multi-team config isolation."""
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/prek/known_airflow_exceptions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ providers/arangodb/src/airflow/providers/arangodb/hooks/arangodb.py::9
providers/arangodb/src/airflow/providers/arangodb/operators/arangodb.py::1
providers/atlassian/jira/src/airflow/providers/atlassian/jira/hooks/jira.py::1
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py::2
providers/celery/src/airflow/providers/celery/executors/default_celery.py::5
providers/celery/src/airflow/providers/celery/executors/default_celery.py::2
providers/celery/tests/integration/celery/test_celery_executor.py::2
providers/cloudant/src/airflow/providers/cloudant/hooks/cloudant.py::2
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py::3
Expand Down
Loading