
Get spark driver pod status if log stream interrupted accidentally #9081

Closed · wants to merge 7 commits
Conversation


@dawany dawany commented May 31, 2020

#8963

Description

I am using the airflow SparkSubmitOperator to schedule my spark jobs on a kubernetes cluster.

For some reason, kubernetes often throws a 'too old resource version' exception, which interrupts the spark watcher; airflow then loses the log stream and can never get the 'Exit Code'. As a result, airflow marks the job failed once the log stream is lost, even though the job is still running.

This PR adds a simple retry mechanism: when the log stream is interrupted, call 'read_namespaced_pod()', provided by the kubernetes client API, to get the spark driver pod status.
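The fallback described above can be sketched as follows. This is a minimal, hypothetical illustration (the function name `track_driver_pod_status` and its parameters are invented for this sketch, not taken from the PR), assuming the official kubernetes Python client, whose `CoreV1Api.read_namespaced_pod(name, namespace)` returns a pod object with a `status.phase` field:

```python
import time


def track_driver_pod_status(api, pod_name, namespace, poll_interval=10):
    """Poll the spark driver pod until it reaches a terminal phase.

    `api` is expected to expose read_namespaced_pod(name, namespace),
    as kubernetes.client.CoreV1Api does; a terminal pod phase is
    either 'Succeeded' or 'Failed'.
    """
    while True:
        pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)
        phase = pod.status.phase
        if phase in ("Succeeded", "Failed"):
            return phase
        time.sleep(poll_interval)
```

When the watcher's log stream dies with a 'too old resource version' error, the operator could fall back to a poll like this instead of immediately marking the job failed.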

Target Github ISSUE

#8963


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

Review comment on the diff:

    "Cannot execute: {}. Error code is: {}.".format(
        self._mask_cmd(spark_submit_cmd), returncode

    # double check by spark driver pod status (blocking function)
    spark_driver_pod_status = self._start_k8s_pod_status_tracking()
Member


This is going to fail hard when not in kubernetes mode.

Author


Yes, thanks for that. I've split the 'if' conditions so the check has no impact when not in k8s mode.
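One hedged sketch of what splitting the 'if' conditions could look like. The class and most names here are invented for illustration; only `_start_k8s_pod_status_tracking`, the returncode check, and the error message are modeled on the diff excerpt above, and the real PR code may differ:

```python
class SparkSubmitHookSketch:
    """Illustrative stand-in for the hook; not the actual Airflow code."""

    def __init__(self, is_kubernetes, returncode, driver_phase="Succeeded"):
        self._is_kubernetes = is_kubernetes   # hypothetical deploy-mode flag
        self._returncode = returncode
        self._driver_phase = driver_phase

    def _start_k8s_pod_status_tracking(self):
        # Stands in for the blocking pod-status poll from the diff above.
        return self._driver_phase

    def _check_result(self, spark_submit_cmd="spark-submit ..."):
        if self._returncode:
            # The k8s check is nested inside its own condition, so it can
            # never run (and fail hard) outside kubernetes mode.
            if self._is_kubernetes:
                if self._start_k8s_pod_status_tracking() == "Succeeded":
                    # Log stream was lost but the driver pod succeeded.
                    return
            raise RuntimeError(
                "Cannot execute: {}. Error code is: {}.".format(
                    spark_submit_cmd, self._returncode
                )
            )
```

Nesting the kubernetes check inside the returncode check means a non-k8s failure still raises immediately, while a k8s job whose log stream was merely interrupted gets a second chance via the pod status.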

@dawany
Author

dawany commented Jun 24, 2020

@ashb would you mind reviewing the new code change? Thank you!

@stale

stale bot commented Aug 8, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 8, 2020
@stale stale bot closed this Aug 16, 2020
@berglh

berglh commented Jun 1, 2023

@dawany I know it's been a long time, but we're suffering from this exact issue and, for reasons I'd rather not get into, we're currently stuck on Airflow 1.10.11. Just curious, did you test this code? Are you no longer experiencing this in newer versions? This seems like a reasonable solution that could easily be patched in with a config map or built into a custom container.
