Skip to content

Commit

Permalink
Merge pull request #1905 from davidmarin/glacier-restore
Browse files Browse the repository at this point in the history
allow input on S3 that's been restored from Glacier (fixes #1887)
  • Loading branch information
David Marin committed Dec 13, 2018
2 parents 05e8c11 + d8685db commit 0210193
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
11 changes: 10 additions & 1 deletion mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@
# case-insensitive pattern for error messages saying that a cluster's
# node(s)/instance(s) were terminated
_CLUSTER_SELF_TERMINATED_RE = re.compile(
    '^.*(node|instances) .* terminated.*$', re.I)

# if this appears in an S3 object's "restore" field, the object
# is available to read even if it's Glacier-archived
# (the restore field mirrors S3's x-amz-restore response header;
# ongoing-request="false" means the restore request has completed)
_RESTORED_FROM_GLACIER = 'ongoing-request="false"'

# used to bail out and retry when a pooled cluster self-terminates
class _PooledClusterSelfTerminatedException(Exception):
Expand Down Expand Up @@ -749,7 +752,13 @@ def _check_input_path(self, path):
for uri, obj in self._s3_fs._ls(path):
exists = True

if obj.storage_class == 'GLACIER':
# we currently just look for 'ongoing-request="false"'
# in the *restore* field and ignore the expiration date
# (if the object has expired, the *restore* field won't be set).
#
# See #1887 for more discussion of checking expiration.
if obj.storage_class == 'GLACIER' and not (
obj.restore and _RESTORED_FROM_GLACIER in obj.restore):
raise IOError(
'%s is archived in Glacier and'
' cannot be read as input!' % uri)
Expand Down
6 changes: 4 additions & 2 deletions tests/mock_boto3/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def fake_create_mrjob_zip(mocked_runner, *args, **kwargs):
self.start(patch.object(time, 'sleep'))

def add_mock_s3_data(self, data,
                     age=None, location=None, storage_class=None,
                     restore=None):
    """Update self.mock_s3_fs with a map from bucket name
    to key name to data.

    :param age: optional timedelta subtracted from "now" for time_modified
    :param location: optional bucket location constraint (a region name)
    :param storage_class: optional non-Standard storage class
                          (e.g. ``'GLACIER'``)
    :param restore: optional x-amz-restore header value for the object(s)
    """
    # NOTE(review): the scraped diff showed both the pre- and post-change
    # signature/call lines interleaved (a syntax error as rendered); this
    # is the post-change version with the *restore* parameter threaded
    # through to the module-level helper.
    add_mock_s3_data(
        self.mock_s3_fs, data, age, location, storage_class, restore)

def add_mock_ec2_image(self, image):
"""Add information about a mock EC2 Image (AMI) to be returned by
Expand Down
18 changes: 16 additions & 2 deletions tests/mock_boto3/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class MockS3Client(object):
:py:class:`~datetime.datetime`. *location* is an
optional location constraint for the bucket
(a region name). *storage_class* is an optional
non-Standard storage class (e.g. ``'GLACIER'``).
non-Standard storage class (e.g. ``'GLACIER'``),
*restore* is an optional field showing whether
the object has been restored or is being restored.
"""
def __init__(self,
mock_s3_fs,
Expand Down Expand Up @@ -92,12 +94,17 @@ def list_buckets(self):


def add_mock_s3_data(mock_s3_fs, data,
age=None, location=None, storage_class=None):
age=None, location=None,
storage_class=None,
restore=None):
"""Update *mock_s3_fs* with a map from bucket name to key name to data.
:param age: a timedelta
:param location string: the bucket's location constraint (a region name)
:param storage_class string: storage class for all data added
:param restore: x-amz-restore header (see
https://docs.aws.amazon.com/AmazonS3/latest/API/\
RESTObjectHEAD.html#RESTObjectHEAD-responses)
"""
age = age or timedelta(0)
time_modified = _boto3_now() - age
Expand All @@ -113,8 +120,11 @@ def add_mock_s3_data(mock_s3_fs, data,

mock_key = dict(
body=key_data, time_modified=time_modified)

if storage_class:
mock_key['storage_class'] = storage_class
if restore:
mock_key['restore'] = restore

bucket['keys'][key_name] = mock_key

Expand Down Expand Up @@ -247,6 +257,7 @@ def get(self):
self.last_modified = mock_key['time_modified']
self.size = len(mock_key['body'])
self.storage_class = mock_key.get('storage_class')
self.restore = mock_key.get('restore')

result = dict(
Body=MockStreamingBody(mock_key['body']),
Expand All @@ -258,6 +269,9 @@ def get(self):
if self.storage_class is not None:
result['StorageClass'] = self.storage_class

if self.restore is not None:
result['Restore'] = self.storage_class

return result

def put(self, Body):
Expand Down
36 changes: 35 additions & 1 deletion tests/test_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5671,7 +5671,41 @@ def test_existing_s3_dir(self):
with job.make_runner() as runner:
self.assertRaises(StopIteration, runner.run)

def test_s3_path_in_glacier(self):
def test_s3_path_archived_in_glacier(self):
    # a Glacier-archived object with no restore status can't be
    # read, so using it as input should raise IOError
    glacier_data = {'walrus': {'data/foo': b'foo\n'}}
    self.add_mock_s3_data(glacier_data, storage_class='GLACIER')

    job = MRTwoStepJob(['-r', 'emr', 's3://walrus/data/foo'])
    with job.make_runner() as runner:
        self.assertRaises(IOError, runner.run)

def test_s3_path_being_restored_from_glacier(self):
    # should fail: a restore request doesn't make something available
    # (ongoing-request="true" means the restore is still in progress)
    self.add_mock_s3_data(
        {'walrus': {'data/foo': b'foo\n'}},
        storage_class='GLACIER',
        restore='ongoing-request="true"')

    job = MRTwoStepJob(['-r', 'emr', 's3://walrus/data/foo'])
    with job.make_runner() as runner:
        self.assertRaises(IOError, runner.run)

def test_s3_path_restored_from_glacier(self):
    # should succeed: ongoing-request="false" means the restore has
    # completed and the object is readable until the expiry date
    self.add_mock_s3_data(
        {'walrus': {'data/foo': b'foo\n'}},
        storage_class='GLACIER',
        # BUG FIX: the expiry-date value was missing its closing quote,
        # making the mocked x-amz-restore header malformed (the real
        # header format is:
        # ongoing-request="false", expiry-date="Fri, 21 Dec 2012 00:00:00 GMT")
        restore=('ongoing-request="false",'
                 ' expiry-date="Tue, May 23, 3030 00:00:00 GMT"'))

    job = MRTwoStepJob(['-r', 'emr', 's3://walrus/data/foo'])

    with job.make_runner() as runner:
        self.assertRaises(StopIteration, runner.run)

def test_all_paths_for_glacier_status(self):
self.add_mock_s3_data({'walrus': {'data/one': b'one\n'}})
self.add_mock_s3_data({'walrus': {'data/two': b'two\n'}},
storage_class='GLACIER')
Expand Down

0 comments on commit 0210193

Please sign in to comment.