Check whether the task finishes before deferring the task for KubernetesPodOperatorAsync #1104
Conversation
Force-pushed from e6c08cb to f867f2d.
Codecov Report. Patch coverage:

Additional details and impacted files:

```diff
@@           Coverage Diff           @@
##             main    #1104   +/- ##
=======================================
  Coverage   98.58%   98.58%
=======================================
  Files          90       90
  Lines        5377     5383      +6
=======================================
+ Hits         5301     5307      +6
  Misses         76       76
```

☔ View full report in Codecov by Sentry.
Force-pushed from f867f2d to a2dc9aa.
Review comment on astronomer/providers/cncf/kubernetes/operators/kubernetes_pod.py (outdated; resolved).
Force-pushed from a2dc9aa to 9bda075.
Review comment on astronomer/providers/cncf/kubernetes/operators/kubernetes_pod.py (outdated; resolved).
"status": "failed", | ||
"message": self.pod.status.message, | ||
} | ||
return self.trigger_reentry(context=context, event=event) |
Is the event payload that we send to trigger_reentry handled correctly? I am not sure it is consistent with the shape we return from the triggerer, or that it gets handled correctly.
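For reference, a minimal sketch of how a reentry handler could dispatch on this payload shape. The function name and body here are assumptions for illustration, not the provider's actual `trigger_reentry` implementation:

```python
from typing import Optional

from airflow.exceptions import AirflowException


def handle_reentry_event(event: Optional[dict]) -> None:
    # Illustrative only: dispatch on the {"status", "message"} payload shape
    # built above; the real trigger_reentry may differ.
    if event and event.get("status") == "failed":
        raise AirflowException(event.get("message") or "Pod failed")
    # Any other status would fall through to normal post-execution handling.
```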
Force-pushed from 9bda075 to 2503200.
```python
self.pod_request_obj = self.build_pod_request_obj(context)
self.pod: k8s.V1Pod = self.get_or_create_pod(self.pod_request_obj, context)
pod_status = self.pod.status.phase
if pod_status in PodPhase.terminal_states or not container_is_running(
    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
```
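As a standalone illustration, the pre-deferral probe in the diff amounts to the helper below. The names mirror the diff; the packaging into a function and the `BASE_CONTAINER_NAME` value are my assumptions:

```python
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.utils.pod_manager import (
    PodPhase,
    container_is_running,
)

BASE_CONTAINER_NAME = "base"  # assumed: KubernetesPodOperator's main container name


def pod_already_finished(pod: k8s.V1Pod) -> bool:
    # Mirrors the condition in the diff above: the pod reached a terminal
    # phase, or its base container is no longer running. Note that this
    # reads pod.status.phase directly, exactly as the diff does.
    return pod.status.phase in PodPhase.terminal_states or not container_is_running(
        pod=pod, container_name=BASE_CONTAINER_NAME
    )
```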
Will there always be a base container in every pod? Also, if there is a base container, what does it do? If there can be multiple containers in the pod, should we not check the running status of all of them (sketched below)? Or is the base container meant to keep track of the running status of the other containers in the pod?
If it's not possible to check the status of all containers, I think we could just remove the `or` condition that checks the container status; then the PR looks good to me.
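A sketch of the alternative suggested here, treating the pod as running while any of its containers still reports a running state. The function name and packaging are illustrative:

```python
from kubernetes.client import models as k8s


def any_container_running(pod: k8s.V1Pod) -> bool:
    # Illustrative alternative to the base-container check: the pod counts
    # as running while *any* container reports a running state.
    statuses = pod.status.container_statuses if pod.status else None
    return any(s.state and s.state.running for s in statuses or [])
```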
I think so 🤔 That's how we check when deferred.
https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/cncf/kubernetes/operators/kubernetes_pod.py#L69
https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/cncf/kubernetes/triggers/wait_container.py#L109
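For context, the linked `container_is_running` helper boils down to roughly the following. This is paraphrased from the provider code, so treat it as approximate rather than authoritative:

```python
from kubernetes.client import models as k8s


def container_is_running(pod: k8s.V1Pod, container_name: str) -> bool:
    # Approximate gist of the provider helper linked above: find the named
    # container's status entry and check whether it reports a running state.
    statuses = (pod.status.container_statuses if pod and pod.status else None) or []
    status = next((s for s in statuses if s.name == container_name), None)
    return bool(status and status.state and status.state.running)
```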
I'm afraid the implementation there is wrong with respect to the questions above. It's done a bit differently in the OSS provider, no?
yep, I think we implement the logic in different ways
Force-pushed from 2503200 to 75841ed.
I don't have a strong reservation on this. Since it is consistent with the existing trigger implementation, the other issues would be outside the scope of this PR.
Thanks! I'll merge it for now. Now that deferrable mode has been added to OSS Airflow, I think we'll eventually do something like #1169.
…peratorAsync (#1104)" This reverts commit 89ccc7e. PR #1104 adds logic to poke for Pod status before putting the status check on deferral. However, it is observed that our DAG fails always when we go on checking the status.phase on pods as Pod.status is None before the pod gets scheduled on a None. So, in most scenarios it does not make sense to check for the pod status immediately to verify that the pod has completed it's desired execution and hence, we revert this poke in case of Kubernetes Pod operator.
…peratorAsync (#1104)" (#1209) This reverts commit 89ccc7e. PR #1104 adds logic to poke for Pod status before putting the status check on deferral. However, it is observed that our DAG fails always when we go on checking the status.phase on pods as Pod.status is None before the pod gets scheduled on a node. So, in most scenarios it does not make sense to check for the pod status immediately to verify that the pod has completed its desired execution and hence, we revert this poke in case of Kubernetes Pod operator. closes: #1208
Like the SnowflakeOperatorAsync, we try to verify whether the submitted job has already completed before deferring, to prevent unnecessary deferrals. This way, we can skip deferring the task if it has already finished.
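A minimal sketch of the check-before-defer pattern this describes, with hypothetical `_probe` and `_build_trigger` helpers standing in for the operator-specific pieces:

```python
from airflow.models.baseoperator import BaseOperator


class CheckThenDeferOperator(BaseOperator):
    """Illustrative only: probe the job once, and defer only if it is still
    running, so already-finished work never round-trips through the triggerer."""

    def execute(self, context):
        result = self._probe()  # hypothetical cheap, synchronous status check
        if result is not None:  # job already finished: skip deferral
            return result
        self.defer(  # otherwise free the worker slot until the trigger fires
            trigger=self._build_trigger(),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        return event

    def _probe(self):
        """Hypothetical: return the final result if the job finished, else None."""
        return None

    def _build_trigger(self):
        """Hypothetical: build the trigger that waits for completion."""
        raise NotImplementedError
```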