Skip to content

Commit

Permalink
[BEAM-13250] Factorise gcsio.GcsIO() from gcsfilesystem.
Browse files Browse the repository at this point in the history
  • Loading branch information
mlhamel committed Nov 15, 2021
1 parent fd00fc9 commit 01c6ea9
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Expand Up @@ -135,7 +135,7 @@ def _path_open(
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
mime_type = CompressionTypes.mime_type(compression_type, mime_type)
raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
raw_file = self.get_gcsio().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
return CompressedFile(raw_file, compression_type=compression_type)
Expand Down Expand Up @@ -198,9 +198,9 @@ def _copy_path(source, destination):
raise ValueError('Destination %r must be GCS path.' % destination)
# Use copy_tree if the path ends with / as it is a directory
if source.endswith('/'):
gcsio.GcsIO().copytree(source, destination)
self.get_gcsio().copytree(source, destination)
else:
gcsio.GcsIO().copy(source, destination)
self.get_gcsio().copy(source, destination)

exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
Expand Down Expand Up @@ -241,15 +241,15 @@ def rename(self, source_file_names, destination_file_names):
# Execute GCS renames if any and return exceptions.
exceptions = {}
for batch in gcs_batches:
copy_statuses = gcsio.GcsIO().copy_batch(batch)
copy_statuses = self.get_gcsio().copy_batch(batch)
copy_succeeded = []
for src, dest, exception in copy_statuses:
if exception:
exceptions[(src, dest)] = exception
else:
copy_succeeded.append((src, dest))
delete_batch = [src for src, dest in copy_succeeded]
delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
delete_statuses = self.get_gcsio().delete_batch(delete_batch)
for i, (src, exception) in enumerate(delete_statuses):
dest = copy_succeeded[i][1]
if exception:
Expand All @@ -266,7 +266,7 @@ def exists(self, path):
Returns: boolean flag indicating if path exists
"""
return gcsio.GcsIO().exists(path)
return self.get_gcsio().exists(path)

def size(self, path):
"""Get size of path on the FileSystem.
Expand All @@ -279,7 +279,7 @@ def size(self, path):
Raises:
``BeamIOError``: if path doesn't exist.
"""
return gcsio.GcsIO().size(path)
return self.get_gcsio().size(path)

def last_updated(self, path):
"""Get UNIX Epoch time in seconds on the FileSystem.
Expand All @@ -292,7 +292,7 @@ def last_updated(self, path):
Raises:
``BeamIOError``: if path doesn't exist.
"""
return gcsio.GcsIO().last_updated(path)
return self.get_gcsio().last_updated(path)

def checksum(self, path):
"""Fetch checksum metadata of a file on the
Expand All @@ -307,7 +307,7 @@ def checksum(self, path):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
return gcsio.GcsIO().checksum(path)
return self.get_gcsio().checksum(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})

Expand All @@ -326,7 +326,7 @@ def _delete_path(path):
else:
path_to_use = path
match_result = self.match([path_to_use])[0]
statuses = gcsio.GcsIO().delete_batch(
statuses = self.get_gcsio().delete_batch(
[m.path for m in match_result.metadata_list])
# pylint: disable=used-before-assignment
failures = [e for (_, e) in statuses if e is not None]
Expand All @@ -342,3 +342,12 @@ def _delete_path(path):

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)

@staticmethod
def get_gcsio():
  """Build the GCS I/O client used by this filesystem's operations.

  The filesystem methods call this factory instead of constructing
  ``gcsio.GcsIO`` inline, so client creation lives in a single place.

  Returns:
    A newly constructed ``gcsio.GcsIO`` instance. Nothing is cached here:
    every call produces a separate object.
  """
  client = gcsio.GcsIO()
  return client

0 comments on commit 01c6ea9

Please sign in to comment.