Add sensor for AWS Batch (#19850) #19885
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
INTERMEDIATE_STATES = (
    'SUBMITTED',
    'PENDING',
    'RUNNABLE',
    'STARTING',
    'RUNNING',
)
FAILURE_STATES = ('FAILED',)
SUCCESS_STATES = ('SUCCEEDED',)
I think it would be better to manage the statuses at the hook level.
The hook itself also has a function that requires the same definitions:
airflow/airflow/providers/amazon/aws/hooks/batch_client.py
Lines 248 to 255 in 0df50f4
if job_status == "SUCCEEDED":
    self.log.info("AWS batch job (%s) succeeded: %s", job_id, job)
    return True
if job_status == "FAILED":
    raise AirflowException(f"AWS Batch job ({job_id}) failed: {job}")
if job_status in ["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"]:
So we should have one place to define it for both.
Good point - I will define these in AwsBatchClientHook and reference them.
While making the change, I noticed that the hook also uses two additional combinations of state values for its own methods:
poll_for_job_running: checks for ["RUNNING", "SUCCEEDED", "FAILED"]
poll_for_job_complete: checks for ["SUCCEEDED", "FAILED"]
I'm planning to create the following state definitions to cater to all cases. Let me know if you think otherwise and it would be better to leave the above two methods as they are.
FAILURE_STATE = 'FAILED'
SUCCESS_STATE = 'SUCCEEDED'
RUNNING_STATE = 'RUNNING'
INTERMEDIATE_STATES = (
'SUBMITTED',
'PENDING',
'RUNNABLE',
'STARTING',
RUNNING_STATE,
)
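The consolidation proposed above could be sketched as constants on the hook that both the hook's polling methods and the sensor reference. This is a minimal sketch; the class name is a simplified stand-in for AwsBatchClientHook and the `classify` helper is hypothetical, not part of the merged code:

```python
# Sketch: job-status constants defined once at the hook level.
# BatchClientHook is a simplified stand-in for AwsBatchClientHook;
# classify() is a hypothetical helper showing how the constants unify
# the checks currently duplicated across the hook and the sensor.

class BatchClientHook:
    FAILURE_STATE = 'FAILED'
    SUCCESS_STATE = 'SUCCEEDED'
    RUNNING_STATE = 'RUNNING'
    INTERMEDIATE_STATES = (
        'SUBMITTED',
        'PENDING',
        'RUNNABLE',
        'STARTING',
        RUNNING_STATE,
    )

def classify(job_status: str) -> str:
    """Map a raw Batch job status onto success/failure/in-progress."""
    if job_status == BatchClientHook.SUCCESS_STATE:
        return 'success'
    if job_status == BatchClientHook.FAILURE_STATE:
        return 'failure'
    if job_status in BatchClientHook.INTERMEDIATE_STATES:
        return 'in_progress'
    return 'unknown'
```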
if state in self.FAILURE_STATES:
    raise AirflowException(f'Batch sensor failed. Batch Job Status: {state}')

if state in self.INTERMEDIATE_STATES:
    return False
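Put together, the sensor's poke logic from the diff above might read as the following sketch. AirflowException is stubbed with a plain Exception subclass so the snippet stands alone, and the state is passed in directly rather than fetched through the hook:

```python
# Minimal sketch of the sensor's poke logic, assuming the state tuples
# from the diff above. In the real sensor, `state` would come from the
# hook's job description; here it is a parameter for illustration.

class AirflowException(Exception):
    pass

INTERMEDIATE_STATES = ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING', 'RUNNING')
FAILURE_STATES = ('FAILED',)
SUCCESS_STATES = ('SUCCEEDED',)

def poke(state: str) -> bool:
    """True once the job succeeded, False while in progress, raise on failure."""
    if state in FAILURE_STATES:
        raise AirflowException(f'Batch sensor failed. Batch Job Status: {state}')
    if state in INTERMEDIATE_STATES:
        return False
    return state in SUCCESS_STATES
```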
I'm not overly familiar with Batch yet. Is there a use case for setting a target state? Rather than just returning success/working/failed, is it desirable to have the option to check if it is currently RUNNING, for example?
This sensor's objective is to poke until the job reaches a logical end - success/failure. In this context, I can't think of a use-case where matching against any other target state would be useful. There could be other places where we want to check against a target state, and the AwsBatchClientHook already has a method for this:
def poll_job_status(self, job_id: str, match_status: List[str]) -> bool:
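That method could be exercised along the following lines. FakeBatchHook is a hypothetical stub for illustration only; the real AwsBatchClientHook fetches job descriptions from the Batch API with retry logic:

```python
# Illustrative stub matching the poll_job_status signature quoted above.
# FakeBatchHook and its hard-coded status lookup are assumptions for
# demonstration; the real hook queries AWS Batch via describe_jobs.
from typing import Dict, List

class FakeBatchHook:
    def __init__(self, statuses: Dict[str, str]):
        self._statuses = statuses  # job_id -> current status

    def get_job_description(self, job_id: str) -> dict:
        return {'status': self._statuses[job_id]}

    def poll_job_status(self, job_id: str, match_status: List[str]) -> bool:
        """Return True when the job's current status is one of match_status."""
        job = self.get_job_description(job_id)
        return job['status'] in match_status

hook = FakeBatchHook({'job-1': 'RUNNING'})
hook.poll_job_status('job-1', ['RUNNING', 'SUCCEEDED', 'FAILED'])  # True
```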
Let me know if I misunderstood your comment.
I was looking at emr_sensor and that helped me understand your question better. Are you asking whether the terminal states should be configurable, such that the sensor can return true/false based on them?
I don't know a definitive answer, but here is my thought process:
- I could see it being useful for DAGs that follow a fire-and-forget pattern to launch the job.
- In fire-and-forget scenarios, would there be a need to use a Sensor? I don't think so.
- In the case of EMR, a cluster being RUNNING could be a logical terminal state for long-running clusters. By contrast, Batch is for ephemeral tasks, so the same might not make sense.
- I see AwsBatchOperator providing this option through the waiter argument.
With this, I'm inclined to say that a configurable terminal state might not be useful here. Let me know your thoughts.
I appreciate the detailed reply; that makes sense.
As an example of what I was thinking: the EKS Nodegroup sensor can be used to wait for the nodegroup to reach "creating" so a command can be sent to start spinning up a second one, or to reach "active" so a task can be triggered to run on it, or to reach "nonexistent" so the cluster it is attached to can be deleted. I was wondering if Batch had a use case for similar logic, but it sounds like that may not be the case.
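The EKS-style pattern described above, a sensor parameterized by a target state, could be sketched generically. All names here are illustrative and not Airflow's actual EKS sensor API:

```python
# Generic sketch of a target-state sensor poke, illustrating the
# EKS-style pattern described above. Names are illustrative only.

def make_target_state_poke(target_state: str):
    """Build a poke callable that fires once the resource hits target_state."""
    def poke(current_state: str) -> bool:
        return current_state == target_state
    return poke

wait_for_active = make_target_state_poke('active')
wait_for_active('creating')  # False: keep poking
wait_for_active('active')    # True: downstream task can start
```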
Carry on, and sorry for the distraction. 😄
Adds a sensor implementation to poll the status of an AWS Batch job. The sensor will enable DAGs to wait for the batch job to reach a terminal state before proceeding to the downstream tasks.
Different AWS Batch job statuses are used in both AwsBatchClientHook and BatchSensor. This change defines the status strings inside AwsBatchClientHook so that they can be reused across other classes.
Force-pushed from b6b5daf to 0b95fa6
The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
Awesome work, congrats on your first merged pull request!
closes: #19850
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.