Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ If release name contains chart name it will be used as a full name.
{{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has .Values.workers.celery.keda.enabled (list true false))) .Values.workers.keda.enabled) }}
{{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and (not (has .Values.workers.celery.keda.usePgbouncer (list true false))) .Values.workers.keda.usePgbouncer) }}
{{- end }}
{{- if and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }}
{{- $triggererKedaEnabled := and .Values.triggerer.enabled .Values.triggerer.keda.enabled }}
{{- $workersKedaNeedsDbConn := and $kedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not $kedaUsePgBouncer))) }}
{{- $triggererKedaNeedsDbConn := and $triggererKedaEnabled (or (eq .Values.data.metadataConnection.protocol "mysql") (and .Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer))) }}
{{- if or $workersKedaNeedsDbConn $triggererKedaNeedsDbConn }}
- name: KEDA_DB_CONN
valueFrom:
secretKeyRef:
Expand Down
7 changes: 5 additions & 2 deletions chart/templates/secrets/metadata-connection-secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
{{- $kedaEnabled = or .Values.workers.celery.keda.enabled (and (not (has .Values.workers.celery.keda.enabled (list true false))) .Values.workers.keda.enabled) }}
{{- $kedaUsePgBouncer = or .Values.workers.celery.keda.usePgbouncer (and (not (has .Values.workers.celery.keda.usePgbouncer (list true false))) .Values.workers.keda.usePgbouncer) }}
{{- end }}
{{- $triggererKedaEnabled := and .Values.triggerer.enabled .Values.triggerer.keda.enabled }}
{{- $workersKedaNeedsPgbouncerBypass := and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }}
{{- $triggererKedaNeedsPgbouncerBypass := and $triggererKedaEnabled .Values.pgbouncer.enabled (not .Values.triggerer.keda.usePgbouncer) }}
apiVersion: v1
kind: Secret
metadata:
Expand All @@ -58,11 +61,11 @@ data:
{{- with .Values.data.metadataConnection }}
connection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $host $port) "path" (printf "/%s" $database) "query" $query) | b64enc | quote }}
{{- end }}
{{- if and $kedaEnabled .Values.pgbouncer.enabled (not $kedaUsePgBouncer) }}
{{- if or $workersKedaNeedsPgbouncerBypass $triggererKedaNeedsPgbouncerBypass }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "scheme" .protocol "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "%s:%s" $metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) | b64enc | quote }}
{{- end }}
{{- else if and (or $kedaEnabled .Values.triggerer.keda.enabled) (eq .Values.data.metadataConnection.protocol "mysql") }}
{{- else if and (or $kedaEnabled $triggererKedaEnabled) (eq .Values.data.metadataConnection.protocol "mysql") }}
{{- with .Values.data.metadataConnection }}
kedaConnection: {{ urlJoin (dict "userinfo" (printf "%s:%s" ($metadataUser | urlquery) (.pass | urlquery) ) "host" (printf "tcp(%s:%s)" $metadataHost $metadataPort) "path" (printf "/%s" $metadataDatabase) "query" $query) | trimPrefix "//" | b64enc | quote }}
{{- end }}
Expand Down
100 changes: 99 additions & 1 deletion helm-tests/tests/helm_tests/airflow_core/test_triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
# under the License.
from __future__ import annotations

import base64

import jmespath
import pytest
from chart_utils.helm_template_generator import render_chart
from chart_utils.helm_template_generator import prepare_k8s_lookup_dict, render_chart
from chart_utils.log_groomer import LogGroomerTestBase


Expand Down Expand Up @@ -796,6 +798,18 @@ class TestTriggererLogGroomer(LogGroomerTestBase):
class TestTriggererKedaAutoScaler:
"""Tests triggerer keda autoscaler."""

@staticmethod
def _render_triggerer_keda_db_connection(values):
docs = render_chart(
values=values,
show_only=[
"templates/triggerer/triggerer-deployment.yaml",
"templates/triggerer/triggerer-kedaautoscaler.yaml",
"templates/secrets/metadata-connection-secret.yaml",
],
)
return prepare_k8s_lookup_dict(docs)

def test_should_add_component_specific_labels(self):
docs = render_chart(
values={
Expand Down Expand Up @@ -900,3 +914,87 @@ def test_mysql_db_backend_keda(self):

assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", docs[0]) == "KEDA_DB_CONN"
assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv", docs[0]) is None

def test_mysql_db_backend_adds_keda_db_conn_to_triggerer(self):
docs_by_key = self._render_triggerer_keda_db_connection(
values={
"data": {"metadataConnection": {"protocol": "mysql", "port": 3306}},
"triggerer": {"keda": {"enabled": True}},
}
)

triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")]
metadata_secret = docs_by_key[("Secret", "release-name-metadata")]

triggerer_env_vars = jmespath.search(
"spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer
)
assert "KEDA_DB_CONN" in triggerer_env_vars

secret_data = jmespath.search("data", metadata_secret)
assert "kedaConnection" in secret_data
keda_connection_secret = base64.b64decode(secret_data["kedaConnection"]).decode()
assert not keda_connection_secret.startswith("//")

assert (
jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler)
== "KEDA_DB_CONN"
)
assert jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler) is None

def test_pgbouncer_bypass_adds_keda_db_conn_to_triggerer(self):
docs_by_key = self._render_triggerer_keda_db_connection(
values={
"pgbouncer": {"enabled": True},
"triggerer": {"keda": {"enabled": True, "usePgbouncer": False}},
}
)

triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")]
metadata_secret = docs_by_key[("Secret", "release-name-metadata")]

triggerer_env_vars = jmespath.search(
"spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer
)
assert "KEDA_DB_CONN" in triggerer_env_vars

secret_data = jmespath.search("data", metadata_secret)
connection_secret = base64.b64decode(secret_data["connection"]).decode()
keda_connection_secret = base64.b64decode(secret_data["kedaConnection"]).decode()
assert "@release-name-pgbouncer" in connection_secret
assert ":6543" in connection_secret
assert "@release-name-postgresql" in keda_connection_secret
assert ":5432" in keda_connection_secret
assert "/postgres" in keda_connection_secret

assert (
jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler) == "KEDA_DB_CONN"
)
assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler) is None

def test_postgresql_default_triggerer_keda_uses_airflow_db_connection(self):
docs_by_key = self._render_triggerer_keda_db_connection(
values={"triggerer": {"keda": {"enabled": True}}}
)

triggerer = docs_by_key[("StatefulSet", "release-name-triggerer")]
keda_autoscaler = docs_by_key[("ScaledObject", "release-name-triggerer")]
metadata_secret = docs_by_key[("Secret", "release-name-metadata")]

triggerer_env_vars = jmespath.search(
"spec.template.spec.containers[?name=='triggerer'] | [0].env[].name", triggerer
)
assert "AIRFLOW_CONN_AIRFLOW_DB" in triggerer_env_vars
assert "KEDA_DB_CONN" not in triggerer_env_vars

secret_data = jmespath.search("data", metadata_secret)
assert "connection" in secret_data
assert "kedaConnection" not in secret_data

assert (
jmespath.search("spec.triggers[0].metadata.connectionFromEnv", keda_autoscaler)
== "AIRFLOW_CONN_AIRFLOW_DB"
)
assert jmespath.search("spec.triggers[0].metadata.connectionStringFromEnv", keda_autoscaler) is None
Loading