Skip to content

Commit

Permalink
KubernetesExecutor should default to template image if used (#19484)
Browse files Browse the repository at this point in the history
Currently, the user must specify image and tag in airflow.cfg, even when they are using a pod template file.  If the pod template file specifies an image and tag, the user should not be forced to also specify this in airflow.cfg.
  • Loading branch information
dstandish committed Nov 9, 2021
1 parent 2590013 commit d18e2b0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
5 changes: 4 additions & 1 deletion airflow/kubernetes/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def __init__(self):

self.worker_container_repository = conf.get(self.kubernetes_section, 'worker_container_repository')
self.worker_container_tag = conf.get(self.kubernetes_section, 'worker_container_tag')
self.kube_image = f'{self.worker_container_repository}:{self.worker_container_tag}'
if self.worker_container_repository and self.worker_container_tag:
self.kube_image = f'{self.worker_container_repository}:{self.worker_container_tag}'
else:
self.kube_image = None

# The Kubernetes Namespace in which the Scheduler and Webserver reside. Note
# that if your
Expand Down
22 changes: 14 additions & 8 deletions tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import os
import re
import sys
import unittest
import uuid
from unittest import mock

Expand All @@ -38,8 +37,8 @@
from airflow.kubernetes.secret import Secret


class TestPodGenerator(unittest.TestCase):
def setUp(self):
class TestPodGenerator:
def setup_method(self):
self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
self.deserialize_result = {
'apiVersion': 'v1',
Expand Down Expand Up @@ -395,10 +394,17 @@ def test_reconcile_pods(self, mock_uuid):

assert result_dict == expected_dict

@pytest.mark.parametrize(
'config_image, expected_image',
[
pytest.param('my_image:my_tag', 'my_image:my_tag', id='image_in_cfg'),
pytest.param(None, 'busybox', id='no_image_in_cfg'),
],
)
@mock.patch('uuid.uuid4')
def test_construct_pod(self, mock_uuid):
path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
worker_config = PodGenerator.deserialize_model_file(path)
def test_construct_pod(self, mock_uuid, config_image, expected_image):
template_file = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
worker_config = PodGenerator.deserialize_model_file(template_file)
mock_uuid.return_value = self.static_uuid
executor_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
Expand All @@ -414,7 +420,7 @@ def test_construct_pod(self, mock_uuid):
dag_id=self.dag_id,
task_id=self.task_id,
pod_id='pod_id',
kube_image='airflow_image',
kube_image=config_image,
try_number=self.try_number,
date=self.execution_date,
args=['command'],
Expand All @@ -430,7 +436,7 @@ def test_construct_pod(self, mock_uuid):
expected.metadata.name = 'pod_id.' + self.static_uuid.hex
expected.metadata.namespace = 'test_namespace'
expected.spec.containers[0].args = ['command']
expected.spec.containers[0].image = 'airflow_image'
expected.spec.containers[0].image = expected_image
expected.spec.containers[0].resources = {'limits': {'cpu': '1m', 'memory': '1G'}}
expected.spec.containers[0].env.append(
k8s.V1EnvVar(
Expand Down
1 change: 0 additions & 1 deletion tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,6 @@ def test_render_k8s_pod_yaml(self, pod_mutation_hook, create_task_instance):
'--subdir',
__file__,
],
'image': ':',
'name': 'base',
'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD', 'value': 'True'}],
}
Expand Down

0 comments on commit d18e2b0

Please sign in to comment.