From 95a613a19b3cd80063f486357251e221a13cb851 Mon Sep 17 00:00:00 2001 From: Prithvi Badiga Date: Wed, 22 Apr 2026 12:41:56 -0700 Subject: [PATCH 1/3] Add multi-team lookup to Kubernetes secrets backend --- .../kubernetes-secrets-backend.rst | 28 +++++++ .../secrets/kubernetes_secrets_backend.py | 71 ++++++++++++---- .../test_kubernetes_secrets_backend.py | 81 +++++++++++++++++-- 3 files changed, 158 insertions(+), 22 deletions(-) diff --git a/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst index 4f68de3128dc2..9ee9b123dcaf5 100644 --- a/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst +++ b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst @@ -81,6 +81,7 @@ The following parameters can be passed via ``backend_kwargs`` as a JSON dictiona * ``connections_label``: Label key used to discover connection secrets. Default: ``"airflow.apache.org/connection-id"`` * ``variables_label``: Label key used to discover variable secrets. Default: ``"airflow.apache.org/variable-key"`` * ``config_label``: Label key used to discover config secrets. Default: ``"airflow.apache.org/config-key"`` +* ``team_label``: Label key used to discover team-scoped secrets in multi-team mode. Default: ``"airflow.apache.org/team"`` * ``connections_data_key``: The data key in the Kubernetes secret that holds the connection value. Default: ``"value"`` * ``variables_data_key``: The data key in the Kubernetes secret that holds the variable value. Default: ``"value"`` * ``config_data_key``: The data key in the Kubernetes secret that holds the config value. Default: ``"value"`` @@ -207,6 +208,33 @@ You can create a variable secret with ``kubectl``: airflow.apache.org/variable-key=my_var \ --namespace=airflow +Multi-team lookup +""""""""""""""""" + +In multi-team mode, this backend first looks for a secret whose identifier label matches the requested +connection or variable and whose ``team_label`` matches the current team. If no team-scoped secret is +found, it falls back to a global secret with the same identifier label and no team label. + +For example, with ``team_label="airflow.apache.org/team"``, ``team_name="team_a"``, and +``conn_id="my_db"``, the backend queries: + +* Team-scoped: ``airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a`` +* Global fallback: ``airflow.apache.org/connection-id=my_db,!airflow.apache.org/team`` + +Example team-scoped connection secret: + +.. code-block:: yaml + + apiVersion: v1 + kind: Secret + metadata: + name: my-team-db-secret + labels: + airflow.apache.org/connection-id: my_db + airflow.apache.org/team: team_a + data: + value: + Using with External Secrets Operator """"""""""""""""""""""""""""""""""""" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py index ea67f7c3da6b2..68a58dda0ef29 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py @@ -20,6 +20,7 @@ from __future__ import annotations import base64 +import re from functools import cached_property from pathlib import Path @@ -96,6 +97,7 @@ class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin): DEFAULT_CONNECTIONS_LABEL = "airflow.apache.org/connection-id" DEFAULT_VARIABLES_LABEL = "airflow.apache.org/variable-key" DEFAULT_CONFIG_LABEL = "airflow.apache.org/config-key" + DEFAULT_TEAM_LABEL = "airflow.apache.org/team" SERVICE_ACCOUNT_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" def __init__( @@ -104,6 +106,7 @@ def __init__( connections_label: str = DEFAULT_CONNECTIONS_LABEL, variables_label: str = DEFAULT_VARIABLES_LABEL, config_label: str = DEFAULT_CONFIG_LABEL, + team_label: str | None = DEFAULT_TEAM_LABEL, connections_data_key: str = "value", variables_data_key: str = "value", config_data_key: str = "value", @@ -114,6 +117,7 @@ def __init__( self.connections_label = connections_label self.variables_label = variables_label self.config_label = config_label + self.team_label = team_label self.connections_data_key = connections_data_key self.variables_data_key = variables_data_key self.config_data_key = config_data_key @@ -143,26 +147,28 @@ def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | No """ Get serialized representation of Connection from a Kubernetes secret. - Multi-team isolation is not currently supported; ``team_name`` is accepted - for API compatibility but ignored. - :param conn_id: connection id - :param team_name: Team name (unused — multi-team is not currently supported) + :param team_name: Team name associated to the task trying to access the connection (if any) """ - return self._get_secret(self.connections_label, conn_id, self.connections_data_key) + if self._is_team_specific_accessed_as_global(conn_id, team_name): + return None + + return self._get_secret( + self.connections_label, conn_id, self.connections_data_key, team_name=team_name + ) def get_variable(self, key: str, team_name: str | None = None) -> str | None: """ Get Airflow Variable from a Kubernetes secret. - Multi-team isolation is not currently supported; ``team_name`` is accepted - for API compatibility but ignored. - :param key: Variable Key - :param team_name: Team name (unused — multi-team is not currently supported) + :param team_name: Team name associated to the task trying to access the variable (if any) :return: Variable Value """ - return self._get_secret(self.variables_label, key, self.variables_data_key) + if self._is_team_specific_accessed_as_global(key, team_name): + return None + + return self._get_secret(self.variables_label, key, self.variables_data_key, team_name=team_name) def get_config(self, key: str) -> str | None: """ @@ -173,7 +179,13 @@ def get_config(self, key: str) -> str | None: """ return self._get_secret(self.config_label, key, self.config_data_key) - def _get_secret(self, label_key: str | None, label_value: str, data_key: str) -> str | None: + @staticmethod + def _is_team_specific_accessed_as_global(secret_id: str, team_name: str | None = None) -> bool: + return team_name is None and bool(re.fullmatch(r"_[^_]+___.+", secret_id)) + + def _get_secret( + self, label_key: str | None, label_value: str, data_key: str, team_name: str | None = None + ) -> str | None: """ Get secret value from Kubernetes by label selector. @@ -188,18 +200,43 @@ def _get_secret(self, label_key: str | None, label_value: str, data_key: str) -> """ if label_key is None: return None - label_selector = f"{label_key}={label_value}" + + if team_name and self.team_label: + team_secret = self._get_secret_by_selector( + label_key, label_value, data_key, f"{self.team_label}={team_name}", warn_if_missing=False + ) + if team_secret is not None: + return team_secret + + team_selector = f"!{self.team_label}" if self.team_label else None + return self._get_secret_by_selector(label_key, label_value, data_key, team_selector) + + def _get_secret_by_selector( + self, + label_key: str, + label_value: str, + data_key: str, + extra_selector: str | None, + *, + warn_if_missing: bool = True, + ) -> str | None: + """Get secret value from Kubernetes by the given base and optional extra selectors.""" + selectors = [f"{label_key}={label_value}"] + if extra_selector: + selectors.append(extra_selector) + label_selector = ",".join(selectors) secret_list = self.client.list_namespaced_secret( self.namespace, label_selector=label_selector, resource_version="0", ) if not secret_list.items: - self.log.warning( - "No secret found with label %s in namespace %s.", - label_selector, - self.namespace, - ) + if warn_if_missing: + self.log.warning( + "No secret found with label %s in namespace %s.", + label_selector, + self.namespace, + ) return None if len(secret_list.items) > 1: self.log.warning( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py index 6dced3d340bcc..68b53903534c3 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py @@ -63,7 +63,7 @@ def test_get_conn_value_uri(self, mock_client, mock_namespace): assert result == uri mock_client.return_value.list_namespaced_secret.assert_called_once_with( "default", - label_selector="airflow.apache.org/connection-id=my_db", + label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team", resource_version="0", ) @@ -103,6 +103,33 @@ def test_get_conn_value_not_found(self, mock_client, mock_namespace): assert result is None + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_uses_team_specific_secret_first(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.side_effect = [ + _make_secret_list([_make_secret({"value": "team-conn"})]), + ] + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db", team_name="team_a") + + assert result == "team-conn" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_returns_none_for_team_scoped_id_without_team_name( + self, mock_client, mock_namespace + ): + backend = KubernetesSecretsBackend() + + assert backend.get_conn_value("_teama___my_db") is None + mock_client.return_value.list_namespaced_secret.assert_not_called() + class TestKubernetesSecretsBackendVariables: @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") @@ -119,7 +146,7 @@ def test_get_variable(self, mock_client, mock_namespace): assert result == "my-value" mock_client.return_value.list_namespaced_secret.assert_called_once_with( "default", - label_selector="airflow.apache.org/variable-key=api_key", + label_selector="airflow.apache.org/variable-key=api_key,!airflow.apache.org/team", resource_version="0", ) @@ -134,6 +161,33 @@ def test_get_variable_not_found(self, mock_client, mock_namespace): assert result is None + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable_falls_back_to_global_secret_when_team_secret_is_missing( + self, mock_client, mock_namespace + ): + mock_client.return_value.list_namespaced_secret.side_effect = [ + _make_secret_list([]), + _make_secret_list([_make_secret({"value": "global-value"})]), + ] + + backend = KubernetesSecretsBackend() + result = backend.get_variable("api_key", team_name="team_a") + + assert result == "global-value" + assert mock_client.return_value.list_namespaced_secret.call_args_list == [ + mock.call( + "default", + label_selector="airflow.apache.org/variable-key=api_key,airflow.apache.org/team=team_a", + resource_version="0", + ), + mock.call( + "default", + label_selector="airflow.apache.org/variable-key=api_key,!airflow.apache.org/team", + resource_version="0", + ), + ] + class TestKubernetesSecretsBackendConfig: @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") @@ -150,7 +204,7 @@ def test_get_config(self, mock_client, mock_namespace): assert result == "sqlite:///airflow.db" mock_client.return_value.list_namespaced_secret.assert_called_once_with( "default", - label_selector="airflow.apache.org/config-key=sql_alchemy_conn", + label_selector="airflow.apache.org/config-key=sql_alchemy_conn,!airflow.apache.org/team", resource_version="0", ) @@ -181,7 +235,7 @@ def test_custom_label(self, mock_client, mock_namespace): assert result == "postgresql://localhost/db" mock_client.return_value.list_namespaced_secret.assert_called_once_with( "default", - label_selector="my-org/conn=my_db", + label_selector="my-org/conn=my_db,!airflow.apache.org/team", resource_version="0", ) @@ -225,6 +279,23 @@ def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace): assert result is None + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_team_specific_lookup_uses_custom_team_label(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "team-conn"})] + ) + + backend = KubernetesSecretsBackend(team_label="my-org.io/team") + result = backend.get_conn_value("my_db", team_name="team_a") + + assert result == "team-conn" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db,my-org.io/team=team_a", + resource_version="0", + ) + class TestKubernetesSecretsBackendLabelNone: @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) @@ -332,7 +403,7 @@ def test_namespace_used_in_api_calls(self, mock_client, mock_namespace): mock_client.return_value.list_namespaced_secret.assert_called_once_with( "airflow", - label_selector="airflow.apache.org/connection-id=my_db", + label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team", resource_version="0", ) From c086a9bb5a96ce073628257037f0418972289b21 Mon Sep 17 00:00:00 2001 From: Prithvi Badiga Date: Wed, 22 Apr 2026 16:31:08 -0700 Subject: [PATCH 2/3] Refine Kubernetes multi-team lookup docs and tests --- .../kubernetes-secrets-backend.rst | 16 ++++++++--- .../secrets/kubernetes_secrets_backend.py | 6 ++--- .../test_kubernetes_secrets_backend.py | 27 +++++++++++++++++++ 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst index 9ee9b123dcaf5..7b92fa044f76d 100644 --- a/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst +++ b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst @@ -211,9 +211,15 @@ You can create a variable secret with ``kubectl``: Multi-team lookup """"""""""""""""" -In multi-team mode, this backend first looks for a secret whose identifier label matches the requested -connection or variable and whose ``team_label`` matches the current team. If no team-scoped secret is -found, it falls back to a global secret with the same identifier label and no team label. +In multi-team mode, when ``team_name`` is provided, this backend first looks for a secret whose +identifier label matches the requested connection or variable and whose ``team_label`` matches the +current team. If no team-scoped secret is found, it falls back to a global secret with the same +identifier label and no team label. + +When ``team_name`` is not provided, the backend only queries for global secrets by requiring that +the configured ``team_label`` is absent (``!team_label``). This means secrets that have a team label +are not eligible in the non-team case, even if their connection or variable identifier matches. +As a result, team-scoped identifiers cannot be accessed without a team context. For example, with ``team_label="airflow.apache.org/team"``, ``team_name="team_a"``, and ``conn_id="my_db"``, the backend queries: @@ -221,6 +227,10 @@ For example, with ``team_label="airflow.apache.org/team"``, ``team_name="team_a" * Team-scoped: ``airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a`` * Global fallback: ``airflow.apache.org/connection-id=my_db,!airflow.apache.org/team`` +If ``team_name`` is unset for the same ``conn_id``, the backend queries only: + +* Global only: ``airflow.apache.org/connection-id=my_db,!airflow.apache.org/team`` + Example team-scoped connection secret: .. code-block:: yaml diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py index 68a58dda0ef29..447a4c48a9e9b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py @@ -103,9 +103,9 @@ class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin): def __init__( self, namespace: str | None = None, - connections_label: str = DEFAULT_CONNECTIONS_LABEL, - variables_label: str = DEFAULT_VARIABLES_LABEL, - config_label: str = DEFAULT_CONFIG_LABEL, + connections_label: str | None = DEFAULT_CONNECTIONS_LABEL, + variables_label: str | None = DEFAULT_VARIABLES_LABEL, + config_label: str | None = DEFAULT_CONFIG_LABEL, team_label: str | None = DEFAULT_TEAM_LABEL, connections_data_key: str = "value", variables_data_key: str = "value", diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py index 68b53903534c3..d4c55038bd575 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py @@ -120,6 +120,33 @@ def test_get_conn_value_uses_team_specific_secret_first(self, mock_client, mock_ resource_version="0", ) + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_falls_back_to_global_secret_when_team_secret_is_missing( + self, mock_client, mock_namespace + ): + mock_client.return_value.list_namespaced_secret.side_effect = [ + _make_secret_list([]), + _make_secret_list([_make_secret({"value": "global-conn"})]), + ] + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db", team_name="team_a") + + assert result == "global-conn" + assert mock_client.return_value.list_namespaced_secret.call_args_list == [ + mock.call( + "default", + label_selector="airflow.apache.org/connection-id=my_db,airflow.apache.org/team=team_a", + resource_version="0", + ), + mock.call( + "default", + label_selector="airflow.apache.org/connection-id=my_db,!airflow.apache.org/team", + resource_version="0", + ), + ] + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) def test_get_conn_value_returns_none_for_team_scoped_id_without_team_name( From 8c19cdf17bd199f499a3cad848ef529f48170820 Mon Sep 17 00:00:00 2001 From: Prithvi Badiga Date: Wed, 22 Apr 2026 16:37:13 -0700 Subject: [PATCH 3/3] Add Kubernetes variable guard test --- .../secrets/test_kubernetes_secrets_backend.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py index d4c55038bd575..e7538631b240b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py @@ -215,6 +215,16 @@ def test_get_variable_falls_back_to_global_secret_when_team_secret_is_missing( ), ] + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable_returns_none_for_team_scoped_key_without_team_name( + self, mock_client, mock_namespace + ): + backend = KubernetesSecretsBackend() + + assert backend.get_variable("_teama___api_key") is None + mock_client.return_value.list_namespaced_secret.assert_not_called() + class TestKubernetesSecretsBackendConfig: @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")