Fix race condition for zip upload #110

Open · wants to merge 3 commits into master
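The diff below adds a `put_atomic` entry point that uploads to a uniquely named temporary path and only then moves the file to its destination, so concurrent uploaders of the same zip cannot observe or clobber a half-written file. A minimal standalone sketch of the same pattern on a local filesystem (the function and file names here are hypothetical, not part of the PR):

```python
import os
import uuid


def put_atomic_local(source_path: str, destination_path: str) -> None:
    # Stage the upload under a unique name: readers of destination_path
    # never see partial content.
    tmp_path = f"{destination_path}.{uuid.uuid4()}.tmp"
    with open(source_path, "rb") as src, open(tmp_path, "wb") as dst:
        dst.write(src.read())
    try:
        # Refuse to overwrite: the first uploader to finish wins the race.
        if os.path.exists(destination_path):
            raise FileExistsError(f"File exists at destination {destination_path}")
        os.rename(tmp_path, destination_path)
    finally:
        # Clean up the staging file whether or not the move succeeded.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
```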
95 changes: 66 additions & 29 deletions cluster_pack/filesystem.py
@@ -1,5 +1,6 @@
import logging
import os
import uuid
import pyarrow
import shutil
import types
@@ -18,6 +19,7 @@ def _makedirs(self: Any, path: str, exist_ok: bool = False) -> None:
bucket, _, _ = self.split_path(path)
if not self.exists(bucket):
self.mkdir(bucket)

S3FileSystem.makedirs = _makedirs
except (ModuleNotFoundError, ImportError):
pass
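For context, the patched `_makedirs` above only has to ensure the bucket exists because S3 has no real directories; key prefixes spring into existence with the objects that use them. A hypothetical usage, assuming `S3FileSystem` comes from s3fs as elsewhere in this module:

```python
from s3fs import S3FileSystem  # assumed source of S3FileSystem in this module

# Hypothetical: "directories" on S3 are only key prefixes, so ensuring the
# bucket itself exists is all makedirs needs to guarantee.
fs = S3FileSystem()
fs.makedirs("my-bucket/some/deep/prefix")  # creates bucket "my-bucket" if missing
```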
@@ -30,6 +32,7 @@ def _make_function(base_fs: Any, method_name: str) -> Any:
def f(*args: Any, **kwargs: Any) -> Any:
func = getattr(base_fs, method_name)
return func(*args, **kwargs)

return f


@@ -41,13 +44,17 @@ def _expose_methods(child_class: Any, base_class: Any, ignored: List[str] = [])
:param child_class: instance of child class to add the methods to
:param ignored: methods that will be redefined manually
"""
    method_list = [
        func
        for func in dir(base_class)
        if callable(getattr(base_class, func))
        and not func.startswith("__")
        and not [f for f in ignored if func.startswith(f)]
    ]
for method_name in method_list:
_logger.debug(f"add method impl from {type(base_class)}.{method_name}"
f" to {type(child_class)}")
_logger.debug(
f"add method impl from {type(base_class)}.{method_name}" f" to {type(child_class)}"
)
setattr(child_class, method_name, _make_function(base_class, method_name))


@@ -65,10 +72,27 @@ def _rm(self: Any, path: str, recursive: bool = False) -> None:
os.rmdir(path)


def _rename(self: Any, path: str, new_path: str) -> None:
if os.path.exists(new_path):
raise OSError(f"File exist at destination {new_path}")
os.rename(path, new_path)


def _s3_rename(self: Any, path: str, new_path: str) -> None:
if self.exists(new_path):
raise OSError(f"File exist at destination {new_path}")
self.fs.mv(path, new_path)


def _hdfs_st_mode(self: Any, path: str) -> int:
st_mode = self.ls(path, True)[0]["permissions"]
return int(st_mode)


def _preserve_acls(base_fs: Any, local_file: str, remote_file: str) -> None:
    # this is useful for keeping pex executable rights
    if isinstance(base_fs, pyarrow.filesystem.LocalFileSystem) or isinstance(
        base_fs, pyarrow.hdfs.HadoopFileSystem
    ):
st = os.stat(local_file)
base_fs.chmod(remote_file, st.st_mode & 0o777)
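`_preserve_acls` re-applies the source file's permission bits after upload, which is what keeps a pex archive executable once it lands on the remote filesystem. A small illustration of the masking (the path is hypothetical):

```python
import os
import stat

st = os.stat("/tmp/my_app.pex")  # hypothetical local pex file
mode = st.st_mode & 0o777        # keep only the rwx bits, e.g. 0o755
print(oct(mode), bool(mode & stat.S_IXUSR))  # owner execute bit preserved
```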
@@ -77,20 +101,16 @@ def _preserve_acls(base_fs: Any, local_file: str, remote_file: str) -> None:


class EnhancedHdfsFile(pyarrow.HdfsFile):
    def __init__(self, base_hdfs_file: pyarrow.HdfsFile):
        self.base_hdfs_file = base_hdfs_file
        _expose_methods(self, base_hdfs_file, ignored=["write", "readline", "readlines"])

def ensure_bytes(self, s: Any) -> bytes:
if isinstance(s, bytes):
return s
        if hasattr(s, "encode"):
            return s.encode()
        if hasattr(s, "tobytes"):
return s.tobytes()
if isinstance(s, bytearray):
return bytes(s)
@@ -99,8 +119,8 @@ def ensure_bytes(self, s: Any) -> bytes:
def write(self, data: Any) -> None:
self.base_hdfs_file.write(self.ensure_bytes(data))

    def _seek_delimiter(self, delimiter: bytes, blocksize: int = 2**16) -> None:
        """Seek current file to next byte after a delimiter
from https://github.com/dask/hdfs3/blob/master/hdfs3/utils.py#L11

Parameters
@@ -110,7 +130,7 @@ def _seek_delimiter(self, delimiter: bytes, blocksize: int = 2 ** 16) -> None:
blocksize: int
number of bytes to read
"""
        last = b""
while True:
current = self.read(blocksize)
if not current:
@@ -185,34 +205,51 @@ def readlines(self, hint: int = None) -> List[bytes]:


class EnhancedFileSystem(filesystem.FileSystem):
    def __init__(self, base_fs: Any):
self.base_fs = base_fs
if isinstance(base_fs, pyarrow.filesystem.LocalFileSystem):
base_fs.chmod = types.MethodType(_chmod, base_fs)
base_fs.rm = types.MethodType(_rm, base_fs)
base_fs.rename = types.MethodType(_rename, base_fs)

if isinstance(base_fs, pyarrow.filesystem.S3FSWrapper):
base_fs.rename = types.MethodType(_s3_rename, base_fs)

if isinstance(base_fs, pyarrow.hdfs.HadoopFileSystem):
base_fs.st_mode = types.MethodType(_hdfs_st_mode, base_fs)
_expose_methods(self, base_fs, ignored=["open"])

def put(self, filename: str, path: str, chunk: int = 2**16) -> None:
        with self.base_fs.open(path, "wb") as target:
            with open(filename, "rb") as source:
while True:
out = source.read(chunk)
if len(out) == 0:
break
target.write(out)
_preserve_acls(self.base_fs, filename, path)

    def put_atomic(self, source_path: str, destination_path: str) -> None:
        # upload to a unique temporary name, then move it into place so a
        # concurrent reader never sees a partially written destination file
        tmp_upload_dir = f"{destination_path}.{uuid.uuid4()}.tmp"
self.put(source_path, tmp_upload_dir)
try:
self.mv(tmp_upload_dir, destination_path)
except OSError as err:
raise FileExistsError(f"File exist at destination {destination_path}", err)
finally:
if self.exists(tmp_upload_dir):
self.rm(tmp_upload_dir, True)

def get(self, filename: str, path: str, chunk: int = 2**16) -> None:
        with open(path, "wb") as target:
            with self.base_fs.open(filename, "rb") as source:
while True:
out = source.read(chunk)
if len(out) == 0:
break
target.write(out)

    def open(self, path: str, mode: str = "rb") -> EnhancedHdfsFile:
return EnhancedHdfsFile(self.base_fs.open(path, mode))
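Taken together, these changes make the upload safe to call from several processes at once: each racer stages under its own uuid-suffixed name and only one move into the final destination can succeed. Roughly how a caller would use it (URI and local path are invented for illustration):

```python
from cluster_pack.filesystem import resolve_filesystem_and_path

# Hypothetical caller: several workers racing to publish the same zip.
fs, path = resolve_filesystem_and_path("hdfs:///user/alice/envs/project.zip")
try:
    fs.put_atomic("/tmp/project.zip", path)
except FileExistsError:
    # Another process won the race; its fully written artifact is in place.
    pass
```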


@@ -221,19 +258,19 @@ def resolve_filesystem_and_path(uri: str, **kwargs: Any) -> Tuple[EnhancedFileSystem, str]:
fs_path = parsed_uri.path
# from https://github.com/apache/arrow/blob/master/python/pyarrow/filesystem.py#L419
# with viewfs support
    if parsed_uri.scheme == "hdfs" or parsed_uri.scheme == "viewfs":
        netloc_split = parsed_uri.netloc.split(":")
host = netloc_split[0]
        if host == "":
            host = "default"
else:
host = parsed_uri.scheme + "://" + host
port = 0
if len(netloc_split) == 2 and netloc_split[1].isnumeric():
port = int(netloc_split[1])

fs = EnhancedFileSystem(pyarrow.hdfs.connect(host=host, port=port))
    elif parsed_uri.scheme == "s3" or parsed_uri.scheme == "s3a":
fs = EnhancedFileSystem(pyarrow.filesystem.S3FSWrapper(S3FileSystem(**kwargs)))
else:
# Input is local path such as /home/user/myfile.parquet
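For reference, how `resolve_filesystem_and_path` dispatches on the URI scheme; the hosts, ports and paths below are invented examples:

```python
from cluster_pack.filesystem import resolve_filesystem_and_path

resolve_filesystem_and_path("hdfs://namenode:8020/user/alice/env.zip")  # HDFS, explicit host and port
resolve_filesystem_and_path("viewfs:///user/alice/env.zip")             # HDFS, host "default"
resolve_filesystem_and_path("s3://my-bucket/env.zip")                   # S3 via S3FSWrapper
resolve_filesystem_and_path("/home/alice/env.zip")                      # local filesystem fallback
```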