feat: Add DbtCloudRetryJobOperator to retry failed dbt job #38001
Conversation
(force-pushed from 3f48cf4 to 4933fe5)
hey @josh-fell, do you still plan to review this PR? If not, can you assign some other reviewers? Thanks.
(force-pushed from 2271132 to e42e524)
cc @Taragolis @hussein-awala @eladkal, can anyone help review? Thanks!
(force-pushed from e42e524 to 4129dc7)
```python
elif job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
    self.log.info("Job run %s has completed successfully.", self.run_id)
    return self.run_id
elif job_run_status in (
```
@andyguwc is there any reason to check CANCELLED and ERROR in the same condition statement? The exception raised could be more specific if we put them in separate conditions, right?
This logic came from the existing dbt operator. I think the rationale is that we probably just care about success, and CANCELLED/ERROR need manual triaging anyway.
That said, I see the other comment too about why we'd create a new operator that duplicates a lot of code. Let me think about the other approach as well.
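For reference, a minimal sketch of what splitting the terminal states into separate branches could look like, so each failure mode raises a distinct message. This assumes `DbtCloudJobRunStatus` and `DbtCloudJobRunException` from the dbt Cloud provider's hook module; the function is an illustrative standalone refactor, not the operator's actual method.

```python
from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudJobRunException,
    DbtCloudJobRunStatus,
)


def resolve_terminal_status(job_run_status: int, run_id: int) -> int:
    """Handle each terminal state in its own branch so the error is specific."""
    if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
        return run_id
    if job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
        raise DbtCloudJobRunException(f"Job run {run_id} was cancelled.")
    if job_run_status == DbtCloudJobRunStatus.ERROR.value:
        raise DbtCloudJobRunException(f"Job run {run_id} has failed.")
    raise DbtCloudJobRunException(
        f"Job run {run_id} ended in unexpected status {job_run_status}."
    )
```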
Why not create a flag on the DbtCloudRunJobOperator to retry from failure rather than create a new operator that reuses a lot of the same code?
@josh-fell thanks. I was actually going back and forth between these approaches. I thought a separate operator was cleaner because it only does one thing, and the user doesn't need to think through both "retry" from the Airflow task perspective and "retry" by calling the dbt API to retry from the point of failure. Let me think more about this.
@josh-fell As I think about adding a flag for retry-from-failure, I can't wrap my head around the retry logic in a deferrable operator. Mind sharing your thoughts on how this could work if the operator is deferrable?
@andyguwc Thanks for your patience here with my responses! How I think about this feature, from a UX perspective, is really handling which endpoint is used under the hood. From the documentation of this new endpoint, it seems as though it could be used for retries as well as regular job execution.

What this suggests is the dbt API handling the lookup of the last run for a given job and choosing whether or not to start a new instance of the job, which is great! So, I would think the implementation in Airflow would be adding a flag to `DbtCloudRunJobOperator` that toggles which endpoint is used. I could be misinterpreting the robustness of this new endpoint. If I am, the checking of the last job run status could be implemented in the hook and then carry on with the endpoint decision. I hope that helps!
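A minimal sketch of what that flag-based endpoint selection could look like, written as a standalone function for clarity. `retry_from_failure` and `retry_failed_job_run` are illustrative names for the flag and the hypothetical hook method wrapping the new endpoint, not confirmed provider API; `trigger_job_run` is the hook's existing method.

```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook


def start_or_retry_run(
    hook: DbtCloudHook,
    job_id: int,
    cause: str,
    account_id: int | None = None,
    retry_from_failure: bool = False,
) -> int:
    """Trigger a dbt Cloud job run, picking the endpoint based on the flag."""
    if retry_from_failure:
        # Hypothetical hook method for the new "rerun from failure" endpoint;
        # dbt Cloud decides whether to resume the failed run or start fresh.
        response = hook.retry_failed_job_run(job_id=job_id, account_id=account_id)
    else:
        response = hook.trigger_job_run(job_id=job_id, cause=cause, account_id=account_id)
    return response.json()["data"]["id"]
```

Because the endpoint choice happens before any polling or deferral, the deferrable path after the run is triggered would stay unchanged, which also speaks to the deferrable question above.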
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
This PR adds a new operator, `DbtCloudRetryJobOperator`, to the dbt Cloud provider. The operator calls the dbt Cloud API to retry a job from its point of failure.

Note that after making the retry call, this operator behaves very similarly to `DbtCloudRunJobOperator` when polling the job status, so it references the same operator link (`DbtCloudRunJobOperatorLink`) and its OpenLineage code is also the same.

This PR also makes a minor fix to `DbtCloudRunJobOperator` and the related docs.

Tested locally with an example dbt DAG.
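A hypothetical usage sketch of the proposed operator in a DAG. The import below exists only on this PR's branch, and the parameters assume the operator mirrors `DbtCloudRunJobOperator`; the connection id and job id are placeholders.

```python
from datetime import datetime

from airflow import DAG

# PR branch only: DbtCloudRetryJobOperator is the operator proposed here.
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRetryJobOperator

with DAG(
    dag_id="example_dbt_cloud_retry",
    start_date=datetime(2024, 1, 1),
    schedule=None,
):
    retry_failed_run = DbtCloudRetryJobOperator(
        task_id="retry_dbt_job",
        dbt_cloud_conn_id="dbt_cloud_default",  # assumed connection id
        job_id=12345,  # assumed dbt Cloud job id
        check_interval=60,
        timeout=3600,
    )
```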
closes: #35772
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.