fix(amazon): EksPodOperator deferrable mode fails on remote triggerers #63020
akhilesharora wants to merge 3 commits into apache:main
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
fdad29e to dc9c61f
SameerMesiah97 left a comment
Looks fine. But there was one critical issue that I have commented on already. And I do have one other concern:
Since the EKS authentication token generated by fetch_access_token_for_cluster typically expires after approx. 15 minutes, I’m wondering how this behaves for longer-running triggers. If the trigger polls the Kubernetes API for longer than the token lifetime, could the embedded token expire and cause authentication failures?
CI needs to be run to see if there are any other issues.
:param eks_cluster_name: The name of the cluster to generate kubeconfig for.
:param pod_namespace: The namespace to run within kubernetes.
:return: A kubeconfig dict with embedded bearer token.
This docstring is too long. The emphasis should be on description not justification. Something like this would be better:
"""
Generate a kubeconfig dict with an embedded bearer token for deferrable execution.
The token-based config avoids the exec credential plugin so it can be safely
serialized and used by the triggerer process.
:param eks_cluster_name: The name of the EKS cluster.
:param pod_namespace: The Kubernetes namespace.
:return: Kubeconfig dictionary with embedded bearer token.
"""
The additional content where you explain why this function exists might be better as a comment.
    sts_url = f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
finally:
    del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
Modifying environment variables is not acceptable here. os.environ is process-global so setting and deleting AWS_STS_REGIONAL_ENDPOINTS like this could interfere with other tasks running in the same worker process. It could also remove a value that was already set by the environment.
I am not sure why you need to manipulate environment variables because my understanding is that the url construction would default to 'regional' without explicitly setting AWS_STS_REGIONAL_ENDPOINTS to regional.
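The second hazard mentioned above (losing a value that was already set) can be shown with a minimal sketch. A plain dict stands in for os.environ so the example has no side effects; the function name `set_then_delete` and the starting value "legacy" are illustrative assumptions, not taken from the PR.

```python
VAR = "AWS_STS_REGIONAL_ENDPOINTS"


def set_then_delete(env: dict) -> None:
    """Mimic the pattern under review: set the variable, do work, delete it."""
    env[VAR] = "regional"
    try:
        pass  # the STS URL would be built here
    finally:
        # Unconditional delete: any value present before the call is lost.
        del env[VAR]


# Stand-in for a process whose environment already configured the variable.
env = {VAR: "legacy"}
set_then_delete(env)
print(VAR in env)  # False: the pre-existing setting has been removed
```

With the real os.environ the same loss would be visible to every other task in the worker process, which is why avoiding the mutation altogether is the simpler fix.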
You're right, I've removed the env var manipulation entirely. Now constructing the regional STS URL directly: https://sts.{region}.amazonaws.com/.... Also applied this fix to the existing generate_config_file method for consistency.
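A minimal sketch of what building the regional URL directly might look like, with no environment variables involved. The helper name `build_regional_sts_url` is hypothetical; the host pattern and query parameters are the ones quoted in this thread. It assumes the standard `amazonaws.com` suffix, which does not hold in every AWS partition (China regions, for example, use `amazonaws.com.cn`).

```python
from urllib.parse import urlencode


def build_regional_sts_url(region_name: str) -> str:
    """Return a regional STS GetCallerIdentity URL without touching os.environ.

    Assumption: the region lives in the standard partition (amazonaws.com).
    """
    query = urlencode({"Action": "GetCallerIdentity", "Version": "2011-06-15"})
    return f"https://sts.{region_name}.amazonaws.com/?{query}"


print(build_regional_sts_url("eu-west-1"))
```

Because the function depends only on its argument, it is safe to call from concurrent tasks in the same process.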
        sts_url=sts_url,
        region_name=session.region_name,
    )
except Exception as e:
The Exception here is too broad. Can you narrow it to the errors you would expect when invoking fetch_access_token_for_cluster?
This override generates a kubeconfig with an embedded bearer token instead of an exec
block, allowing the config to work on the triggerer without requiring local temp files.
"""
This docstring is too long as well. I would suggest the below:
"""
Override to generate a token-based kubeconfig for the triggerer.
EKS kubeconfigs use an exec credential plugin that references temporary
files created on the worker. These files are not available on the triggerer,
so this override embeds a bearer token instead.
"""
Same as above, I would leave the truncated content to be included in a comment instead.
import datetime

from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
from airflow.providers.common.compat.sdk import AirflowNotFoundException, BaseHook
Why are these imports within the function? Not necessarily an issue but can you explain why?
I placed them locally but they could be moved to module level to match the parent class pattern
This test verifies that the method generates a kubeconfig dict with a bearer token
embedded directly (instead of an exec block that references temp files), allowing
the config to be serialized and used on the triggerer.
Remove everything from this docstring except the first line.
This test verifies that EksPodOperator.invoke_defer_method() generates a kubeconfig
with an embedded bearer token (instead of an exec block with temp file references)
so that the triggerer can authenticate without requiring files that only exist on the worker.
"""
Remove everything from this docstring except the first line.
@@ -0,0 +1 @@
Fix EksPodOperator deferrable mode failing on remote triggerers with 401 Unauthorized by embedding bearer token in kubeconfig instead of using exec block with temp file references
I am not sure if a news fragment is necessary for this but let's see what a committer/maintainer has to say.
Will defer to committer guidance on this. Included it as it's a user-facing bugfix
When using EksPodOperator with deferrable=True, the triggerer fails with 401 Unauthorized because credential temp files created on the worker don't exist on the triggerer host.

The root cause is that the kubeconfig exec block references a temp file path that only exists on the worker. When the trigger is serialized and sent to the triggerer (on a different host), the exec block tries to source a file that doesn't exist.

This fix adds a new method `generate_config_dict_for_deferral()` that creates a kubeconfig with an embedded bearer token instead of an exec block. The EksPodOperator.invoke_defer_method() is overridden to use this token-based config for the triggerer.

Security considerations:
- Token is encrypted at rest (Fernet encryption in trigger serialization)
- Token has short lifespan (~14 minutes)
- Token is never logged
- Robust error handling with clear messages

Closes apache#61736
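For illustration, a token-based kubeconfig of the kind this commit message describes might look roughly like the sketch below. It is not the PR's implementation: the function name `build_token_kubeconfig`, its parameters, and the "aws" user/context names are assumptions. The dict layout follows the standard kubeconfig schema, where a `token` entry under `user` takes the place of an `exec` block.

```python
import json


def build_token_kubeconfig(
    cluster_name: str, endpoint: str, ca_data: str, namespace: str, token: str
) -> dict:
    """Build a kubeconfig dict that authenticates with a static bearer token."""
    if not token:
        # Assumption: an empty token is rejected up front rather than serialized.
        raise ValueError("Refusing to build a kubeconfig with an empty token")
    return {
        "apiVersion": "v1",
        "kind": "Config",
        "clusters": [
            {
                "name": cluster_name,
                "cluster": {"server": endpoint, "certificate-authority-data": ca_data},
            }
        ],
        "contexts": [
            {
                "name": "aws",
                "context": {"cluster": cluster_name, "namespace": namespace, "user": "aws"},
            }
        ],
        "current-context": "aws",
        # No "exec" block here, so nothing refers to files on the worker.
        "users": [{"name": "aws", "user": {"token": token}}],
    }


config = build_token_kubeconfig(
    "demo-cluster", "https://example.invalid", "BASE64CA", "default", "k8s-aws-v1.placeholder"
)
# The dict contains only plain data, so it survives a JSON round trip.
restored = json.loads(json.dumps(config))
```

Since the result holds only strings, lists, and dicts, it can be passed to the trigger without any dependency on the worker's filesystem.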
- Remove os.environ manipulation for AWS_STS_REGIONAL_ENDPOINTS, construct regional STS URL directly to avoid process-global side effects
- Trim verbose docstrings to focus on description rather than justification
- Narrow exception handling from broad Exception to specific (BotoCoreError, ClientError, ValueError)
- Add comment explaining why imports are inside invoke_defer_method
- Apply same env var fix to existing generate_config_file method
dc9c61f to cfcd906
Yes, if the trigger polls for longer than ~14 minutes, the token could expire. I expect most pod operations (startup, completion monitoring) to finish well within this window. For very long-running pods, token refresh in the trigger could be a future enhancement.
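As a rough sketch of what such a future refresh check could look like (nothing like this exists in the PR): the trigger would record when the token was issued and request a new one shortly before the lifetime runs out. The names `TOKEN_LIFETIME`, `REFRESH_MARGIN`, and `needs_refresh` are hypothetical, and the 14-minute figure is simply the approximate lifetime quoted in this thread.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

TOKEN_LIFETIME = timedelta(minutes=14)  # assumption: the "~14 minutes" quoted above
REFRESH_MARGIN = timedelta(minutes=2)   # hypothetical safety margin


def needs_refresh(issued_at: datetime, now: Optional[datetime] = None) -> bool:
    """Return True once the token is within REFRESH_MARGIN of expiring."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - issued_at >= TOKEN_LIFETIME - REFRESH_MARGIN


issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(needs_refresh(issued, issued + timedelta(minutes=5)))   # False
print(needs_refresh(issued, issued + timedelta(minutes=13)))  # True
```

A polling loop could call a check like this before each Kubernetes API request and regenerate the kubeconfig when it returns True.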
Summary
Fix EksPodOperator with deferrable=True failing with 401 Unauthorized when the triggerer runs on a different host from the worker.
Root Cause: The kubeconfig exec block references a temp file path (/tmp/tmpXYZ) that only exists on the worker. When the trigger is serialized and sent to the triggerer, the exec block tries to source a file that doesn't exist.
Solution: Generate a kubeconfig with an embedded bearer token instead of an exec block with temp file references.
Changes
- EksHook.generate_config_dict_for_deferral() - generates kubeconfig with embedded token
- EksPodOperator.invoke_defer_method() to use token-based config for triggerer
Security Considerations
Test Plan
- test_generate_config_dict_for_deferral - verifies embedded token config
- test_generate_config_dict_for_deferral_cluster_not_found - error handling
- test_generate_config_dict_for_deferral_empty_token - security validation
- test_generate_config_dict_for_deferral_token_fetch_failure - error handling
- test_invoke_defer_method_generates_token_based_config - operator integration
Closes #61736
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.5) following the guidelines