Skip to content

Fix race condition in AsyncKubernetesHook corrupting global kubernetes_asyncio config#65566

Merged
potiuk merged 4 commits into
apache:mainfrom
ZhaoMJ:fix/async-k8s-hook-global-config-race-condition
May 10, 2026
Merged

Fix race condition in AsyncKubernetesHook corrupting global kubernetes_asyncio config#65566
potiuk merged 4 commits into
apache:mainfrom
ZhaoMJ:fix/async-k8s-hook-global-config-race-condition

Conversation

@ZhaoMJ
Copy link
Copy Markdown
Contributor

@ZhaoMJ ZhaoMJ commented Apr 20, 2026

Summary

When multiple deferrable KubernetesPodOperator tasks run concurrently on the same triggerer process, each task's AsyncKubernetesHook calls load_kube_config_from_dict (or load_incluster_config), which writes to the module-level global kubernetes_asyncio Configuration object. Because load_kube_config_from_dict is a coroutine with internal await points, two concurrent triggers for different clusters interleave and produce a mixed global state — e.g. cluster-A's server URL paired with cluster-B's bearer token. _TimeoutAsyncK8sApiClient() was then constructed with no configuration= argument, silently consuming whatever global state happened to be current.

This manifests as intermittent 403 errors on long-running pods in cross-cluster setups: the API server receives a request from the wrong service account identity.

Changes:

  • _TimeoutAsyncK8sApiClient.__init__: add configuration= parameter forwarded to ApiClient, matching the existing sync _TimeoutK8sApiClient pattern.
  • _load_config: initialise self.client_configuration to a fresh async_client.Configuration() when None, ensuring each hook instance owns an isolated object that load_* functions populate in-place (rather than calling Configuration.set_default()).
  • load_incluster_config call: pass client_configuration=self.client_configuration.
  • load_kube_config_from_dict call: pass client_configuration=self.client_configuration.
  • get_conn: pass configuration=self.client_configuration to _TimeoutAsyncK8sApiClient.

The kubeconfig_path, kube_config (tempfile), and default-file branches already forwarded client_configuration=self.client_configuration; they now benefit from the initialisation fix so the value is never None.

Seven new tests are added covering each changed path and a concurrency regression test using asyncio.gather.


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Sonnet 4.6

Generated-by: Claude Sonnet 4.6 following the guidelines

Mingjie Zhao added 2 commits April 21, 2026 03:18
…s_asyncio config

When multiple deferrable KubernetesPodOperator tasks run concurrently on the
same triggerer, each task's AsyncKubernetesHook calls load_kube_config_from_dict
(or load_incluster_config), which writes to the module-level global
kubernetes_asyncio Configuration. Because load_kube_config_from_dict is async
and contains multiple await points, two coroutines for different clusters can
interleave: one writes its server URL while the other writes its bearer token,
producing a mixed configuration. _TimeoutAsyncK8sApiClient() was then
instantiated with no configuration= argument, so it silently consumed whatever
global state happened to be current at that moment.

The fix has four parts:
- Add __init__ to _TimeoutAsyncK8sApiClient accepting configuration= (matching
  the existing sync _TimeoutK8sApiClient pattern).
- Initialise self.client_configuration to a fresh async_client.Configuration()
  at the top of _load_config when it is None, ensuring each hook instance owns
  an isolated Configuration object.
- Pass client_configuration=self.client_configuration to load_incluster_config
  and load_kube_config_from_dict, which previously wrote only to global state.
- Pass configuration=self.client_configuration when constructing
  _TimeoutAsyncK8sApiClient in get_conn, so the client uses the per-instance
  configuration rather than the global default.

The kubeconfig_path, kube_config (tempfile), and default-file branches already
forwarded client_configuration=self.client_configuration; they now benefit from
the initialisation fix so the value is never None.
Covers the race condition fix across five scenarios:

- _TimeoutAsyncK8sApiClient accepts a configuration= argument and forwards it
  to the parent ApiClient, ensuring each instance is isolated.
- No-arg construction still works for backwards compatibility.
- _load_config allocates a fresh async_client.Configuration() per hook instance
  when none was supplied by the caller.
- load_incluster_config and load_kube_config_from_dict are each called with the
  per-instance client_configuration rather than leaving it as None (which would
  cause both functions to fall back to Configuration.set_default() and corrupt
  shared global state).
- get_conn passes configuration= to _TimeoutAsyncK8sApiClient so the API client
  uses the hook's isolated Configuration, not the global default.
- The core regression test runs two hooks for different clusters concurrently
  via asyncio.gather and asserts that each hook's client_configuration is a
  distinct object populated only with that hook's own connection data.
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a concurrency bug in the CNCF Kubernetes provider where concurrent AsyncKubernetesHook instances could corrupt the module-level kubernetes_asyncio default Configuration, leading to cross-cluster credential/host mixups (intermittent auth failures). The fix ensures each hook instance owns and uses an isolated kubernetes_asyncio Configuration object, and that the async ApiClient is constructed with that configuration.

Changes:

  • Add a configuration= parameter to _TimeoutAsyncK8sApiClient and forward it to kubernetes_asyncio.client.ApiClient.
  • Ensure AsyncKubernetesHook._load_config() always initializes a per-instance async_client.Configuration() and passes it into load_incluster_config / load_kube_config_from_dict.
  • Update AsyncKubernetesHook.get_conn() to construct _TimeoutAsyncK8sApiClient(configuration=...) and add unit/regression tests for the new behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py Switch async config loading + client construction to use a per-hook kubernetes_asyncio Configuration instance (avoids global default contamination).
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py Add tests covering configuration forwarding/initialization and a regression test intended to cover the concurrency scenario.

…Hook

The parent KubernetesHook.client_configuration is typed as
kubernetes.client.Configuration (sync). Rather than using a separate
attribute, AsyncKubernetesHook now explicitly accepts and re-declares
client_configuration as async_client.Configuration | None, making the
override visible to type checkers and callers without adding a new
attribute to track.

Both Configuration classes are generated from the same OpenAPI source and
are structurally compatible when library versions align.

Also add an await asyncio.sleep(0) yield inside the populate_config mock
side effect in the concurrency regression test so asyncio.gather can
actually interleave the two coroutines and exercise the race window.
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Apr 22, 2026
Copy link
Copy Markdown
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Thanks!

@potiuk potiuk merged commit e2cf316 into apache:main May 10, 2026
113 checks passed
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 10, 2026

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

jason810496 pushed a commit to jason810496/airflow that referenced this pull request May 11, 2026
…s_asyncio config (apache#65566)

* Fix race condition in AsyncKubernetesHook corrupting global kubernetes_asyncio config

When multiple deferrable KubernetesPodOperator tasks run concurrently on the
same triggerer, each task's AsyncKubernetesHook calls load_kube_config_from_dict
(or load_incluster_config), which writes to the module-level global
kubernetes_asyncio Configuration. Because load_kube_config_from_dict is async
and contains multiple await points, two coroutines for different clusters can
interleave: one writes its server URL while the other writes its bearer token,
producing a mixed configuration. _TimeoutAsyncK8sApiClient() was then
instantiated with no configuration= argument, so it silently consumed whatever
global state happened to be current at that moment.

The fix has four parts:
- Add __init__ to _TimeoutAsyncK8sApiClient accepting configuration= (matching
  the existing sync _TimeoutK8sApiClient pattern).
- Initialise self.client_configuration to a fresh async_client.Configuration()
  at the top of _load_config when it is None, ensuring each hook instance owns
  an isolated Configuration object.
- Pass client_configuration=self.client_configuration to load_incluster_config
  and load_kube_config_from_dict, which previously wrote only to global state.
- Pass configuration=self.client_configuration when constructing
  _TimeoutAsyncK8sApiClient in get_conn, so the client uses the per-instance
  configuration rather than the global default.

The kubeconfig_path, kube_config (tempfile), and default-file branches already
forwarded client_configuration=self.client_configuration; they now benefit from
the initialisation fix so the value is never None.

* Add tests for AsyncKubernetesHook per-instance Configuration isolation

Covers the race condition fix across five scenarios:

- _TimeoutAsyncK8sApiClient accepts a configuration= argument and forwards it
  to the parent ApiClient, ensuring each instance is isolated.
- No-arg construction still works for backwards compatibility.
- _load_config allocates a fresh async_client.Configuration() per hook instance
  when none was supplied by the caller.
- load_incluster_config and load_kube_config_from_dict are each called with the
  per-instance client_configuration rather than leaving it as None (which would
  cause both functions to fall back to Configuration.set_default() and corrupt
  shared global state).
- get_conn passes configuration= to _TimeoutAsyncK8sApiClient so the API client
  uses the hook's isolated Configuration, not the global default.
- The core regression test runs two hooks for different clusters concurrently
  via asyncio.gather and asserts that each hook's client_configuration is a
  distinct object populated only with that hook's own connection data.

* Address review: override client_configuration type in AsyncKubernetesHook

The parent KubernetesHook.client_configuration is typed as
kubernetes.client.Configuration (sync). Rather than using a separate
attribute, AsyncKubernetesHook now explicitly accepts and re-declares
client_configuration as async_client.Configuration | None, making the
override visible to type checkers and callers without adding a new
attribute to track.

Both Configuration classes are generated from the same OpenAPI source and
are structurally compatible when library versions align.

Also add an await asyncio.sleep(0) yield inside the populate_config mock
side effect in the concurrency regression test so asyncio.gather can
actually interleave the two coroutines and exercise the race window.

---------

Co-authored-by: Mingjie Zhao <mzhao@drwholdings.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants