diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py index 9220b6b3b59d9..0915906b1c3bb 100644 --- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py +++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py @@ -62,12 +62,15 @@ class GCSToGoogleDriveOperator(BaseOperator): For example, with prefix ``foo/*`` and destination_object ``blah/``, the file ``foo/baz`` will be copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in which case the copied file will be named ``blah/foo/baz``. + :param destination_folder_id: The folder ID where the destination objects will be placed. It is + an additive prefix for anything specified in destination_object. + For example if folder ID ``xXyYzZ`` is called ``foo`` and the destination is ``bar/baz``, the file + will end up in `foo/bar/baz`. + This can be used to target an existing folder that is already visible to other users. The credentials + provided must have access to this folder. :param move_object: When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud. - :param delegate_to: The account to impersonate using domain-wide delegation of authority, - if any. For this to work, the service account making the request must have - domain-wide delegation enabled. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. @@ -92,6 +95,7 @@ def __init__( source_bucket: str, source_object: str, destination_object: str | None = None, + destination_folder_id: str | None = None, move_object: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, @@ -102,6 +106,7 @@ def __init__( self.source_bucket = source_bucket self.source_object = source_object self.destination_object = destination_object + self.destination_folder_id = destination_folder_id self.move_object = move_object self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain @@ -161,7 +166,11 @@ def _copy_single_object(self, source_object, destination_object): self.gcs_hook.download( bucket_name=self.source_bucket, object_name=source_object, filename=filename ) - self.gdrive_hook.upload_file(local_location=filename, remote_location=destination_object) + self.gdrive_hook.upload_file( + local_location=filename, + remote_location=destination_object, + folder_id=self.destination_folder_id, + ) if self.move_object: self.gcs_hook.delete(self.source_bucket, source_object) diff --git a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst index 9ed1cd58a8c58..0d59a22915a93 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gdrive.rst @@ -53,6 +53,17 @@ The following Operator would copy a single file. :start-after: [START howto_operator_gcs_to_gdrive_copy_single_file] :end-before: [END howto_operator_gcs_to_gdrive_copy_single_file] +Copy into an existing folder +---------------------------- + +The following Operator would copy a single file into an existing folder with the specified ID. + +.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder] + :end-before: [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder] + Copy multiple files ------------------- diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py index 525f20398ab36..47d1f6e038c10 100644 --- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py +++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py @@ -64,7 +64,52 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m impersonation_chain=None, ), mock.call().upload_file( - local_location="TMP1", remote_location="copied_sales/2017/january-backup.avro" + local_location="TMP1", + remote_location="copied_sales/2017/january-backup.avro", + folder_id=None, + ), + ] + ) + + @mock.patch(MODULE + ".GCSHook") + @mock.patch(MODULE + ".GoogleDriveHook") + @mock.patch(MODULE + ".tempfile.NamedTemporaryFile") + def test_should_copy_single_file_with_folder(self, mock_named_temporary_file, mock_gdrive, mock_gcs_hook): + type(mock_named_temporary_file.return_value.__enter__.return_value).name = mock.PropertyMock( + side_effect=["TMP1"] + ) + task = GCSToGoogleDriveOperator( + task_id="copy_single_file", + source_bucket="data", + source_object="sales/sales-2017/january.avro", + destination_object="copied_sales/2017/january-backup.avro", + destination_folder_id="aAopls6bE4tUllZVGJvRUU", + ) + + task.execute(mock.MagicMock()) + + mock_gcs_hook.assert_has_calls( + [ + mock.call( + gcp_conn_id="google_cloud_default", + impersonation_chain=None, + ), + mock.call().download( + bucket_name="data", filename="TMP1", object_name="sales/sales-2017/january.avro" + ), + ] + ) + + mock_gdrive.assert_has_calls( + [ + mock.call( + gcp_conn_id="google_cloud_default", + impersonation_chain=None, + ), + mock.call().upload_file( + local_location="TMP1", + remote_location="copied_sales/2017/january-backup.avro", + folder_id="aAopls6bE4tUllZVGJvRUU", ), ] ) @@ -110,9 +155,15 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc gcp_conn_id="google_cloud_default", impersonation_chain=IMPERSONATION_CHAIN, ), - mock.call().upload_file(local_location="TMP1", remote_location="sales/A.avro"), - mock.call().upload_file(local_location="TMP2", remote_location="sales/B.avro"), - mock.call().upload_file(local_location="TMP3", remote_location="sales/C.avro"), + mock.call().upload_file( + local_location="TMP1", remote_location="sales/A.avro", folder_id=None + ), + mock.call().upload_file( + local_location="TMP2", remote_location="sales/B.avro", folder_id=None + ), + mock.call().upload_file( + local_location="TMP3", remote_location="sales/C.avro", folder_id=None + ), ] ) @@ -158,9 +209,15 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc gcp_conn_id="google_cloud_default", impersonation_chain=IMPERSONATION_CHAIN, ), - mock.call().upload_file(local_location="TMP1", remote_location="sales/A.avro"), - mock.call().upload_file(local_location="TMP2", remote_location="sales/B.avro"), - mock.call().upload_file(local_location="TMP3", remote_location="sales/C.avro"), + mock.call().upload_file( + local_location="TMP1", remote_location="sales/A.avro", folder_id=None + ), + mock.call().upload_file( + local_location="TMP2", remote_location="sales/B.avro", folder_id=None + ), + mock.call().upload_file( + local_location="TMP3", remote_location="sales/C.avro", folder_id=None + ), ] ) diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py index e9b1c3a041a94..c3371a2c3c111 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py @@ -35,6 +35,7 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234") DAG_ID = "example_gcs_to_gdrive" @@ -82,6 +83,16 @@ ) # [END howto_operator_gcs_to_gdrive_copy_single_file] + # [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder] + copy_single_file_into_folder = GCSToGoogleDriveOperator( + task_id="copy_single_file_into_folder", + source_bucket=BUCKET_NAME, + source_object=f"{TMP_PATH}/{FILE_NAME}", + destination_object=f"copied_tmp/copied_{FILE_NAME}", + destination_folder_id=FOLDER_ID, + ) + # [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder] + # [START howto_operator_gcs_to_gdrive_copy_files] copy_files = GCSToGoogleDriveOperator( task_id="copy_files",