Skip to content

Commit

Permalink
Bugfix GCSToGCSOperator when copy files to folder without wildcard (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
moiseenkov committed Jul 11, 2023
1 parent 86c6cc9 commit 2ad91a7
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 10 deletions.
48 changes: 38 additions & 10 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Expand Up @@ -106,11 +106,12 @@ class GCSToGCSOperator(BaseOperator):
source_objects=['sales/sales-2017/january.avro'],
destination_bucket='data_backup',
destination_object='copied_sales/2017/january-backup.avro',
exact_match=True,
gcp_conn_id=google_cloud_conn_id
)
The following Operator would copy all the Avro files from ``sales/sales-2017``
folder (i.e. with names starting with that prefix) in ``data`` bucket to the
folder (i.e. all files with names starting with that prefix) in ``data`` bucket to the
``copied_sales/2017`` folder in the ``data_backup`` bucket. ::
copy_files = GCSToGCSOperator(
Expand All @@ -135,7 +136,7 @@ class GCSToGCSOperator(BaseOperator):
)
The following Operator would move all the Avro files from ``sales/sales-2017``
folder (i.e. with names starting with that prefix) in ``data`` bucket to the
folder (i.e. all files with names starting with that prefix) in ``data`` bucket to the
same folder in the ``data_backup`` bucket, deleting the original files in the
process. ::
Expand Down Expand Up @@ -314,9 +315,11 @@ def _copy_source_without_wildcard(self, hook, prefix):
"""
List all files in source_objects, copy files to destination_object, and rename each source file.
For source_objects with no wildcard, this operator would first list all
files in source_objects, using provided delimiter if any. Then copy files
from source_objects to destination_object and rename each source file.
For source_objects with no wildcard, this operator would first list
all files in source_objects, using provided delimiter if any. Then copy
files from source_objects to destination_object and rename each source
file. Note that if the flag exact_match=False, then each item in the source_objects
(or source_object itself) will be considered as a prefix for the source objects search.
Example 1:
Expand Down Expand Up @@ -366,6 +369,22 @@ def _copy_source_without_wildcard(self, hook, prefix):
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
Example 4:
The following Operator would copy files corresponding to the prefix 'a/foo.txt'
(a/foo.txt, a/foo.txt.abc, a/foo.txt/subfolder/file.txt) in ``data`` bucket to
the ``b/`` folder in the ``data_backup`` bucket
(b/foo.txt, b/foo.txt.abc, b/foo.txt/subfolder/file.txt) ::
copy_files = GCSToGCSOperator(
task_id='copy_files_without_wildcard',
source_bucket='data',
source_object='a/foo.txt',
destination_bucket='data_backup',
destination_object='b/',
gcp_conn_id=google_cloud_conn_id
)
"""
objects = hook.list(
self.source_bucket, prefix=prefix, delimiter=self.delimiter, match_glob=self.match_glob
Expand All @@ -390,11 +409,10 @@ def _copy_source_without_wildcard(self, hook, prefix):
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
self.log.warning(msg)
raise AirflowException(msg)

if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
self._copy_directory(hook=hook, source_objects=objects, prefix=prefix)
self._copy_multiple_objects(hook=hook, source_objects=objects, prefix=prefix)

def _copy_file(self, hook, source_object):
destination_object = self.destination_object or source_object
Expand All @@ -405,15 +423,25 @@ def _copy_file(self, hook, source_object):
hook=hook, source_object=source_object, destination_object=destination_object
)

def _copy_directory(self, hook, source_objects, prefix):
_prefix = prefix.rstrip("/") + "/"
def _copy_multiple_objects(self, hook, source_objects, prefix):
# Check whether the prefix is a root directory for all the rest of objects.
_pref = prefix.rstrip("/")
is_directory = prefix.endswith("/") or all(
[obj.replace(_pref, "", 1).startswith("/") for obj in source_objects]
)

if is_directory:
base_path = prefix.rstrip("/") + "/"
else:
base_path = prefix[0 : prefix.rfind("/") + 1] if "/" in prefix else ""

for source_obj in source_objects:
if not self._check_exact_match(source_obj, prefix):
continue
if self.destination_object is None:
destination_object = source_obj
else:
file_name_postfix = source_obj.replace(_prefix, "", 1)
file_name_postfix = source_obj.replace(base_path, "", 1)
destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix

self._copy_single_object(
Expand Down
Expand Up @@ -97,6 +97,9 @@ Copy single file
----------------

The following example would copy a single file, ``OBJECT_1`` from the ``BUCKET_1_SRC`` GCS bucket to the ``BUCKET_1_DST`` bucket.
Note that if the flag ``exact_match=False`` then the ``source_object`` will be considered as a prefix for search objects
in the ``BUCKET_1_SRC`` GCS bucket. That's why if any will be found, they will be copied as well. To prevent this from
happening, please use ``exact_match=False``.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
Expand Down Expand Up @@ -165,6 +168,9 @@ Move single file
----------------

Supplying ``True`` to the ``move`` argument causes the operator to delete ``source_object`` once the copy is complete.
Note that if the flag ``exact_match=False`` then the ``source_object`` will be considered as a prefix for search objects
in the ``BUCKET_1_SRC`` GCS bucket. That's why if any will be found, they will be copied as well. To prevent this from
happening, please use ``exact_match=False``.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
:language: python
Expand Down
171 changes: 171 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Expand Up @@ -656,3 +656,174 @@ def test_execute_source_object_required_flag_true(self, mock_hook):
AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}"
):
operator.execute(None)

@pytest.mark.parametrize(
"existing_objects, source_object, match_glob, exact_match, expected_source_objects, "
"expected_destination_objects",
[
(["source/foo.txt"], "source/foo.txt", None, True, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/foo.txt", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/*", None, False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/foo.*", None, False, ["source/foo.txt"], ["{prefix}/txt"]),
(["source/foo.txt"], "source/", "**/foo*", False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(["source/foo.txt"], "source/", "**/foo.txt", False, ["source/foo.txt"], ["{prefix}/foo.txt"]),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.txt",
None,
True,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.txt",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/*",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/foo.*",
None,
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/txt", "{prefix}/txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
"**/foo*",
False,
["source/foo.txt", "source/foo.txt.abc"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc"],
),
(
["source/foo.txt", "source/foo.txt.abc"],
"source/",
"**/foo.txt",
False,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.txt",
None,
True,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.txt",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/*",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.*",
None,
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/txt", "{prefix}/txt.abc", "{prefix}/txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
"**/foo*",
False,
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
["{prefix}/foo.txt", "{prefix}/foo.txt.abc", "{prefix}/foo.txt/subfolder/file.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/",
"**/foo.txt",
False,
["source/foo.txt"],
["{prefix}/foo.txt"],
),
],
)
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_copy_files_into_a_folder(
self,
mock_hook,
existing_objects,
source_object,
match_glob,
exact_match,
expected_source_objects,
expected_destination_objects,
):
mock_hook.return_value.list.return_value = existing_objects
operator = GCSToGCSOperator(
task_id=TASK_ID,
source_bucket=TEST_BUCKET,
source_object=source_object,
destination_bucket=DESTINATION_BUCKET,
destination_object=DESTINATION_OBJECT_PREFIX + "/",
exact_match=exact_match,
match_glob=match_glob,
)
operator.execute(None)

mock_calls = [
mock.call(TEST_BUCKET, src, DESTINATION_BUCKET, dst.format(prefix=DESTINATION_OBJECT_PREFIX))
for src, dst in zip(expected_source_objects, expected_destination_objects)
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
2 changes: 2 additions & 0 deletions tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
Expand Up @@ -139,6 +139,7 @@
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
# [END howto_operator_gcs_to_gcs_single_file]

Expand Down Expand Up @@ -201,6 +202,7 @@
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
# [END howto_operator_gcs_to_gcs_single_file_move]
Expand Down

0 comments on commit 2ad91a7

Please sign in to comment.