Skip to content

Commit

Permalink
Fix AWS Batch waiter failure state (#33656)
Browse files Browse the repository at this point in the history
* Fix AWS Batch waiter failure state

* Add tests for AWS Batch batch_job_complete waiter
  • Loading branch information
yermalov-here committed Aug 23, 2023
1 parent 911cf46 commit 85aea74
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/waiters/batch.json
Expand Up @@ -17,7 +17,7 @@
"argument": "jobs[].status",
"expected": "FAILED",
"matcher": "pathAll",
"state": "failed"
"state": "failure"
}
]
},
Expand All @@ -37,13 +37,13 @@
"argument": "computeEnvironments[].status",
"expected": "INVALID",
"matcher": "pathAny",
"state": "failed"
"state": "failure"
},
{
"argument": "computeEnvironments[].status",
"expected": "DELETED",
"matcher": "pathAny",
"state": "failed"
"state": "failure"
}
]
}
Expand Down
56 changes: 56 additions & 0 deletions tests/providers/amazon/aws/waiters/test_custom_waiters.py
Expand Up @@ -27,6 +27,7 @@
from moto import mock_eks

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, EcsTaskDefinitionStates
from airflow.providers.amazon.aws.hooks.eks import EksHook
Expand Down Expand Up @@ -295,3 +296,58 @@ def test_export_table_to_point_in_time_failed(self, mock_describe_export):
ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
)


class TestCustomBatchServiceWaiters:
"""Test waiters from ``amazon/aws/waiters/batch.json``."""

JOB_ID = "test_job_id"

@pytest.fixture(autouse=True)
def setup_test_cases(self, monkeypatch):
self.client = boto3.client("batch", region_name="eu-west-3")
monkeypatch.setattr(BatchClientHook, "conn", self.client)

@pytest.fixture
def mock_describe_jobs(self):
"""Mock ``BatchClientHook.Client.describe_jobs`` method."""
with mock.patch.object(self.client, "describe_jobs") as m:
yield m

def test_service_waiters(self):
hook_waiters = BatchClientHook(aws_conn_id=None).list_waiters()
assert "batch_job_complete" in hook_waiters

@staticmethod
def describe_jobs(status: str):
"""
Helper function for generate minimal DescribeJobs response for a single job.
https://docs.aws.amazon.com/batch/latest/APIReference/API_DescribeJobs.html
"""
return {
"jobs": [
{
"status": status,
},
],
}

def test_job_succeeded(self, mock_describe_jobs):
"""Test job succeeded"""
mock_describe_jobs.side_effect = [
self.describe_jobs(BatchClientHook.RUNNING_STATE),
self.describe_jobs(BatchClientHook.SUCCESS_STATE),
]
waiter = BatchClientHook(aws_conn_id=None).get_waiter("batch_job_complete")
waiter.wait(jobs=[self.JOB_ID], WaiterConfig={"Delay": 0.01, "MaxAttempts": 2})

def test_job_failed(self, mock_describe_jobs):
"""Test job failed"""
mock_describe_jobs.side_effect = [
self.describe_jobs(BatchClientHook.RUNNING_STATE),
self.describe_jobs(BatchClientHook.FAILURE_STATE),
]
waiter = BatchClientHook(aws_conn_id=None).get_waiter("batch_job_complete")

with pytest.raises(WaiterError, match="Waiter encountered a terminal failure state"):
waiter.wait(jobs=[self.JOB_ID], WaiterConfig={"Delay": 0.01, "MaxAttempts": 2})

0 comments on commit 85aea74

Please sign in to comment.