Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed May 4, 2024
1 parent e594417 commit b7484f7
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ def setup_method(self):
namespace=NAMESPACE,
image=IMAGE,
deferrable=True,
on_finish_action="delete_pod",
)
self.gke_op.pod = mock.MagicMock(
name=TASK_NAME,
Expand Down Expand Up @@ -683,6 +684,56 @@ def test_async_create_pod_should_execute_successfully(
fetch_cluster_info_mock.assert_called_once()
assert isinstance(exc.value.trigger, GKEStartPodTrigger)

@pytest.mark.parametrize("status", ["error", "failed", "timeout"])
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
@mock.patch(KUB_OP_PATH.format("_clean"))
@mock.patch(KUB_OP_PATH.format("write_logs"))
def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_pod, status):
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
with pytest.raises(AirflowException):
self.gke_op.execute_complete(
context=mock.MagicMock(),
event={"name": "test", "status": status, "namespace": "default", "message": ""},
cluster_url=self.gke_op._cluster_url,
ssl_ca_cert=self.gke_op._ssl_ca_cert,
)
mock_write_logs.assert_called_once()

@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
@mock.patch(KUB_OP_PATH.format("_clean"))
@mock.patch(KUB_OP_PATH.format("write_logs"))
def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod):
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
self.gke_op.execute_complete(
context=mock.MagicMock(),
event={"name": "test", "status": "success", "namespace": "default"},
cluster_url=self.gke_op._cluster_url,
ssl_ca_cert=self.gke_op._ssl_ca_cert,
)
mock_write_logs.assert_called_once()

@mock.patch(KUB_OP_PATH.format("pod_manager"))
@mock.patch(
"airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.invoke_defer_method"
)
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
@mock.patch(KUB_OP_PATH.format("_clean"))
def test_execute_complete_running(
self, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager
):
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
self.gke_op.execute_complete(
context=mock.MagicMock(),
event={"name": "test", "status": "running", "namespace": "default"},
cluster_url=self.gke_op._cluster_url,
ssl_ca_cert=self.gke_op._ssl_ca_cert,
)
mock_pod_manager.fetch_container_logs.assert_called_once()
mock_invoke_defer_method.assert_called_once()


class TestGKEStartJobOperator:
def setup_method(self):
Expand Down

0 comments on commit b7484f7

Please sign in to comment.