Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent KubernetesPodOperator from finding pods with the wrong name #25882

Closed
wants to merge 5 commits into from

Conversation

mtilda
Copy link

@mtilda mtilda commented Aug 22, 2022

Changes

This PR adds a condition to the method find_pod to verify that the name of the pod matches the name defined by the user. This will ensure two pods with different names but identical context do not collide.

Issue

Two (or more) pods created with KubernetesPodOperator from identical context (namespace, dag_id, task_id, run_id, and map_index) across different Airflow environments (e.g. staging + production) collide, causing race conditions. This happens because the method find_pods in KubernetesPodOperator sees these pods as identical.

At first, I thought this was a name collision, so I prepended the string stg-, or prd- (based on an Airflow Variable) to the pod name. It seems this did not fix my problem; however it made the problem easier to debug.

As you can see in the logs below, instead of creating the pod prd-example-9ba5a14b73ab41988c1c57afe5ec81f4, KPO found the "matching" pod stg-example-ce40ace00c704b87a56241b87bc4ff47.

...
[2022-08-10, 07:56:49 EDT] {kubernetes_pod.py:221} INFO - Creating pod prd-example-9ba5a14b73ab41988c1c57afe5ec81f4 with labels: {'dag_id': 'example_k8s_pod_operator', 'task_id': 'run_k8s_pod', 'run_id': 'scheduled__2022-08-09T1145000000-c7f22427c', 'kubernetes_pod_operator': 'True', 'try_number': '4'}
[2022-08-10, 07:56:49 EDT] {kubernetes_pod.py:366} INFO - Found matching pod stg-example-9ba5a14b73ab41988c1c57afe5ec81f4 with labels {'airflow_version': '2.3.2-astro.2', 'dag_id': 'example_k8s_pod_operator', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2022-08-09T1145000000-c7f22427c', 'task_id': 'run_k8s_pod', 'try_number': '4'}
[2022-08-10, 07:56:49 EDT] {kubernetes_pod.py:367} INFO - `try_number` of task_instance: 4
[2022-08-10, 07:56:49 EDT] {kubernetes_pod.py:368} INFO - `try_number` of pod: 4
...

I would like to see the find_pod method fail to find a pod in this situation, because the name does not match what the user defined.

Test

Run two Airflow projects with identical DAGs (same dag_id and schedule args) with an identical task (same task_id) like below:

with DAG(
    "example_k8s_pod_operator"
    description="Runs scripts in Kubernetes Pods",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(seconds=10),
        "trigger_rule": "all_done",
    },
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
) as dag:

    KubernetesPodOperator(
        task_id="run_k8s_pod",
        arguments=["Hello world!"],
        cmds=["echo"],
        get_logs=True,
        image="ubuntu",
        # in_cluster=False,  # I am using a different cluster, but you should not need to
        is_delete_operator_pod=True,
        namespace="default",
        name="example",  # Change this manually or with Airflow Variable
    )

Change the argument name, so it is unique across your Airflow instances.

Trigger these DAGs at approximately the same time for the same date (ensure run_id is the same). Then observe logs for race condition behavior described above.


Astronomer Support

This relates to Astronomer Support ticket #11647.

Notes

I am a new contributor, so feel free to educate me on contribution best practices.

@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes provider related issues area:providers labels Aug 22, 2022
@boring-cyborg
Copy link

boring-cyborg bot commented Aug 22, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@RNHTTR
Copy link
Collaborator

RNHTTR commented Aug 22, 2022

Can you also add a unit test please?

@mtilda
Copy link
Author

mtilda commented Aug 23, 2022

Can you also add a unit test please?

Happy to! I am playing around with the testing suite for the first time.

I would like to acknowledge a logical flaw in this PR. The name gets mutated with a hash string at the end, so we will need to account for that.

If I create a pod like this:

KubernetesPodOperator(
  *args,
  name="example",
  **kwargs,
)

The resulting pod name may look like this:

example-f19633028ebb49d9b8db3419c38dce6a

I wonder if the this mutated name is accessible somewhere (need to keep digging ⛏), or I can just

        elif num_pods == 1 and pod.metadata.name.startswith(self.name):

@mtilda
Copy link
Author

mtilda commented Aug 23, 2022

I wonder if the this mutated name is accessible somewhere (need to keep digging ⛏), or I can just

        elif num_pods == 1 and pod.metadata.name.startswith(self.name):

Looking at the existing test code below, it looks like we are safe to just use startswith here.

assert pod.metadata.name.startswith(name_base)

Not yet tested, as I am still learning breeze
@mtilda
Copy link
Author

mtilda commented Aug 23, 2022

Can you also add a unit test please?

@RNHTTR I wrote a draft of some Kubernetes test code. I have not tried running it yet, because I am still learning how to use the testing suite. I went ahead and pushed a commit, so others could see and maybe help me out.

I piggy-backed on another similar test (test_reattach_failing_pod_once), but maybe I should create a new one. Thoughts?

Here is my pseudocode of the essential functionality that should be tested:

  1. Create pod
    1. Create a task from KubernetesPodOperator (name="good_name")
    2. Execute that task to launch a pod
    3. Assert new pod was created
  2. Create pod with different name (this can be deleted immediately)
    1. Create a task from KubernetesPodOperator -- (name="bad_name")
    2. Execute that task to launch a pod
    3. Assert new pod was created
  3. Reattach to existing pod
    1. Create a task from KubernetesPodOperator -- (name="good_name")
    2. Execute that task to launch a pod
    3. Assert new pod was not created

@RNHTTR
Copy link
Collaborator

RNHTTR commented Aug 23, 2022

I'm not super familiar with the inner workings of KPO, but it mostly looks good at first glance.

A couple nits:

  1. Should it be something other than good_name and bad_name? Is this actually an antipattern that the test is testing against, or is good/bad subjective, and something like existing_name and new_name make sense?
  2. Regarding your other point, I think it probably makes more sense to have two separate tests. Maybe use the old one as a template for a new test that has a name like test_reattach_pod_new_name? I'm not sure if that name makes sense, but a name that precisely indicates what the test is trying to accomplish.

Comment on lines +377 to 380
elif num_pods == 1 and pod_list[0].metadata.name.startswith(self.name):
pod = pod_list[0]
self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also log something if a pod is found but under the wrong name so the user can understand the context better when debugging.

@@ -370,11 +370,11 @@ def find_pod(
label_selector=label_selector,
).items

pod = None
pod: Optional[k8s.V1Pod] = None
num_pods = len(pod_list)
if num_pods > 1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it also mean: if more than one pod are found here, we can actually use the name to match & find the Pod we desire and then proceed, rather than raising an exception?

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Nov 18, 2022
@github-actions github-actions bot closed this Nov 24, 2022
@amoghrajesh
Copy link
Contributor

Did this make it in? What is blocking it? I can take it to completion if nobody is working on it actively @XD-DENG

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:cncf-kubernetes Kubernetes provider related issues stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants