Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The following parameters can be passed via ``backend_kwargs`` as a JSON dictiona
* ``connections_label``: Label key used to discover connection secrets. Default: ``"airflow.apache.org/connection-id"``
* ``variables_label``: Label key used to discover variable secrets. Default: ``"airflow.apache.org/variable-key"``
* ``config_label``: Label key used to discover config secrets. Default: ``"airflow.apache.org/config-key"``
* ``team_label``: Label key used to discover team-scoped secrets in multi-team mode. Default: ``"airflow.apache.org/team"``
* ``connections_data_key``: The data key in the Kubernetes secret that holds the connection value. Default: ``"value"``
* ``variables_data_key``: The data key in the Kubernetes secret that holds the variable value. Default: ``"value"``
* ``config_data_key``: The data key in the Kubernetes secret that holds the config value. Default: ``"value"``
Expand Down Expand Up @@ -207,6 +208,43 @@ You can create a variable secret with ``kubectl``:
airflow.apache.org/variable-key=my_var \
--namespace=airflow

Multi-team lookup
"""""""""""""""""

In multi-team mode, when ``team_name`` is provided, this backend first looks for a secret whose
identifier label matches the requested connection or variable and whose ``team_label`` matches the
current team. If no team-scoped secret is found, it falls back to a global secret with the same
identifier label and no team label.

When ``team_name`` is not provided, the backend only queries for global secrets by requiring that
the configured ``team_label`` is absent (``!team_label``). This means secrets that have a team label
are not eligible in the non-team case, even if their connection or variable identifier matches.
As a result, team-scoped identifiers cannot be accessed without a team context.

For example, with ``team_label="airflow.apache.org/team"``, ``team_name="team_a"``, and
``conn_id="my_db"``, the backend queries:

* Team-scoped: ``airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a``
* Global fallback: ``airflow.apache.org/connection-id=my_db,!airflow.apache.org/team``

If ``team_name`` is unset for the same ``conn_id``, the backend queries only:

* Global only: ``airflow.apache.org/connection-id=my_db,!airflow.apache.org/team``

Example team-scoped connection secret:

.. code-block:: yaml

apiVersion: v1
kind: Secret
metadata:
name: my-team-db-secret
labels:
airflow.apache.org/connection-id: my_db
airflow.apache.org/team: team_a
data:
value: <base64-encoded-connection-uri>

Using with External Secrets Operator
"""""""""""""""""""""""""""""""""""""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import base64
import re
from functools import cached_property
from pathlib import Path

Expand Down Expand Up @@ -96,14 +97,16 @@ class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
DEFAULT_CONNECTIONS_LABEL = "airflow.apache.org/connection-id"
DEFAULT_VARIABLES_LABEL = "airflow.apache.org/variable-key"
DEFAULT_CONFIG_LABEL = "airflow.apache.org/config-key"
DEFAULT_TEAM_LABEL = "airflow.apache.org/team"
SERVICE_ACCOUNT_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

def __init__(
self,
namespace: str | None = None,
connections_label: str = DEFAULT_CONNECTIONS_LABEL,
variables_label: str = DEFAULT_VARIABLES_LABEL,
config_label: str = DEFAULT_CONFIG_LABEL,
connections_label: str | None = DEFAULT_CONNECTIONS_LABEL,
variables_label: str | None = DEFAULT_VARIABLES_LABEL,
config_label: str | None = DEFAULT_CONFIG_LABEL,
team_label: str | None = DEFAULT_TEAM_LABEL,
connections_data_key: str = "value",
variables_data_key: str = "value",
config_data_key: str = "value",
Comment thread
PrithviBadiga marked this conversation as resolved.
Expand All @@ -114,6 +117,7 @@ def __init__(
self.connections_label = connections_label
self.variables_label = variables_label
self.config_label = config_label
self.team_label = team_label
self.connections_data_key = connections_data_key
self.variables_data_key = variables_data_key
self.config_data_key = config_data_key
Expand Down Expand Up @@ -143,26 +147,28 @@ def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | No
"""
Get serialized representation of Connection from a Kubernetes secret.

Multi-team isolation is not currently supported; ``team_name`` is accepted
for API compatibility but ignored.

:param conn_id: connection id
:param team_name: Team name (unused — multi-team is not currently supported)
:param team_name: Team name associated to the task trying to access the connection (if any)
"""
return self._get_secret(self.connections_label, conn_id, self.connections_data_key)
if self._is_team_specific_accessed_as_global(conn_id, team_name):
return None

return self._get_secret(
self.connections_label, conn_id, self.connections_data_key, team_name=team_name
)

def get_variable(self, key: str, team_name: str | None = None) -> str | None:
"""
Get Airflow Variable from a Kubernetes secret.

Multi-team isolation is not currently supported; ``team_name`` is accepted
for API compatibility but ignored.

:param key: Variable Key
:param team_name: Team name (unused — multi-team is not currently supported)
:param team_name: Team name associated to the task trying to access the variable (if any)
:return: Variable Value
"""
return self._get_secret(self.variables_label, key, self.variables_data_key)
if self._is_team_specific_accessed_as_global(key, team_name):
return None

return self._get_secret(self.variables_label, key, self.variables_data_key, team_name=team_name)

def get_config(self, key: str) -> str | None:
"""
Expand All @@ -173,7 +179,13 @@ def get_config(self, key: str) -> str | None:
"""
return self._get_secret(self.config_label, key, self.config_data_key)

def _get_secret(self, label_key: str | None, label_value: str, data_key: str) -> str | None:
@staticmethod
def _is_team_specific_accessed_as_global(secret_id: str, team_name: str | None = None) -> bool:
return team_name is None and bool(re.fullmatch(r"_[^_]+___.+", secret_id))

def _get_secret(
self, label_key: str | None, label_value: str, data_key: str, team_name: str | None = None
) -> str | None:
"""
Get secret value from Kubernetes by label selector.

Expand All @@ -188,18 +200,43 @@ def _get_secret(self, label_key: str | None, label_value: str, data_key: str) ->
"""
if label_key is None:
return None
label_selector = f"{label_key}={label_value}"

if team_name and self.team_label:
team_secret = self._get_secret_by_selector(
label_key, label_value, data_key, f"{self.team_label}={team_name}", warn_if_missing=False
)
if team_secret is not None:
return team_secret

team_selector = f"!{self.team_label}" if self.team_label else None
return self._get_secret_by_selector(label_key, label_value, data_key, team_selector)

def _get_secret_by_selector(
self,
label_key: str,
label_value: str,
data_key: str,
extra_selector: str | None,
*,
warn_if_missing: bool = True,
) -> str | None:
"""Get secret value from Kubernetes by the given base and optional extra selectors."""
selectors = [f"{label_key}={label_value}"]
if extra_selector:
selectors.append(extra_selector)
label_selector = ",".join(selectors)
secret_list = self.client.list_namespaced_secret(
self.namespace,
label_selector=label_selector,
resource_version="0",
)
if not secret_list.items:
self.log.warning(
"No secret found with label %s in namespace %s.",
label_selector,
self.namespace,
)
if warn_if_missing:
self.log.warning(
"No secret found with label %s in namespace %s.",
label_selector,
self.namespace,
)
return None
if len(secret_list.items) > 1:
self.log.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_get_conn_value_uri(self, mock_client, mock_namespace):
assert result == uri
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="airflow.apache.org/connection-id=my_db",
label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team",
resource_version="0",
)

Expand Down Expand Up @@ -103,6 +103,60 @@ def test_get_conn_value_not_found(self, mock_client, mock_namespace):

assert result is None

@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_get_conn_value_uses_team_specific_secret_first(self, mock_client, mock_namespace):
mock_client.return_value.list_namespaced_secret.side_effect = [
_make_secret_list([_make_secret({"value": "team-conn"})]),
]

backend = KubernetesSecretsBackend()
result = backend.get_conn_value("my_db", team_name="team_a")

assert result == "team-conn"
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a",
resource_version="0",
)

Comment thread
PrithviBadiga marked this conversation as resolved.
@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_get_conn_value_falls_back_to_global_secret_when_team_secret_is_missing(
self, mock_client, mock_namespace
):
mock_client.return_value.list_namespaced_secret.side_effect = [
_make_secret_list([]),
_make_secret_list([_make_secret({"value": "global-conn"})]),
]

backend = KubernetesSecretsBackend()
result = backend.get_conn_value("my_db", team_name="team_a")

assert result == "global-conn"
assert mock_client.return_value.list_namespaced_secret.call_args_list == [
mock.call(
"default",
label_selector="airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a",
resource_version="0",
),
mock.call(
"default",
label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team",
resource_version="0",
),
]

@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_get_conn_value_returns_none_for_team_scoped_id_without_team_name(
self, mock_client, mock_namespace
):
backend = KubernetesSecretsBackend()

assert backend.get_conn_value("_teama___my_db") is None
mock_client.return_value.list_namespaced_secret.assert_not_called()


class TestKubernetesSecretsBackendVariables:
@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
Expand All @@ -119,7 +173,7 @@ def test_get_variable(self, mock_client, mock_namespace):
assert result == "my-value"
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="airflow.apache.org/variable-key=api_key",
label_selector="airflow.apache.org/variable-key=api_key,!airflow.apache.org/team",
resource_version="0",
)

Expand All @@ -134,6 +188,43 @@ def test_get_variable_not_found(self, mock_client, mock_namespace):

assert result is None

@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_get_variable_falls_back_to_global_secret_when_team_secret_is_missing(
self, mock_client, mock_namespace
):
mock_client.return_value.list_namespaced_secret.side_effect = [
_make_secret_list([]),
_make_secret_list([_make_secret({"value": "global-value"})]),
Comment thread
PrithviBadiga marked this conversation as resolved.
]

backend = KubernetesSecretsBackend()
result = backend.get_variable("api_key", team_name="team_a")

assert result == "global-value"
assert mock_client.return_value.list_namespaced_secret.call_args_list == [
mock.call(
"default",
label_selector="airflow.apache.org/variable-key=api_key,airflow.apache.org/team=team_a",
resource_version="0",
),
mock.call(
"default",
label_selector="airflow.apache.org/variable-key=api_key,!airflow.apache.org/team",
resource_version="0",
),
]

@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_get_variable_returns_none_for_team_scoped_key_without_team_name(
self, mock_client, mock_namespace
):
backend = KubernetesSecretsBackend()

assert backend.get_variable("_teama___api_key") is None
mock_client.return_value.list_namespaced_secret.assert_not_called()


class TestKubernetesSecretsBackendConfig:
@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
Expand All @@ -150,7 +241,7 @@ def test_get_config(self, mock_client, mock_namespace):
assert result == "sqlite:///airflow.db"
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="airflow.apache.org/config-key=sql_alchemy_conn",
label_selector="airflow.apache.org/config-key=sql_alchemy_conn,!airflow.apache.org/team",
resource_version="0",
)

Expand Down Expand Up @@ -181,7 +272,7 @@ def test_custom_label(self, mock_client, mock_namespace):
assert result == "postgresql://localhost/db"
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="my-org/conn=my_db",
label_selector="my-org/conn=my_db,!airflow.apache.org/team",
resource_version="0",
)

Expand Down Expand Up @@ -225,6 +316,23 @@ def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace):

assert result is None

@mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
def test_team_specific_lookup_uses_custom_team_label(self, mock_client, mock_namespace):
mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
[_make_secret({"value": "team-conn"})]
)

backend = KubernetesSecretsBackend(team_label="my-org.io/team")
result = backend.get_conn_value("my_db", team_name="team_a")

assert result == "team-conn"
mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"default",
label_selector="airflow.apache.org/connection-id=my_db,my-org.io/team=team_a",
resource_version="0",
)


class TestKubernetesSecretsBackendLabelNone:
@mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
Expand Down Expand Up @@ -332,7 +440,7 @@ def test_namespace_used_in_api_calls(self, mock_client, mock_namespace):

mock_client.return_value.list_namespaced_secret.assert_called_once_with(
"airflow",
label_selector="airflow.apache.org/connection-id=my_db",
label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team",
resource_version="0",
)

Expand Down
Loading