diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index 42bd5cfa4320b..ad4df84b44ab2 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -90,7 +90,10 @@ If release name contains chart name it will be used as a full name. {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has .Values.workers.celery.keda.enabled (list true false))) .Values.workers.keda.enabled) }} {{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and (not (has .Values.workers.celery.keda.usePgbouncer (list true false))) .Values.workers.keda.usePgbouncer) }} {{- end }} - {{- if and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }} + {{- $triggererKedaEnabled := and .Values.triggerer.enabled .Values.triggerer.keda.enabled }} + {{- $workersKedaNeedsDbConn := and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }} + {{- $triggererKedaNeedsDbConn := and $triggererKedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer))) }} + {{- if or $workersKedaNeedsDbConn $triggererKedaNeedsDbConn }} - name: KEDA_DB_CONN valueFrom: secretKeyRef: diff --git a/chart/templates/secrets/metadata-connection-secret.yaml b/chart/templates/secrets/metadata-connection-secret.yaml index 71441755d4bae..93d94af9ead4d 100644 --- a/chart/templates/secrets/metadata-connection-secret.yaml +++ b/chart/templates/secrets/metadata-connection-secret.yaml @@ -37,6 +37,9 @@ {{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has .Values.workers.celery.keda.enabled (list true false))) .Values.workers.keda.enabled) }} {{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and (not (has .Values.workers.celery.keda.usePgbouncer (list true false))) .Values.workers.keda.usePgbouncer) }} {{- end }} +{{- $triggererKedaEnabled := and .Values.triggerer.enabled .Values.triggerer.keda.enabled }} +{{- $workersKedaNeedsPgbouncerBypass := and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }} +{{- $triggererKedaNeedsPgbouncerBypass := and $triggererKedaEnabled .Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer) }} apiVersion: v1 kind: Secret metadata: @@ -58,11 +61,11 @@ data: {{- with .Values.data.metadataConnection }} connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host $port) "path" (printf "/%s" $database) "query" $query) | b64enc | quote }} {{- end }} - {{- if and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }} + {{- if or $workersKedaNeedsPgbouncerBypass $triggererKedaNeedsPgbouncerBypass }} {{- with .Values.data.metadataConnection }} kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) | b64enc | quote }} {{- end }} - {{- else if and (or $kedaEnabled .Values.triggerer.keda.enabled) (eq .Values.data.metadataConnection.protocol "mysql") }} + {{- else if and (or $kedaEnabled $triggererKedaEnabled) (eq .Values.data.metadataConnection.protocol "mysql") }} {{- with .Values.data.metadataConnection }} kedaConnection: {{ urlJoin (dict "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "tcp(%s:%s)" $metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) | trimPrefix "//" | b64enc | quote }} {{- end }} diff --git a/helm-tests/tests/helm_tests/airflow_core/test_triggerer.py b/helm-tests/tests/helm_tests/airflow_core/test_triggerer.py index 55660f2b34687..6a916da4d7053 100644 --- a/helm-tests/tests/helm_tests/airflow_core/test_triggerer.py +++ b/helm-tests/tests/helm_tests/airflow_core/test_triggerer.py @@ -16,9 +16,11 @@ # under the License. from __future__ import annotations +import base64 + import jmespath import pytest -from chart_utils.helm_template_generator import render_chart +from chart_utils.helm_template_generator import prepare_k8s_lookup_dict, render_chart from chart_utils.log_groomer import LogGroomerTestBase @@ -796,6 +798,18 @@ class TestTriggererLogGroomer(LogGroomerTestBase): class TestTriggererKedaAutoScaler: """Tests triggerer keda autoscaler.""" + @staticmethod + def _render_triggerer_keda_db_connection(values): + docs = render_chart( + values=values, + show_only=[ + "templates/triggerer/triggerer-deployment.yaml", + "templates/triggerer/triggerer-kedaautoscaler.yaml", + "templates/secrets/metadata-connection-secret.yaml", + ], + ) + return prepare_k8s_lookup_dict(docs) + def test_should_add_component_specific_labels(self): docs = render_chart( values={ @@ -900,3 +914,87 @@ def test_mysql_db_backend_keda(self): assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", docs[0]) == "KEDA_DB_CONN" assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv", docs[0]) is None + + def test_mysql_db_backend_adds_keda_db_conn_to_triggerer(self): + docs_by_key = self._render_triggerer_keda_db_connection( + values={ + "data": {"metadataConnection": {"protocol": "mysql", "port": 3306}}, + "triggerer": {"keda": {"enabled": True}}, + } + ) + + triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")] + keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")] + metadata_secret = docs_by_key[("Secret", "release-name-metadata")] + + triggerer_env_vars = jmespath.search( + "spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer + ) + assert "KEDA_DB_CONN" in triggerer_env_vars + + secret_data = jmespath.search("data", metadata_secret) + assert "kedaConnection" in secret_data + keda_connection_secret = base64.b64decode(secret_data["kedaConnection"]).decode() + assert not keda_connection_secret.startswith("//") + + assert ( + jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler) + == "KEDA_DB_CONN" + ) + assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler) is None + + def test_pgbouncer_bypass_adds_keda_db_conn_to_triggerer(self): + docs_by_key = self._render_triggerer_keda_db_connection( + values={ + "pgbouncer": {"enabled": True}, + "triggerer": {"keda": {"enabled": True, "usePgbouncer": False}}, + } + ) + + triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")] + keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")] + metadata_secret = docs_by_key[("Secret", "release-name-metadata")] + + triggerer_env_vars = jmespath.search( + "spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer + ) + assert "KEDA_DB_CONN" in triggerer_env_vars + + secret_data = jmespath.search("data", metadata_secret) + connection_secret = base64.b64decode(secret_data["connection"]).decode() + keda_connection_secret = base64.b64decode(secret_data["kedaConnection"]).decode() + assert "@release-name-pgbouncer" in connection_secret + assert ":6543" in connection_secret + assert "@release-name-postgresql" in keda_connection_secret + assert ":5432" in keda_connection_secret + assert "/postgres" in keda_connection_secret + + assert ( + jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler) == "KEDA_DB_CONN" + ) + assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler) is None + + def test_postgresql_default_triggerer_keda_uses_airflow_db_connection(self): + docs_by_key = self._render_triggerer_keda_db_connection( + values={"triggerer": {"keda": {"enabled": True}}} + ) + + triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")] + keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")] + metadata_secret = docs_by_key[("Secret", "release-name-metadata")] + + triggerer_env_vars = jmespath.search( + "spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer + ) + assert "AIRFLOW_CONN_AIRFLOW_DB" in triggerer_env_vars + assert "KEDA_DB_CONN" not in triggerer_env_vars + + secret_data = jmespath.search("data", metadata_secret) + assert "connection" in secret_data + assert "kedaConnection" not in secret_data + + assert ( + jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler) + == "AIRFLOW_CONN_AIRFLOW_DB" + ) + assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler) is None