Skip to content

Commit

Permalink
Fix Kube tests (#12479)
Browse files Browse the repository at this point in the history
This is the same fix as in #12461, but we didn't notice it as the tests
failed after 50 failures.

It also turns out that the k8s API doesn't take a V1NodeSelector and instead
just takes a dict.

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
  • Loading branch information
ashb and dimberman committed Nov 19, 2020
1 parent 93d64e5 commit 9e089ab
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ def convert_affinity(affinity) -> k8s.V1Affinity:
return _convert_from_dict(affinity, k8s.V1Affinity)


def convert_node_selector(node_selector) -> k8s.V1NodeSelector:
"""Converts a dict into a k8s.V1NodeSelector"""
return _convert_from_dict(node_selector, k8s.V1NodeSelector)


def convert_toleration(toleration) -> k8s.V1Toleration:
"""Converts a dict into an k8s.V1Toleration"""
return _convert_from_dict(toleration, k8s.V1Toleration)
9 changes: 4 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
convert_configmap,
convert_env_vars,
convert_image_pull_secrets,
convert_node_selector,
convert_pod_runtime_info_env,
convert_port,
convert_resources,
Expand Down Expand Up @@ -194,8 +193,8 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals
resources: Optional[k8s.V1ResourceRequirements] = None,
affinity: Optional[k8s.V1Affinity] = None,
config_file: Optional[str] = None,
node_selectors: Optional[k8s.V1NodeSelector] = None,
node_selector: Optional[k8s.V1NodeSelector] = None,
node_selectors: Optional[dict] = None,
node_selector: Optional[dict] = None,
image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None,
service_account_name: str = 'default',
is_delete_operator_pod: bool = False,
Expand Down Expand Up @@ -244,9 +243,9 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals
if node_selectors:
# Node selectors is incorrect based on k8s API
warnings.warn("node_selectors is deprecated. Please use node_selector instead.")
self.node_selector = convert_node_selector(node_selectors)
self.node_selector = node_selectors or {}
elif node_selector:
self.node_selector = convert_node_selector(node_selector)
self.node_selector = node_selector or {}
else:
self.node_selector = None
self.annotations = annotations or {}
Expand Down
1 change: 0 additions & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,6 @@ def test_pod_template_file(
'hostNetwork': False,
'imagePullSecrets': [],
'initContainers': [],
'nodeSelector': {},
'restartPolicy': 'Never',
'securityContext': {},
'serviceAccountName': 'default',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def setUp(self):
'hostNetwork': False,
'imagePullSecrets': [],
'initContainers': [],
'nodeSelector': {},
'restartPolicy': 'Never',
'securityContext': {},
'serviceAccountName': 'default',
Expand Down
60 changes: 4 additions & 56 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,40 +339,7 @@ def test_tolerations(self):
self.assertEqual(client.sanitize_for_serialization(result)['spec']['tolerations'], tolerations)

def test_node_selector(self):
k8s_api_node_selector = k8s.V1NodeSelector(
node_selector_terms=[
k8s.V1NodeSelectorTerm(
match_expressions=[
k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
]
)
]
)

node_selector = {
'nodeSelectorTerms': [
{'matchExpressions': [{'key': 'disktype', 'operator': 'In', 'values': ['ssd']}]}
]
}

k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="name",
task_id="task",
in_cluster=False,
do_xcom_push=False,
cluster_context='default',
node_selector=k8s_api_node_selector,
)

result = k.create_pod_request_obj()
client = ApiClient()
self.assertEqual(type(result.spec.node_selector), k8s.V1NodeSelector)
self.assertEqual(client.sanitize_for_serialization(result)['spec']['nodeSelector'], node_selector)
node_selector = {'beta.kubernetes.io/os': 'linux'}

k = KubernetesPodOperator(
namespace='default',
Expand All @@ -385,12 +352,12 @@ def test_node_selector(self):
in_cluster=False,
do_xcom_push=False,
cluster_context='default',
node_selector=k8s_api_node_selector,
node_selector=node_selector,
)

result = k.create_pod_request_obj()
client = ApiClient()
self.assertEqual(type(result.spec.node_selector), k8s.V1NodeSelector)
self.assertEqual(type(result.spec.node_selector), dict)
self.assertEqual(client.sanitize_for_serialization(result)['spec']['nodeSelector'], node_selector)

# repeat tests using deprecated parameter
Expand All @@ -410,24 +377,5 @@ def test_node_selector(self):

result = k.create_pod_request_obj()
client = ApiClient()
self.assertEqual(type(result.spec.node_selector), k8s.V1NodeSelector)
self.assertEqual(client.sanitize_for_serialization(result)['spec']['nodeSelector'], node_selector)

k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="name",
task_id="task",
in_cluster=False,
do_xcom_push=False,
cluster_context='default',
node_selectors=node_selector,
)

result = k.create_pod_request_obj()
client = ApiClient()
self.assertEqual(type(result.spec.node_selector), k8s.V1NodeSelector)
self.assertEqual(type(result.spec.node_selector), dict)
self.assertEqual(client.sanitize_for_serialization(result)['spec']['nodeSelector'], node_selector)

0 comments on commit 9e089ab

Please sign in to comment.