Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in
which case the copied file will be named ``blah/foo/baz``.
:param destination_folder_id: The folder ID where the destination objects will be placed. It is
an additive prefix for anything specified in destination_object.
an additive prefix for anything specified in destination_object. (templated)
For example if folder ID ``xXyYzZ`` is called ``foo`` and the destination is ``bar/baz``, the file
will end up in `foo/bar/baz`.
This can be used to target an existing folder that is already visible to other users. The credentials
Expand All @@ -88,6 +88,7 @@ class GCSToGoogleDriveOperator(BaseOperator):
"source_object",
"destination_object",
"impersonation_chain",
"destination_folder_id",
Copy link
Copy Markdown
Contributor

@shahar1 shahar1 May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this as a template field should be mentioned in the title.

)
ui_color = "#f0eee4"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.common.utils.get_secret import get_secret
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator

Expand Down Expand Up @@ -71,6 +72,9 @@
LOCAL_PATH = str(Path("gcs"))
FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
FILE_NAME = "example_upload.txt"
# ID of the secret that the get_shared_drive_id task reads via get_secret().
GDRIVE_SECRET_ID = "gdrive_shared_folder_id"
# Jinja template: rendered at run time by pulling the XCom of the
# get_shared_drive_id task, so the drive ID is never hard-coded in the DAG file.
GDRIVE_ID = "{{ task_instance.xcom_pull('get_shared_drive_id') }}"


log = logging.getLogger(__name__)

Expand All @@ -80,7 +84,14 @@
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "gcs", "gdrive"],
render_template_as_native_obj=True,
) as dag:
@task
def get_shared_drive_id() -> str:
    """Return the value stored under ``GDRIVE_SECRET_ID``, stripped of surrounding whitespace."""
    raw_secret = get_secret(secret_id=GDRIVE_SECRET_ID)
    return raw_secret.strip()


get_shared_drive_id_task = get_shared_drive_id()

@task
def create_connection(connection_id: str):
Expand Down Expand Up @@ -124,6 +135,7 @@ def create_connection(connection_id: str):
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
destination_folder_id=GDRIVE_ID,
)
# [END howto_operator_gcs_to_gdrive_copy_single_file]

Expand All @@ -134,7 +146,7 @@ def create_connection(connection_id: str):
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/{FILE_NAME}",
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
destination_folder_id=FOLDER_ID,
destination_folder_id=GDRIVE_ID,
)
# [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]

Expand All @@ -145,6 +157,7 @@ def create_connection(connection_id: str):
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*",
destination_object=f"{WORK_DIR}/",
destination_folder_id=GDRIVE_ID,
)
# [END howto_operator_gcs_to_gdrive_copy_files]

Expand All @@ -155,23 +168,31 @@ def create_connection(connection_id: str):
source_bucket=BUCKET_NAME,
source_object=f"{TMP_PATH}/*.txt",
destination_object=f"{WORK_DIR}/",
destination_folder_id=GDRIVE_ID,
move_object=True,
)
# [END howto_operator_gcs_to_gdrive_move_files]

@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive(**context):
    """Delete every folder named ``WORK_DIR`` found in the shared drive used by this DAG.

    The shared drive ID is pulled from the XCom of the ``get_shared_drive_id`` task.
    All matching folders are deleted through a single batched HTTP request.

    :param context: Airflow task context; only ``context["ti"]`` is read.
    """
    ti = context["ti"]
    service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
    root_path = (
        service.files()
        .list(
            q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'",
            # Search inside the shared drive only; the two *AllDrives flags are
            # what makes shared-drive items visible to the request.
            corpora="drive",
            driveId=ti.xcom_pull(task_ids="get_shared_drive_id"),
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
        )
        .execute()
    )
    if files := root_path["files"]:
        batch = service.new_batch_http_request()
        for file in files:
            # logging formats lazily with %-style placeholders; a "{}" placeholder
            # combined with positional args triggers a formatting error instead.
            log.info("Deleting file %s...", file["name"])
            batch.add(service.files().delete(fileId=file["id"], supportsAllDrives=True))
        batch.execute()
    log.info("Selected files removed.")

Expand All @@ -188,7 +209,7 @@ def delete_connection(connection_id: str) -> None:
delete_connection_task = delete_connection(connection_id=CONNECTION_ID)

# TEST SETUP
create_bucket >> [upload_file_1, upload_file_2]
get_shared_drive_id_task >> create_bucket >> [upload_file_1, upload_file_2]
(
[upload_file_1, upload_file_2, create_connection_task]
# TEST BODY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import os
from datetime import datetime
from functools import cache
from typing import Any

from airflow.models.dag import DAG
Expand All @@ -35,6 +36,7 @@
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
from airflow.providers.google.common.utils.get_secret import get_secret
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
Expand Down Expand Up @@ -64,6 +66,9 @@
FILE_NAME = "example_upload.txt"
DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
LOCAL_PATH = f"gcs/{FILE_NAME}"
# ID of the secret that the get_shared_drive_id task reads via get_secret().
GDRIVE_SECRET_ID = "gdrive_shared_folder_id"
# Jinja template: rendered at run time by pulling the XCom of the
# get_shared_drive_id task, so the drive ID is never hard-coded in the DAG file.
GDRIVE_ID = "{{ task_instance.xcom_pull('get_shared_drive_id') }}"


log = logging.getLogger(__name__)

Expand All @@ -73,7 +78,13 @@
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "gcs", "gdrive"],
render_template_as_native_obj=True,
) as dag:
@task
def get_shared_drive_id() -> str:
    """Return the value stored under ``GDRIVE_SECRET_ID``, stripped of surrounding whitespace."""
    raw_secret = get_secret(secret_id=GDRIVE_SECRET_ID)
    return raw_secret.strip()


get_shared_drive_id_task = get_shared_drive_id()

@task
def create_connection(connection_id: str):
Expand Down Expand Up @@ -108,13 +119,15 @@ def create_connection(connection_id: str):
source_bucket=BUCKET_NAME,
source_object=FILE_NAME,
destination_object=DRIVE_FILE_NAME,
destination_folder_id=GDRIVE_ID,
)

# [START detect_file]
detect_file = GoogleDriveFileExistenceSensor(
task_id="detect_file",
folder_id=FOLDER_ID,
file_name=DRIVE_FILE_NAME,
drive_id=GDRIVE_ID,
gcp_conn_id=CONNECTION_ID,
)
# [END detect_file]
Expand All @@ -124,20 +137,32 @@ def create_connection(connection_id: str):
task_id="upload_gdrive_object_to_gcs",
gcp_conn_id=CONNECTION_ID,
folder_id=FOLDER_ID,
drive_id=GDRIVE_ID,
file_name=DRIVE_FILE_NAME,
bucket_name=BUCKET_NAME,
object_name=OBJECT,
)
# [END upload_gdrive_to_gcs]

@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive(**context):
    """Delete the first file named ``DRIVE_FILE_NAME`` found in the shared drive used by this DAG.

    The shared drive ID is pulled from the XCom of the ``get_shared_drive_id`` task.
    Nothing is deleted (and nothing is logged) when the listing returns no files.

    :param context: Airflow task context; only ``context["ti"]`` is read.
    """
    ti = context["ti"]
    service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
    response = (
        service.files()
        .list(
            q=f"name = '{DRIVE_FILE_NAME}'",
            # Search inside the shared drive only; the two *AllDrives flags are
            # what makes shared-drive items visible to the request.
            corpora="drive",
            driveId=ti.xcom_pull(task_ids="get_shared_drive_id"),
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
        )
        .execute()
    )
    if files := response["files"]:
        file = files[0]
        # logging formats lazily with %-style placeholders; a "{}" placeholder
        # combined with positional args triggers a formatting error instead.
        log.info("Deleting file %s...", file["name"])
        # The request object is only sent once .execute() is called; building it
        # without executing leaves the file in place.
        service.files().delete(fileId=file["id"], supportsAllDrives=True).execute()
        log.info("Done.")

remove_files_from_drive_task = remove_files_from_drive()
Expand All @@ -153,7 +178,7 @@ def delete_connection(connection_id: str) -> None:
delete_connection_task = delete_connection(connection_id=CONNECTION_ID)

(
[create_bucket >> upload_file, create_connection_task]
[get_shared_drive_id_task >> create_bucket >> upload_file, create_connection_task]
>> copy_single_file
# TEST BODY
>> detect_file
Expand Down
Loading