diff --git a/providers/google/src/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/providers/google/src/airflow/providers/google/suite/transfers/gcs_to_gdrive.py index e184f00a1bc3f..00a6051a153c7 100644 --- a/providers/google/src/airflow/providers/google/suite/transfers/gcs_to_gdrive.py +++ b/providers/google/src/airflow/providers/google/suite/transfers/gcs_to_gdrive.py @@ -65,7 +65,7 @@ class GCSToGoogleDriveOperator(BaseOperator): copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in which case the copied file will be named ``blah/foo/baz``. :param destination_folder_id: The folder ID where the destination objects will be placed. It is - an additive prefix for anything specified in destination_object. + an additive prefix for anything specified in destination_object. (templated) For example if folder ID ``xXyYzZ`` is called ``foo`` and the destination is ``bar/baz``, the file will end up in `foo/bar/baz`. This can be used to target an existing folder that is already visible to other users. 
The credentials @@ -88,6 +88,7 @@ class GCSToGoogleDriveOperator(BaseOperator): "source_object", "destination_object", "impersonation_chain", + "destination_folder_id", ) ui_color = "#f0eee4" diff --git a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gdrive.py b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gdrive.py index 2fac6ccd088c4..02bb0c0820db2 100644 --- a/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gdrive.py +++ b/providers/google/tests/system/google/cloud/gcs/example_gcs_to_gdrive.py @@ -41,6 +41,7 @@ from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator +from airflow.providers.google.common.utils.get_secret import get_secret from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator @@ -71,6 +72,9 @@ LOCAL_PATH = str(Path("gcs")) FILE_LOCAL_PATH = str(Path(LOCAL_PATH)) FILE_NAME = "example_upload.txt" +GDRIVE_SECRET_ID = "gdrive_shared_folder_id" +GDRIVE_ID = "{{ task_instance.xcom_pull('get_shared_drive_id') }}" + log = logging.getLogger(__name__) @@ -80,7 +84,13 @@ start_date=datetime(2021, 1, 1), catchup=False, tags=["example", "gcs", "gdrive"], + render_template_as_native_obj=True, ) as dag: + @task + def get_shared_drive_id() -> str: + return get_secret(secret_id=GDRIVE_SECRET_ID).strip() + + get_shared_drive_id_task = get_shared_drive_id() @task def create_connection(connection_id: str): @@ -124,6 +134,7 @@ def create_connection(connection_id: str): source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/{FILE_NAME}", destination_object=f"{WORK_DIR}/copied_{FILE_NAME}", + destination_folder_id=GDRIVE_ID, ) # [END howto_operator_gcs_to_gdrive_copy_single_file] @@ -134,7 +145,7 @@ def create_connection(connection_id: str): 
source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/{FILE_NAME}", destination_object=f"{WORK_DIR}/copied_{FILE_NAME}", - destination_folder_id=FOLDER_ID, + destination_folder_id=GDRIVE_ID, ) # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder] @@ -145,6 +156,7 @@ def create_connection(connection_id: str): source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/*", destination_object=f"{WORK_DIR}/", + destination_folder_id=GDRIVE_ID, ) # [END howto_operator_gcs_to_gdrive_copy_files] @@ -155,23 +167,31 @@ def create_connection(connection_id: str): source_bucket=BUCKET_NAME, source_object=f"{TMP_PATH}/*.txt", destination_object=f"{WORK_DIR}/", + destination_folder_id=GDRIVE_ID, move_object=True, ) # [END howto_operator_gcs_to_gdrive_move_files] @task(trigger_rule=TriggerRule.ALL_DONE) - def remove_files_from_drive(): + def remove_files_from_drive(**context): + ti = context["ti"] service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() root_path = ( service.files() - .list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'") + .list( + q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'", + corpora="drive", + driveId=ti.xcom_pull("get_shared_drive_id"), + includeItemsFromAllDrives=True, + supportsAllDrives=True, + ) .execute() ) if files := root_path["files"]: batch = service.new_batch_http_request() for file in files: - log.info("Preparing to remove file: {}", file) - batch.add(service.files().delete(fileId=file["id"])) + log.info("Deleting file %s...", file["name"]) + batch.add(service.files().delete(fileId=file["id"], supportsAllDrives=True)) batch.execute() log.info("Selected files removed.") @@ -188,7 +208,7 @@ def delete_connection(connection_id: str) -> None: delete_connection_task = delete_connection(connection_id=CONNECTION_ID) # TEST SETUP - create_bucket >> [upload_file_1, upload_file_2] + get_shared_drive_id_task >> create_bucket >> [upload_file_1, upload_file_2] ( [upload_file_1, 
upload_file_2, create_connection_task] # TEST BODY diff --git a/providers/google/tests/system/google/cloud/gcs/example_gdrive_to_gcs.py b/providers/google/tests/system/google/cloud/gcs/example_gdrive_to_gcs.py index 482d7b43003c0..14bf8452fad30 100644 --- a/providers/google/tests/system/google/cloud/gcs/example_gdrive_to_gcs.py +++ b/providers/google/tests/system/google/cloud/gcs/example_gdrive_to_gcs.py @@ -35,6 +35,7 @@ from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator +from airflow.providers.google.common.utils.get_secret import get_secret from airflow.providers.google.suite.hooks.drive import GoogleDriveHook from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator @@ -64,6 +65,9 @@ FILE_NAME = "example_upload.txt" DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt" LOCAL_PATH = f"gcs/{FILE_NAME}" +GDRIVE_SECRET_ID = "gdrive_shared_folder_id" +GDRIVE_ID = "{{ task_instance.xcom_pull('get_shared_drive_id') }}" + log = logging.getLogger(__name__) @@ -73,7 +77,13 @@ start_date=datetime(2021, 1, 1), catchup=False, tags=["example", "gcs", "gdrive"], + render_template_as_native_obj=True, ) as dag: + @task + def get_shared_drive_id() -> str: + return get_secret(secret_id=GDRIVE_SECRET_ID).strip() + + get_shared_drive_id_task = get_shared_drive_id() @task def create_connection(connection_id: str): @@ -108,6 +118,7 @@ def create_connection(connection_id: str): source_bucket=BUCKET_NAME, source_object=FILE_NAME, destination_object=DRIVE_FILE_NAME, + 
destination_folder_id=GDRIVE_ID, ) # [START detect_file] @@ -115,6 +126,7 @@ def create_connection(connection_id: str): task_id="detect_file", folder_id=FOLDER_ID, file_name=DRIVE_FILE_NAME, + drive_id=GDRIVE_ID, gcp_conn_id=CONNECTION_ID, ) # [END detect_file] @@ -124,6 +136,7 @@ def create_connection(connection_id: str): task_id="upload_gdrive_object_to_gcs", gcp_conn_id=CONNECTION_ID, folder_id=FOLDER_ID, + drive_id=GDRIVE_ID, file_name=DRIVE_FILE_NAME, bucket_name=BUCKET_NAME, object_name=OBJECT, @@ -131,13 +144,24 @@ def create_connection(connection_id: str): # [END upload_gdrive_to_gcs] @task(trigger_rule=TriggerRule.ALL_DONE) - def remove_files_from_drive(): + def remove_files_from_drive(**context): + ti = context["ti"] service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn() - response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute() + response = ( + service.files() + .list( + q=f"name = '{DRIVE_FILE_NAME}'", + corpora="drive", + driveId=ti.xcom_pull("get_shared_drive_id"), + includeItemsFromAllDrives=True, + supportsAllDrives=True, + ) + .execute() + ) if files := response["files"]: file = files[0] - log.info("Deleting file {}...", file) - service.files().delete(fileId=file["id"]) + log.info("Deleting file %s...", file["name"]) + service.files().delete(fileId=file["id"], supportsAllDrives=True).execute() log.info("Done.") remove_files_from_drive_task = remove_files_from_drive() @@ -153,7 +177,7 @@ def delete_connection(connection_id: str) -> None: delete_connection_task = delete_connection(connection_id=CONNECTION_ID) ( - [create_bucket >> upload_file, create_connection_task] + [get_shared_drive_id_task >> create_bucket >> upload_file, create_connection_task] >> copy_single_file # TEST BODY >> detect_file