
add service_file support to GKEPodAsyncHook #37081

Merged

Conversation

Lee-W
Member

@Lee-W Lee-W commented Jan 30, 2024

Currently, GKEPodAsyncHook does not support service_file, so credentials passed through "Keyfile Path" or "Keyfile JSON" are ignored. This PR fixes that. Because the default value of service_file in Token is None (https://github.com/talkiq/gcloud-aio/blob/8c8e1b39ec2e40b42212c270acb98c039267fbc5/auth/gcloud/aio/auth/token.py#L157), and service_file_as_context also returns None when neither a key file path nor key file JSON is provided, this change does not affect existing behavior.
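
For illustration, a minimal sketch (not the actual diff in this PR) of the idea behind the change: forward a key file to gcloud-aio's Token, whose service_file parameter defaults to None, so passing None keeps the application-default-credentials behavior. The build_token helper below is a hypothetical stand-in, not code from the provider.

from __future__ import annotations

from gcloud.aio.auth import Token


def build_token(service_file: str | None = None) -> Token:
    # service_file may be a path to a key file (resolved from "Keyfile Path" /
    # "Keyfile JSON" on the Airflow connection) or None; gcloud-aio falls back
    # to default credentials when it is None, preserving existing behavior.
    return Token(
        service_file=service_file,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )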



@boring-cyborg boring-cyborg bot added the area:providers, provider:cncf-kubernetes (Kubernetes provider related issues), and provider:google (Google (including GCP) related issues) labels Jan 30, 2024
@Lee-W Lee-W changed the title from "feat(providers/google): add service_file support to GKEPodAsyncHook" to "add service_file support to GKEPodAsyncHook" Jan 30, 2024
@Lee-W Lee-W force-pushed the add-service-file-support-to-GKEPodAsyncHook branch from 957ad69 to 3339adc on January 30, 2024 07:27
@Lee-W Lee-W marked this pull request as ready for review January 30, 2024 08:05
@Lee-W Lee-W force-pushed the add-service-file-support-to-GKEPodAsyncHook branch from d16f2b1 to b5acaff on January 30, 2024 09:42
@Lee-W Lee-W force-pushed the add-service-file-support-to-GKEPodAsyncHook branch 5 times, most recently from e5c6eb1 to 1867070 on February 3, 2024 13:05
@Lee-W Lee-W requested a review from dirrao February 4, 2024 00:32
@Lee-W Lee-W force-pushed the add-service-file-support-to-GKEPodAsyncHook branch 2 times, most recently from 97615f2 to 16a0be4 on February 4, 2024 01:16
@phanikumv phanikumv force-pushed the add-service-file-support-to-GKEPodAsyncHook branch from 16a0be4 to a102a08 on February 5, 2024 02:20
@pankajkoti
Member

pankajkoti commented Feb 5, 2024

I am merging this one, but please let us know here if there is any feedback and we can address it in a subsequent PR.

cc: @VladaZakharova

@pankajkoti pankajkoti merged commit 2372e21 into apache:main Feb 5, 2024
56 checks passed
@pankajkoti pankajkoti deleted the add-service-file-support-to-GKEPodAsyncHook branch February 5, 2024 06:20
@vchiapaikeo
Contributor

I think this commit is problematic when using application default credentials for auth:

Sample DAG:

from airflow import DAG

from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)


DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2021-04-20",
    "retries": 0,
    "retry_delay": 60,
}

with DAG(
    dag_id="test_gke_op",
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=5,
    catchup=False,
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    _ = GKEStartPodOperator(
        task_id="whoami",
        name="whoami",
        cmds=["gcloud"],
        arguments=["auth", "list"],
        image="gcr.io/google.com/cloudsdktool/cloud-sdk:slim",
        project_id="redacted-project-id",
        namespace="airflow-default",
        location="us-central1",
        cluster_name="airflow-gke-cluster",
        service_account_name="default",
        deferrable=True,
        do_xcom_push=True,
    )

    _ = GKEStartPodOperator(
        task_id="fail",
        name="fail",
        cmds=["bash"],
        arguments=["-xc", "sleep 2 && exit 1"],
        image="gcr.io/google.com/cloudsdktool/cloud-sdk:slim",
        project_id="redacted-project-id",
        namespace="airflow-default",
        location="us-central1",
        cluster_name="airflow-gke-cluster",
        service_account_name="default",
        deferrable=True,
        do_xcom_push=True,
    )

Logs

[2024-02-09, 20:34:47 EST] {taskinstance.py:1980} INFO - Dependencies all met for dep_context=non-requeueable deps ti=
[2024-02-09, 20:34:47 EST] {taskinstance.py:1980} INFO - Dependencies all met for dep_context=requeueable deps ti=
[2024-02-09, 20:34:47 EST] {taskinstance.py:2194} INFO - Starting attempt 6 of 6
[2024-02-09, 20:34:47 EST] {taskinstance.py:2215} INFO - Executing  on 2024-02-09 00:00:00+00:00
[2024-02-09, 20:34:47 EST] {standard_task_runner.py:60} INFO - Started process 832 to run task
[2024-02-09, 20:34:47 EST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'test_gke_op', 'fail', 'scheduled__2024-02-09T00:00:00+00:00', '--job-id', '52', '--raw', '--subdir', 'DAGS_FOLDER/test_deferrable_xcom.py', '--cfg-path', '/tmp/tmp_nlq7klv']
[2024-02-09, 20:34:47 EST] {standard_task_runner.py:88} INFO - Job 52: Subtask fail
[2024-02-09, 20:34:47 EST] {task_command.py:423} INFO - Running  on host 8d940ff4e12e
[2024-02-09, 20:34:47 EST] {taskinstance.py:2515} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='gcp-data-platform' AIRFLOW_CTX_DAG_ID='test_gke_op' AIRFLOW_CTX_TASK_ID='fail' AIRFLOW_CTX_EXECUTION_DATE='2024-02-09T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='6' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-09T00:00:00+00:00'
[2024-02-09, 20:34:47 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-09, 20:34:47 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-09, 20:34:47 EST] {kubernetes_engine.py:289} INFO - Fetching cluster (project_id=redacted-project-id, location=us-central1, cluster_name=***-gke-cluster)
[2024-02-09, 20:34:47 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-09, 20:34:47 EST] {_default.py:683} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2024-02-09, 20:34:47 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-09, 20:34:47 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-09, 20:34:47 EST] {pod.py:999} INFO - Building pod fail-zmonp9cc with labels: {'dag_id': 'test_gke_op', 'task_id': 'fail', 'run_id': 'scheduled__2024-02-09T0000000000-da6dd5816', 'kubernetes_pod_operator': 'True', 'try_number': '6'}
[2024-02-09, 20:34:47 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-09, 20:34:47 EST] {_default.py:683} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2024-02-09, 20:34:48 EST] {taskinstance.py:2368} INFO - Pausing task as DEFERRED. dag_id=test_gke_op, task_id=fail, execution_date=20240209T000000, start_date=20240210T013447
[2024-02-09, 20:34:48 EST] {local_task_job_runner.py:231} INFO - Task exited with return code 100 (task deferral)
[2024-02-09, 20:34:49 EST] {pod.py:145} INFO - Checking pod 'fail-zmonp9cc' in namespace '***-default'.
[2024-02-09, 20:34:49 EST] {pod.py:227} ERROR - Exception occurred while checking pod phase:
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run
    pod = await self.hook.get_pod(
  File "/opt/airflow/airflow/providers/google/cloud/hooks/kubernetes_engine.py", line 511, in get_pod
    async with self.service_file_as_context() as service_file:  # type: ignore[attr-defined]
AttributeError: __aenter__
[2024-02-09, 20:34:49 EST] {triggerer_job_runner.py:604} INFO - Trigger test_gke_op/scheduled__2024-02-09T00:00:00+00:00/fail/-1/6 (ID 5) fired: TriggerEvent<{'name': 'fail-zmonp9cc', 'namespace': 'airflow-default', 'status': 'error', 'message': '__aenter__', 'stack_trace': 'Traceback (most recent call last):\n  File "/opt/airflow/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run\n    pod = await self.hook.get_pod(\n  File "/opt/airflow/airflow/providers/google/cloud/hooks/kubernetes_engine.py", line 511, in get_pod\n    async with self.service_file_as_context() as service_file:  # type: ignore[attr-defined]\nAttributeError: __aenter__\n'}>
[2024-02-09, 20:34:50 EST] {taskinstance.py:1980} INFO - Dependencies all met for dep_context=non-requeueable deps ti=
[2024-02-09, 20:34:50 EST] {taskinstance.py:1980} INFO - Dependencies all met for dep_context=requeueable deps ti=
[2024-02-09, 20:34:50 EST] {taskinstance.py:2192} INFO - Resuming after deferral
[2024-02-09, 20:34:50 EST] {taskinstance.py:2215} INFO - Executing  on 2024-02-09 00:00:00+00:00
[2024-02-09, 20:34:50 EST] {standard_task_runner.py:60} INFO - Started process 868 to run task
[2024-02-09, 20:34:50 EST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'test_gke_op', 'fail', 'scheduled__2024-02-09T00:00:00+00:00', '--job-id', '53', '--raw', '--subdir', 'DAGS_FOLDER/test_deferrable_xcom.py', '--cfg-path', '/tmp/tmpf66zrbv_']
[2024-02-09, 20:34:50 EST] {standard_task_runner.py:88} INFO - Job 53: Subtask fail
[2024-02-09, 20:34:50 EST] {task_command.py:423} INFO - Running  on host 8d940ff4e12e
[2024-02-09, 20:34:51 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-09, 20:34:51 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-09, 20:34:51 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-09, 20:34:51 EST] {_default.py:683} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2024-02-09, 20:34:51 EST] {pod.py:722} INFO - Container logs: + sleep 2
[2024-02-09, 20:34:51 EST] {pod.py:722} INFO - Container logs: + exit 1
[2024-02-09, 20:34:51 EST] {pod.py:722} INFO - Container logs: 
[2024-02-09, 20:34:51 EST] {pod_manager.py:790} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-09, 20:34:52 EST] {pod_manager.py:790} INFO - Running command... kill -s SIGINT 1
[2024-02-09, 20:34:52 EST] {pod.py:546} INFO - xcom result file is empty.
[2024-02-09, 20:34:52 EST] {pod_manager.py:608} INFO - Pod fail-zmonp9cc has phase Running
[2024-02-09, 20:34:54 EST] {pod_manager.py:608} INFO - Pod fail-zmonp9cc has phase Running
[2024-02-09, 20:34:57 EST] {pod.py:856} INFO - Skipping deleting pod: fail-zmonp9cc
[2024-02-09, 20:34:57 EST] {taskinstance.py:2737} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 692, in execute_complete
    raise AirflowException(message)
airflow.exceptions.AirflowException: __aenter__
Traceback (most recent call last):
  File "/opt/***/***/providers/cncf/kubernetes/triggers/pod.py", line 148, in run
    pod = await self.hook.get_pod(
  File "/opt/***/***/providers/google/cloud/hooks/kubernetes_engine.py", line 511, in get_pod
    async with self.service_file_as_context() as service_file:  # type: ignore[attr-defined]
AttributeError: __aenter__
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 446, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 416, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 1618, in resume_execution
    return execute_callable(context)
  File "/opt/airflow/airflow/providers/google/cloud/operators/kubernetes_engine.py", line 593, in execute_complete
    return super().execute_complete(context, event, **kwargs)
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 708, in execute_complete
    self.post_complete_action(
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 732, in post_complete_action
    self.cleanup(
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 783, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod fail-zmonp9cc returned a failure.
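
The AttributeError above is the crux: service_file_as_context() returns a regular synchronous context manager, so driving it with async with fails because the returned object defines no __aenter__/__aexit__. A minimal, self-contained illustration of that failure mode (not Airflow code):

import asyncio
from contextlib import contextmanager


@contextmanager
def sync_cm():
    # A plain synchronous context manager: it only defines __enter__/__exit__.
    yield "service_file"


async def main():
    # Driving a sync context manager with `async with` fails; depending on the
    # Python version this surfaces as AttributeError: __aenter__ (as in the
    # logs above) or a TypeError about the asynchronous context manager protocol.
    async with sync_cm() as service_file:
        print(service_file)


asyncio.run(main())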

m1racoli added a commit to king/airflow that referenced this pull request Feb 16, 2024
We utilize the existing implementation of `_CredentialsToken` by using
the async hook's `get_token` method. This implementation allows us to
leverage several features of the Google connection from `Keyfile Path`
or `Keyfile JSON` (see apache#37081) to impersonation chain on hook or
connection level. We therefore do not need to rely on the async hook's
`service_file_as_context` method, which does not support impersonation
chain.

With this change we effectively gain support for impersonation chain in
GKEStartPodOperator in deferrable mode.
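
For orientation, a rough sketch of the token-based shape the commit describes: fetch an access token asynchronously and attach it as a Bearer header on a kubernetes_asyncio client configuration, instead of materializing a service file on disk. The token_provider interface below is a hypothetical stand-in, not the provider's actual _CredentialsToken or get_token API.

from kubernetes_asyncio import client


async def build_api_client(token_provider, cluster_url: str, ssl_ca_cert: str) -> client.ApiClient:
    # token_provider is assumed to expose a get_token() coroutine returning a
    # raw OAuth2 access token string (stand-in for the hook's token machinery).
    access_token = await token_provider.get_token()
    configuration = client.Configuration(
        host=cluster_url,
        api_key={"authorization": access_token},
        api_key_prefix={"authorization": "Bearer"},
    )
    configuration.ssl_ca_cert = ssl_ca_cert  # path to the cluster CA certificate
    return client.ApiClient(configuration)

Because the token comes from the hook's credential machinery rather than a service file, connection-level settings such as an impersonation chain can apply in deferrable mode as well, which is the gain the commit message points out.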
potiuk pushed a commit that referenced this pull request Feb 22, 2024
…7486)

We utilize the existing implementation of `_CredentialsToken` by using
the async hook's `get_token` method. This implementation allows us to
leverage several features of the Google connection from `Keyfile Path`
or `Keyfile JSON` (see #37081) to impersonation chain on hook or
connection level. We therefore do not need to rely on the async hook's
`service_file_as_context` method, which does not support impersonation
chain.

With this change we effectively gain support for impersonation chain in
GKEStartPodOperator in deferrable mode.
abhishekbhakat pushed a commit to abhishekbhakat/my_airflow that referenced this pull request Mar 5, 2024
Currently, GKEPodAsyncHook does not support service_file, so credentials passed through "Keyfile Path" or "Keyfile JSON" are ignored. This PR fixes that. Because the default value of service_file in Token is None (https://github.com/talkiq/gcloud-aio/blob/8c8e1b39ec2e40b42212c270acb98c039267fbc5/auth/gcloud/aio/auth/token.py#L157), and service_file_as_context also returns None when neither a key file path nor key file JSON is provided, this change does not affect existing behavior.
abhishekbhakat pushed a commit to abhishekbhakat/my_airflow that referenced this pull request Mar 5, 2024
…ache#37486)

We utilize the existing implementation of `_CredentialsToken` by using
the async hook's `get_token` method. This implementation allows us to
leverage several features of the Google connection from `Keyfile Path`
or `Keyfile JSON` (see apache#37081) to impersonation chain on hook or
connection level. We therefore do not need to rely on the async hook's
`service_file_as_context` method, which does not support impersonation
chain.

With this change we effectively gain support for impersonation chain in
GKEStartPodOperator in deferrable mode.