Skip to content

Commit

Permalink
Refactor condition to skip dag mounts or gitsync on scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
gmsantos committed Jul 5, 2022
1 parent dd2f91e commit c8c5265
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
12 changes: 7 additions & 5 deletions chart/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
{{- $persistence := .Values.workers.persistence.enabled }}
# If we're using a StatefulSet
{{- $stateful := and $local $persistence }}
# We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode
{{- $localOrDagProcessorDisabled := or (not .Values.dagProcessor.enabled) $local }}
# If we're using elasticsearch logging
{{- $elasticsearch := .Values.elasticsearch.enabled }}
{{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }}
Expand Down Expand Up @@ -141,7 +143,7 @@ spec:
{{- include "custom_airflow_environment" . | indent 10 }}
{{- include "standard_airflow_environment" . | indent 10 }}
{{- end }}
{{- if and (not .Values.dagProcessor.enabled) .Values.dags.gitSync.enabled }}
{{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }}
{{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 8 }}
{{- end }}
{{- if .Values.scheduler.extraInitContainers }}
Expand Down Expand Up @@ -202,13 +204,13 @@ spec:
subPath: airflow_local_settings.py
readOnly: true
{{- end }}
{{- if and (or (not .Values.dagProcessor.enabled) $local) (or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled) }}
{{- if and $localOrDagProcessorDisabled (or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled) }}
{{- include "airflow_dags_mount" . | nindent 12 }}
{{- end }}
{{- if .Values.scheduler.extraVolumeMounts }}
{{ toYaml .Values.scheduler.extraVolumeMounts | indent 12 }}
{{- end }}
{{- if and (not .Values.dagProcessor.enabled) .Values.dags.gitSync.enabled }}
{{- if and $localOrDagProcessorDisabled .Values.dags.gitSync.enabled }}
{{- include "git_sync_container" . | indent 8 }}
{{- end }}
{{- if .Values.scheduler.logGroomerSidecar.enabled }}
Expand Down Expand Up @@ -239,7 +241,7 @@ spec:
- name: config
configMap:
name: {{ template "airflow_config" . }}
{{- if not .Values.dagProcessor.enabled }}
{{- if $localOrDagProcessorDisabled }}
{{- if .Values.dags.persistence.enabled }}
- name: dags
persistentVolumeClaim:
Expand All @@ -251,7 +253,7 @@ spec:
{{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
{{- include "git_sync_ssh_key_volume" . | indent 8 }}
{{- end }}
{{- end }}
{{- end }}
{{- if .Values.scheduler.extraVolumes }}
{{ toYaml .Values.scheduler.extraVolumes | indent 8 }}
{{- end }}
Expand Down
46 changes: 39 additions & 7 deletions tests/charts/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,21 +534,53 @@ def test_dags_gitsync_sidecar_and_init_container(self, dags_values):
c["name"] for c in jmespath.search("spec.template.spec.initContainers", docs[0])
]

def test_no_dags_mount_or_volume_or_gitsync_sidecar_expected(self):
@parameterized.expand(
[
(True, "LocalExecutor", False),
(True, "CeleryExecutor", True),
(True, "KubernetesExecutor", True),
(True, "LocalKubernetesExecutor", False),
(False, "LocalExecutor", False),
(False, "CeleryExecutor", False),
(False, "KubernetesExecutor", False),
(False, "LocalKubernetesExecutor", False),
]
)
def test_dags_mount_and_gitsync_expected_with_dag_processor(
self, dag_processor, executor, skip_dags_mount
):
"""
DAG Processor can move gitsync and DAGs mount from the scheduler to the DAG Processor only.
The only exception is when we have a Local executor.
In these cases, the scheduler does the worker role and needs access to DAGs anyway.
"""
docs = render_chart(
values={
"dagProcessor": {"enabled": True},
"dagProcessor": {"enabled": dag_processor},
"executor": executor,
"dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
"scheduler": {"logGroomerSidecar": {"enabled": False}},
},
show_only=["templates/scheduler/scheduler-deployment.yaml"],
)

assert "dags" not in [
vm["name"] for vm in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
]
assert "dags" not in [vm["name"] for vm in jmespath.search("spec.template.spec.volumes", docs[0])]
assert 1 == len(jmespath.search("spec.template.spec.containers", docs[0]))
if skip_dags_mount:
assert "dags" not in [
vm["name"] for vm in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
]
assert "dags" not in [vm["name"] for vm in jmespath.search("spec.template.spec.volumes", docs[0])]
assert 1 == len(jmespath.search("spec.template.spec.containers", docs[0]))
else:
assert "dags" in [
vm["name"] for vm in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
]
assert "dags" in [vm["name"] for vm in jmespath.search("spec.template.spec.volumes", docs[0])]
assert "git-sync" in [
c["name"] for c in jmespath.search("spec.template.spec.containers", docs[0])
]
assert "git-sync-init" in [
c["name"] for c in jmespath.search("spec.template.spec.initContainers", docs[0])
]

def test_log_groomer_resources(self):
docs = render_chart(
Expand Down

0 comments on commit c8c5265

Please sign in to comment.