Skip to content

Commit

Permalink
Scheduler should not fail when invalid executor_config is passed (#14323
Browse files Browse the repository at this point in the history
)

closes #14182

(cherry picked from commit e0ee91e)
  • Loading branch information
kaxil authored and ashb committed Mar 19, 2021
1 parent 44a261a commit 6dd7559
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
8 changes: 7 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,13 @@ def execute_async(
) -> None:
"""Executes task asynchronously"""
self.log.info('Add task %s with command %s with executor_config %s', key, command, executor_config)
kube_executor_config = PodGenerator.from_obj(executor_config)
try:
kube_executor_config = PodGenerator.from_obj(executor_config)
except Exception: # pylint: disable=broad-except
self.log.error("Invalid executor_config for %s", key)
self.fail(key=key, info="Invalid executor_config passed")
return

if executor_config:
pod_template_file = executor_config.get("pod_template_override", None)
else:
Expand Down
20 changes: 20 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,26 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
]
mock_stats_gauge.assert_has_calls(calls)

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()

assert executor.event_buffer == {}
executor.execute_async(
key=('dag', 'task', datetime.utcnow(), 1),
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
executor_config=k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
)
),
)

assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
Expand Down

0 comments on commit 6dd7559

Please sign in to comment.