Improve coding style
Martin Durant committed Apr 27, 2018
1 parent 8c39b12 commit aac6dae
Showing 4 changed files with 157 additions and 119 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -1 +1,2 @@
botocore
boto3
162 changes: 100 additions & 62 deletions s3fs/core.py
@@ -40,7 +40,7 @@ def tokenize(*args, **kwargs):
True
"""
if kwargs:
args = args + (kwargs,)
args += (kwargs,)
return md5(str(tuple(args)).encode()).hexdigest()


@@ -65,6 +65,7 @@ def split_path(path):
else:
return path.split('/', 1)


key_acls = {'private', 'public-read', 'public-read-write',
'authenticated-read', 'aws-exec-read', 'bucket-owner-read',
'bucket-owner-full-control'}
@@ -98,8 +99,8 @@ class S3FileSystem(object):
use_ssl : bool (True)
Whether to use SSL in connections to S3; may be faster without, but
insecure
s3_additional_kwargs : dict of parameters that are used when calling s3 api methods.
Typically used for things like "ServerSideEncryption".
s3_additional_kwargs : dict of parameters that are used when calling s3 api
methods. Typically used for things like "ServerSideEncryption".
client_kwargs : dict of parameters for the boto3 client
requester_pays : bool (False)
If RequesterPays buckets are supported.
@@ -110,8 +111,9 @@ class S3FileSystem(object):
Whether to use cache filling with open by default. Refer to
``S3File.open``.
version_aware : bool (False)
Whether to support bucket versioning. If enabled, this will require the user to
have the necessary IAM permissions for dealing with versioned objects.
Whether to support bucket versioning. If enabled, this will require the
user to have the necessary IAM permissions for dealing with versioned
objects.
config_kwargs : dict of parameters passed to ``botocore.client.Config``
kwargs : other parameters for boto3 session
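As a side note, a minimal usage sketch of the constructor options documented above (the bucket region, credentials handling and encryption choice are illustrative assumptions, not part of this commit):

    import s3fs

    # version_aware needs the appropriate IAM permissions on versioned buckets;
    # s3_additional_kwargs entries are forwarded to the matching S3 API calls.
    fs = s3fs.S3FileSystem(
        anon=False,
        version_aware=True,
        s3_additional_kwargs={'ServerSideEncryption': 'AES256'},
        client_kwargs={'region_name': 'us-east-1'},  # handed to the boto3 client
    )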
@@ -134,8 +136,8 @@ class S3FileSystem(object):
def __init__(self, anon=False, key=None, secret=None, token=None,
use_ssl=True, client_kwargs=None, requester_pays=False,
default_block_size=None, default_fill_cache=True,
version_aware=False, config_kwargs=None, s3_additional_kwargs=None,
**kwargs):
version_aware=False, config_kwargs=None,
s3_additional_kwargs=None, **kwargs):
self.anon = anon
self.session = None
self.key = key
@@ -165,7 +167,8 @@ def _filter_kwargs(self, s3_method, kwargs):
return self._kwargs_helper.filter_dict(s3_method.__name__, kwargs)

def _call_s3(self, method, *akwarglist, **kwargs):
additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist,
**kwargs)
return method(**additional_kwargs)

def _get_s3_method_kwargs(self, method, *akwarglist, **kwargs):
@@ -287,10 +290,12 @@ def open(self, path, mode='rb', block_size=None, acl='', version_id=None,
acl: str
Canned ACL to set when writing
version_id : str
Explicit version of the object to open. This requires that the s3 filesystem is
version aware and bucket versioning is enabled on the relevant bucket.
Explicit version of the object to open. This requires that the s3
filesystem is version aware and bucket versioning is enabled on the
relevant bucket.
kwargs: dict-like
Additional parameters used for s3 methods. Typically used for ServerSideEncryption.
Additional parameters used for s3 methods. Typically used for
ServerSideEncryption.
"""
if block_size is None:
block_size = self.default_block_size
@@ -303,9 +308,11 @@ def open(self, path, mode='rb', block_size=None, acl='', version_id=None,
kw = self.s3_additional_kwargs.copy()
kw.update(kwargs)
if not self.version_aware and version_id:
raise ValueError("version_id cannot be specified if the filesystem is not version aware")
return S3File(self, path, mode, block_size=block_size, acl=acl, version_id=version_id,
fill_cache=fill_cache, s3_additional_kwargs=kw)
raise ValueError("version_id cannot be specified if the filesystem "
"is not version aware")
return S3File(self, path, mode, block_size=block_size, acl=acl,
version_id=version_id, fill_cache=fill_cache,
s3_additional_kwargs=kw)
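For illustration only, a hedged sketch of calling ``open`` with these options (bucket, key and version id are placeholders):

    import s3fs

    fs = s3fs.S3FileSystem(version_aware=True)
    # reading a specific object version (requires a version-aware filesystem)
    with fs.open('mybucket/data.csv', 'rb', version_id='abc123') as f:
        header = f.readline()

    # extra keyword arguments are filtered to the relevant S3 calls, e.g. on write
    with fs.open('mybucket/out.csv', 'wb', ServerSideEncryption='AES256') as f:
        f.write(b'a,b,c\n')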

def _lsdir(self, path, refresh=False):
if path.startswith('s3://'):
@@ -361,13 +368,8 @@ def _ls(self, path, refresh=False):
----------
path : string/bytes
location at which to list files
detail : bool (=True)
if True, each list item is a dict of file properties;
otherwise, returns list of filenames
refresh : bool (=False)
if False, look in local cache for file details first
kwargs : dict
additional arguments passed on
"""
if path.startswith('s3://'):
path = path[len('s3://'):]
@@ -377,7 +379,20 @@ def _ls(self, path, refresh=False):
return self._lsdir(path, refresh)

def ls(self, path, detail=False, refresh=False, **kwargs):
""" List single "directory" with or without details """
""" List single "directory" with or without details
Parameters
----------
path : string/bytes
location at which to list files
detail : bool (=False)
if True, each list item is a dict of file properties;
otherwise, returns list of filenames
refresh : bool (=False)
if False, look in local cache for file details first
kwargs : dict
additional arguments passed on
"""
if path.startswith('s3://'):
path = path[len('s3://'):]
path = path.rstrip('/')
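A brief usage sketch for ``ls`` as documented above (bucket and prefix are placeholders):

    import s3fs

    fs = s3fs.S3FileSystem()
    names = fs.ls('mybucket/logs')                 # plain list of key names
    entries = fs.ls('mybucket/logs', detail=True)  # list of dicts (e.g. Key, Size)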
@@ -421,7 +436,8 @@ def info(self, path, version_id=None, refresh=False, **kwargs):
bucket, key = split_path(path)
if version_id is not None:
if not self.version_aware:
raise ValueError("version_id cannot be specified if the filesystem is not version aware")
raise ValueError("version_id cannot be specified if the "
"filesystem is not version aware")
kwargs['VersionId'] = version_id
out = self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
Key=key, **self.req_kw)
@@ -434,19 +450,21 @@ def info(self, path, version_id=None, refresh=False, **kwargs):
'VersionId': out.get('VersionId')
}
return out
except (ClientError, ParamValidationError) as e:
except (ClientError, ParamValidationError):
logger.debug("Failed to head path %s", path, exc_info=True)
raise FileNotFoundError(path)
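As an aside, a sketch of ``info`` on a version-aware filesystem (the key and version id are made up for the example):

    import s3fs

    fs = s3fs.S3FileSystem(version_aware=True)
    meta = fs.info('mybucket/data.csv', version_id='abc123')
    print(meta['Size'], meta['VersionId'])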

def object_version_info(self, path, **kwargs):
if not self.version_aware:
raise ValueError("version specific functionality is disabled for non-version aware filesystems")
raise ValueError("version specific functionality is disabled for "
"non-version aware filesystems")
bucket, key = split_path(path)
kwargs = {}
out = {'IsTruncated': True}
versions = []
while out['IsTruncated']:
out = self._call_s3(self.s3.list_object_versions, kwargs, Bucket=bucket, Prefix=key, **self.req_kw)
out = self._call_s3(self.s3.list_object_versions, kwargs,
Bucket=bucket, Prefix=key, **self.req_kw)
versions.extend(out['Versions'])
kwargs['VersionIdMarker'] = out.get('NextVersionIdMarker', '')
return versions
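For context, ``object_version_info`` might be used like this (assuming versioning is enabled on the bucket):

    import s3fs

    fs = s3fs.S3FileSystem(version_aware=True)
    for v in fs.object_version_info('mybucket/data.csv'):
        # entries follow the boto3 list_object_versions response shape
        print(v['VersionId'], v['LastModified'], v['IsLatest'])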
@@ -507,14 +525,16 @@ def put_tags(self, path, tags, mode='o'):
mode:
One of 'o' or 'm'
'o': Will over-write any existing tags.
'm': Will merge in new tags with existing tags. Incurs two remote calls.
'm': Will merge in new tags with existing tags. Incurs two remote
calls.
"""
bucket, key = split_path(path)

if mode == 'm':
existing_tags = self.get_tags(path=path)
existing_tags.update(tags)
new_tags = [{'Key': k, 'Value': v} for k, v in existing_tags.items()]
new_tags = [{'Key': k, 'Value': v}
for k, v in existing_tags.items()]
elif mode == 'o':
new_tags = [{'Key': k, 'Value': v} for k, v in tags.items()]
else:
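A short, illustrative ``put_tags`` call matching the modes described above (bucket, key and tag values are placeholders):

    import s3fs

    fs = s3fs.S3FileSystem()
    # 'm' merges with existing tags (costs an extra remote call); 'o' overwrites
    fs.put_tags('mybucket/data.csv', {'project': 'demo'}, mode='m')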
@@ -540,20 +560,25 @@ def getxattr(self, path, attr_name, **kwargs):
def setxattr(self, path, copy_kwargs=None, **kw_args):
""" Set metadata.
Attributes have to be of the form documented in the `Metadata Reference`_.
Attributes have to be of the form documented in the
`Metadata Reference`_.
Parameters
----------
kw_args : key-value pairs like field="value", where the values must be strings. Does not alter existing fields,
unless the field appears here - if the value is None, delete the field.
copy_args : dict, optional
dictionary of additional params to use for the underlying s3.copy_object.
kw_args : key-value pairs like field="value", where the values must be
strings. Does not alter existing fields, unless
the field appears here - if the value is None, delete the
field.
copy_kwargs : dict, optional
dictionary of additional params to use for the underlying
s3.copy_object.
Examples
--------
>>> mys3file.setxattr(attribute_1='value1', attribute_2='value2') # doctest: +SKIP
# Example for use with copy_kwargs
>>> mys3file.setxattr(copy_kwargs={'ContentType': 'application/pdf'}, attribute_1='value1') # doctest: +SKIP
>>> mys3file.setxattr(copy_kwargs={'ContentType': 'application/pdf'},
... attribute_1='value1') # doctest: +SKIP
.. Metadata Reference:
@@ -713,9 +738,10 @@ def url(self, path, expires=3600, **kwargs):
the number of seconds this signature will be good for.
"""
bucket, key = split_path(path)
return self.s3.generate_presigned_url(ClientMethod='get_object',
Params=dict(Bucket=bucket, Key=key, **kwargs),
ExpiresIn=expires)
return self.s3.generate_presigned_url(
ClientMethod='get_object', Params=dict(Bucket=bucket, Key=key,
**kwargs),
ExpiresIn=expires)
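As a usage sketch (the key is a placeholder), ``url`` returns a presigned GET link:

    import s3fs

    fs = s3fs.S3FileSystem()
    link = fs.url('mybucket/report.pdf', expires=3600)  # valid for one hour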

def get(self, path, filename, **kwargs):
""" Stream data from file at path to local filename """
@@ -748,7 +774,8 @@ def rmdir(self, path, **kwargs):
path = path[len('s3://'):]
path = path.rstrip('/')
bucket, key = split_path(path)
if not self._ls(path) and ((key and self.info(path)['Size'] == 0) or not key):
if not self._ls(path) and ((key and self.info(path)['Size'] == 0)
or not key):
self.rm(path, **kwargs)
else:
raise IOError('Path is not directory-like', path)
@@ -779,14 +806,17 @@ def merge(self, path, filelist, **kwargs):
Key=key
)
out = [self._call_s3(
self.s3.upload_part_copy,
kwargs,
Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
CopySource=f, PartNumber=i+1) for (i, f) in enumerate(filelist)]
parts = [{'PartNumber': i+1, 'ETag': o['CopyPartResult']['ETag']} for (i, o) in enumerate(out)]
self.s3.upload_part_copy,
kwargs,
Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
CopySource=f, PartNumber=i + 1)
for (i, f) in enumerate(filelist)]
parts = [{'PartNumber': i + 1, 'ETag': o['CopyPartResult']['ETag']} for
(i, o) in enumerate(out)]
part_info = {'Parts': parts}
self.s3.complete_multipart_upload(Bucket=bucket, Key=key,
UploadId=mpu['UploadId'], MultipartUpload=part_info)
UploadId=mpu['UploadId'],
MultipartUpload=part_info)
self.invalidate_cache(path)
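For illustration, ``merge`` concatenates objects server-side via multipart upload_part_copy; note that S3 generally requires every source part except the last to be at least 5 MB (paths below are placeholders):

    import s3fs

    fs = s3fs.S3FileSystem()
    fs.merge('mybucket/merged.bin',
             ['mybucket/part-0', 'mybucket/part-1', 'mybucket/part-2'])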

def copy_basic(self, path1, path2, **kwargs):
@@ -839,13 +869,14 @@ def bulk_delete(self, pathlist, **kwargs):
return
buckets = {split_path(path)[0] for path in pathlist}
if len(buckets) > 1:
raise ValueError("Bulk delete files should refer to only one bucket")
raise ValueError("Bulk delete files should refer to only one "
"bucket")
bucket = buckets.pop()
if len(pathlist) > 1000:
for i in range((len(pathlist) // 1000) + 1):
self.bulk_delete(pathlist[i*1000:(i+1)*1000])
return
delete_keys = {'Objects': [{'Key' : split_path(path)[1]} for path
delete_keys = {'Objects': [{'Key': split_path(path)[1]} for path
in pathlist]}
try:
self._call_s3(
@@ -971,8 +1002,8 @@ def read_block(self, fn, offset, length, delimiter=None, **kwargs):
length = size
if offset + length > size:
length = size - offset
bytes = read_block(f, offset, length, delimiter)
return bytes
b = read_block(f, offset, length, delimiter)
return b
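A hedged example of ``read_block``, which aligns the requested byte range to delimiter boundaries (the path is a placeholder):

    import s3fs

    fs = s3fs.S3FileSystem()
    # roughly the first 1 MB of the file, ending on a newline boundary
    chunk = fs.read_block('mybucket/data.csv', 0, 2**20, delimiter=b'\n')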


class S3File(object):
@@ -995,8 +1026,9 @@ class S3File(object):
acl: str
Canned ACL to apply
version_id : str
Optional version to read the file at. If not specified this will default to the current
version of the object. This is only used for reading.
Optional version to read the file at. If not specified this will
default to the current version of the object. This is only used for
reading.
Examples
--------
@@ -1009,11 +1041,12 @@ class S3File(object):
S3FileSystem.open: used to create ``S3File`` objects
"""

def __init__(self, s3, path, mode='rb', block_size=5 * 2 ** 20, acl="", version_id=None,
fill_cache=True, s3_additional_kwargs=None):
def __init__(self, s3, path, mode='rb', block_size=5 * 2 ** 20, acl="",
version_id=None, fill_cache=True, s3_additional_kwargs=None):
self.mode = mode
if mode not in {'rb', 'wb', 'ab'}:
raise NotImplementedError("File mode must be {'rb', 'wb', 'ab'}, not %s" % mode)
raise NotImplementedError("File mode must be {'rb', 'wb', 'ab'}, "
"not %s" % mode)
if path.startswith('s3://'):
path = path[len('s3://'):]
self.path = path
@@ -1080,12 +1113,12 @@ def _call_s3(self, method, *kwarglist, **kwargs):

def info(self, **kwargs):
""" File information about this path """
# When the bucket is version aware we need to explicitly get the correct
# filesize for the particular version. In the case where self.version_id is
# None, this will be the most recent version.
# When the bucket is version aware we need to explicitly get the
# correct filesize for the particular version. In the case where
# self.version_id is None, this will be the most recent version.
refresh = self.s3.version_aware
return self.s3.info(self.path, version_id=self.version_id, refresh=refresh,
**kwargs)
return self.s3.info(self.path, version_id=self.version_id,
refresh=refresh, **kwargs)

def metadata(self, refresh=False, **kwargs):
""" Return metadata of file.
@@ -1115,7 +1148,8 @@ def setxattr(self, copy_kwargs=None, **kwargs):
>>> mys3file.setxattr(attribute_1='value1', attribute_2='value2') # doctest: +SKIP
"""
if self.mode != 'rb':
raise NotImplementedError('cannot update metadata while file is open for writing')
raise NotImplementedError('cannot update metadata while file '
'is open for writing')
return self.s3.setxattr(self.path, copy_kwargs=copy_kwargs, **kwargs)

def url(self, **kwargs):
@@ -1162,7 +1196,7 @@ def readline(self, length=-1):
self._fetch(self.loc, self.loc + 1)
while True:
found = self.cache[self.loc - self.start:].find(b'\n') + 1
if length > 0 and found > length:
if 0 < length < found:
return self.read(length)
if found:
return self.read(found)
@@ -1186,7 +1220,8 @@ def readlines(self):
return list(self)

def _fetch(self, start, end):
# if we have not enabled version_aware then we should use the latest version.
# if we have not enabled version_aware then we should use the
# latest version.
if self.s3.version_aware:
version_id = self.version_id
else:
@@ -1195,8 +1230,9 @@ def _fetch(self, start, end):
# First read
self.start = start
self.end = end + self.blocksize
self.cache = _fetch_range(self.s3.s3, self.bucket, self.key, version_id,
start, self.end, req_kw=self.s3.req_kw)
self.cache = _fetch_range(self.s3.s3, self.bucket, self.key,
version_id, start, self.end,
req_kw=self.s3.req_kw)
if start < self.start:
if not self.fill_cache and end + self.blocksize < self.start:
self.start, self.end = None, None
@@ -1247,7 +1283,8 @@ def write(self, data):
"""
Write data to buffer.
Buffer only sent to S3 on close() or if buffer is greater than or equal to blocksize.
Buffer only sent to S3 on close() or if buffer is greater than or equal
to blocksize.
Parameters
----------
@@ -1279,6 +1316,7 @@ def flush(self, force=False, retries=10):
force : bool
When closing, write the last block even if it is smaller than
blocks are allowed to be.
retries: int
"""
if self.mode in {'wb', 'ab'} and not self.closed:
if self.buffer.tell() < self.blocksize and not force:
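To round out the write path touched by this diff, a hedged sketch of buffered writes (bucket, key and sizes are illustrative): data accumulates in an in-memory buffer and is uploaded as multipart blocks once the buffer reaches ``block_size``; the remainder is flushed on close.

    import s3fs

    fs = s3fs.S3FileSystem()
    with fs.open('mybucket/big.bin', 'wb', block_size=5 * 2 ** 20) as f:
        for _ in range(10):
            f.write(b'\x00' * 2 ** 20)  # buffered; uploaded in 5 MB parts
    # close() flushes the final partial block and completes the upload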
