import logging
from interface_meta import override
from omniduct.filesystems.base import FileSystemClient, FileSystemFileDesc
# Python 2 compatibility imports
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError
class S3Client(FileSystemClient):
"""
This Duct connects to an Amazon S3 bucket instance using the `boto3`
library. Authentication is (optionally) handled using `opinel`.
Attributes:
bucket (str): The name of the Amazon S3 bucket to use.
        aws_profile (str): The name of the configured AWS profile to use. This
            should refer to the name of a profile configured in, for example,
            `~/.aws/credentials`. Authentication is handled by `boto3` (or,
            optionally, by the `opinel` library), both of which are also aware
            of environment variables.
"""
PROTOCOLS = ['s3']
DEFAULT_PORT = 80
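
    # A minimal usage sketch (hedged): the bucket name and profile below are
    # hypothetical, and a working local AWS configuration is assumed.
    #
    #   client = S3Client(bucket='my-example-bucket', aws_profile='default')
    #   client.connect()
    #   for f in client.dir('/'):        # enumerate the top-level "directory"
    #       print(f.name, f.type)
    #   print(client.isfile('some/key.txt'))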
@override
def _init(self, bucket=None, aws_profile=None, use_opinel=False,
session=None, path_separator='/', skip_hadoop_artifacts=True):
"""
bucket (str): The name of the Amazon S3 bucket to use.
        aws_profile (str): The name of the configured AWS profile to use. This should
refer to the name of a profile configured in, for example,
`~/.aws/credentials`. Authentication is (optionally) handled by the
`opinel` library, which is also aware of environment variables.
use_opinel (bool): Use Opinel to extract AWS credentials. This is mainly
useful if you have used opinel to set up MFA. Note: Opinel must be
installed manually alongside omniduct to take advantage of this
feature.
session (botocore.session.Session): A pre-configured botocore Session
instance to use instead of creating a new one when this client
connects.
path_separator (str): Amazon S3 is essentially a key-based storage
system, and so one is free to choose an arbitrary "directory"
separator. This defaults to '/' for consistency with other
filesystems.
skip_hadoop_artifacts (bool): Whether to skip hadoop artifacts like
'*_$folder$' when enumerating directories (default=True).
        Note 1: aws_profile, if specified, should be the name of a profile as
        configured in ~/.aws/credentials. Authentication is handled by `boto3`
        (or, optionally, by the `opinel` library), both of which are also aware
        of environment variables. Set up your command-line AWS client, and if
        it works, this should too.
        Note 2: Some institutions have nuanced AWS configurations that are
        generated by scripts. It may be useful in these
environments to subclass `S3Client` and override the `_get_boto3_session`
method to suit your needs.
"""
assert bucket is not None, 'S3 Bucket must be specified using the `bucket` kwarg.'
self.bucket = bucket
self.aws_profile = aws_profile
self.use_opinel = use_opinel
self.skip_hadoop_artifacts = skip_hadoop_artifacts
self.__path_separator = path_separator
self._session = session
self._client = None
# Ensure self.host is updated with correct AWS region
import boto3
self.host = 'autoscaling.{}.amazonaws.com'.format(
(session or boto3.Session(profile_name=self.aws_profile)).region_name or 'us-east-1'
)
# Mask logging from botocore's vendored libraries
logging.getLogger('botocore.vendored').setLevel(100)
@override
def _connect(self):
self._session = self._session or self._get_boto3_session()
self._client = self._session.client('s3')
self._resource = self._session.resource('s3')
def _get_boto3_session(self):
import boto3
if self.use_opinel:
from opinel.utils.credentials import read_creds
# Refresh access token, and attach credentials to current object for debugging
self._credentials = read_creds(self.aws_profile)
return boto3.Session(
aws_access_key_id=self._credentials['AccessKeyId'],
aws_secret_access_key=self._credentials['SecretAccessKey'],
aws_session_token=self._credentials['SessionToken'],
profile_name=self.aws_profile,
)
return boto3.Session(profile_name=self.aws_profile)
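
    # A hedged sketch of the subclassing hook described in `_init` (Note 2);
    # the subclass name and profile are hypothetical:
    #
    #   class CorporateS3Client(S3Client):
    #       def _get_boto3_session(self):
    #           import boto3
    #           # e.g. use a profile generated by an internal credentials script
    #           return boto3.Session(profile_name='corp-generated-profile')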
@override
def _is_connected(self):
if self._client is None:
return False
# Check if still able to perform requests against AWS
import botocore
        try:
            self._client.list_buckets()
            return True
        except botocore.exceptions.ClientError as e:
            if len(e.args) > 0:
                if 'ExpiredToken' in e.args[0] or 'InvalidToken' in e.args[0]:
                    return False
                elif 'AccessDenied' in e.args[0]:
                    # Credentials are still valid even though listing buckets is denied.
                    return True
            return False
@override
def _disconnect(self):
pass
# Path properties and helpers
@override
def _path_home(self):
return self.path_separator
@override
def _path_separator(self):
return self.__path_separator
# File node properties
@override
def _exists(self, path):
return self.isfile(path) or self.isdir(path)
def _s3_path(self, path):
if path.startswith(self.path_separator):
path = path[len(self.path_separator):]
if path.endswith(self.path_separator):
path = path[:-len(self.path_separator)]
return path
@override
def _isdir(self, path):
response = next(iter(self.__dir_paginator(path)))
if 'CommonPrefixes' in response or 'Contents' in response:
return True
return False
@override
def _isfile(self, path):
        try:
            # If the object lookup succeeds, a file exists at this key.
            self._client.get_object(Bucket=self.bucket, Key=self._s3_path(path) or '')
            return True
        except Exception:  # Treat any lookup failure as "not a file".
            return False
# Directory handling and enumeration
def __dir_paginator(self, path):
path = self._s3_path(path)
paginator = self._client.get_paginator('list_objects')
iterator = paginator.paginate(
Bucket=self.bucket,
Prefix=path + (self.path_separator if path else ''),
Delimiter=self.path_separator,
PaginationConfig={'PageSize': 500}
)
return iterator
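
    # Each page yielded by the paginator above is a dict in which immediate
    # "subdirectories" appear under 'CommonPrefixes' and objects under
    # 'Contents'. Illustrative (abridged) shape only:
    #
    #   {
    #       'CommonPrefixes': [{'Prefix': 'data/subdir/'}],
    #       'Contents': [{'Key': 'data/file.csv', 'Size': 1024,
    #                     'LastModified': datetime(...), 'Owner': {...}}],
    #   }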
@override
def _dir(self, path):
iterator = self.__dir_paginator(path)
for response_data in iterator:
for prefix in response_data.get('CommonPrefixes', []):
yield FileSystemFileDesc(
fs=self,
path=prefix['Prefix'][:-len(self.path_separator)],
name=prefix['Prefix'][:-len(self.path_separator)].split(self.path_separator)[-1], # Remove trailing slash
type='directory',
)
for prefix in response_data.get('Contents', []):
if self.skip_hadoop_artifacts and prefix['Key'].endswith('_$folder$'):
continue
yield FileSystemFileDesc(
fs=self,
path=prefix['Key'],
name=prefix['Key'].split(self.path_separator)[-1],
type='file',
bytes=prefix['Size'],
owner=prefix['Owner']['DisplayName'] if 'Owner' in prefix else None,
last_modified=prefix['LastModified']
)
# TODO: Interestingly, directly using Amazon S3 methods seems slower than generic approach. Hypothesis: keys is not asynchronous.
# def _find(self, path_prefix, **attrs):
# if len(set(attrs).difference(('name',))) > 0 or hasattr(attrs.get('name'), '__call__'):
# logger.warning('Falling back to recursive search, rather than using S3, since find requires filters on more than just name.')
# for result in super(S3Client, self)._find(path_prefix, **attrs):
# yield result
#
# pattern = re.compile(attrs.get('name') or '.*')
#
# b = self._resource.Bucket(self.bucket)
# keys = b.objects.filter(Prefix=path_prefix)
# for k in keys:
# if pattern is None or pattern.match(k.key[len(path_prefix):]):
# yield k.key
@override
def _mkdir(self, path, recursive, exist_ok):
path = self._s3_path(path)
if not path.endswith(self.path_separator):
path += self.path_separator
if not self._exists(path):
self._client.put_object(Bucket=self.bucket, Key=path)
@override
def _remove(self, path, recursive):
path = self._s3_path(path)
if recursive:
bucket = self._resource.Bucket(self.bucket)
to_delete = []
for obj in bucket.objects.filter(Prefix=path + self.path_separator):
to_delete.append({'Key': obj.key})
if len(to_delete) == 1000: # Maximum number of simultaneous deletes is 1000
self._client.delete_objects(Bucket=self.bucket, Delete={'Objects': to_delete})
to_delete = []
            if to_delete:  # delete_objects rejects an empty key list.
                self._client.delete_objects(Bucket=self.bucket, Delete={'Objects': to_delete})
self._client.delete_object(Bucket=self.bucket, Key=path)
# File handling
@override
def _file_read_(self, path, size=-1, offset=0, binary=False):
if not self.isfile(path):
raise FileNotFoundError("File `{}` does not exist.".format(path))
obj = self._resource.Object(self.bucket, self._s3_path(path))
body = obj.get()['Body'].read()
if not binary:
body = body.decode('utf-8')
if offset > 0:
body = body[offset:]
if size >= 0:
body = body[:size]
return body
@override
def _file_append_(self, path, s, binary):
raise NotImplementedError("Support for S3 append operation has yet to be implemented.")
@override
def _file_write_(self, path, s, binary):
obj = self._resource.Object(self.bucket, self._s3_path(path))
if not binary:
s = s.encode('utf-8')
obj.put(Body=s)
return True
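
# A hedged end-to-end sketch using the generic FileSystemClient API (the
# bucket name and key are hypothetical; `open` is assumed to behave as in
# other omniduct filesystem clients):
#
#   client = S3Client(bucket='my-example-bucket')
#   with client.open('reports/summary.txt', 'w') as f:
#       f.write('hello from omniduct')
#   print(client.open('reports/summary.txt').read())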