Merge tag '0.36.2' into develop
felliott committed Dec 20, 2017
2 parents 3ec764a + 070f629 commit 1b60f40
Showing 9 changed files with 147 additions and 45 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG
@@ -2,6 +2,10 @@
ChangeLog
*********

0.36.2 (2017-12-20)
===================
- Feature: Log osfstorage parity file metadata back to the OSF after upload.

0.36.1 (2017-12-11)
===================
- Fix: Update OneDrive metadata to report the correct materialized name.
@@ -53,7 +57,7 @@ This bypasses the 10 second wait for shutdown when running it in Docker.

0.33.1 (2017-09-05)
===================
- Reject requests for Box IDs if the ID is valid, but the file or folder is outside of the
- Fix: Reject requests for Box IDs if the ID is valid, but the file or folder is outside of the
project root. (thanks, @AddisonSchiller!)

0.33.0 (2017-08-09)
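For context on the 0.36.2 feature entry above: judging from the parity.py, utils.py, and test changes below, the parity task now reports a payload roughly shaped like the following back to the OSF metadata hook once the parity files are uploaded. This is an illustrative sketch only; the literal values are placeholders, and the real redundancy value comes from `osf_settings.PARITY_REDUNDANCY`.

```python
# Illustrative sketch (not part of the commit): the approximate shape of the
# parity metadata reported back to the OSF after upload, mirroring the
# assertions in test_tasks.py below. All literal values are placeholders.
example_metadata = {
    'parity': {
        'redundancy': 5,  # osf_settings.PARITY_REDUNDANCY in the real task
        'files': [
            {
                'name': 'somefile.par2',  # one entry per generated parity file (placeholder name)
                'sha256': '<hexdigest of the uploaded parity file>',
            },
        ],
    },
}
# The task HMAC-signs this payload together with the version id and PUTs it
# to the provider's 'hooks/metadata/' callback URL (see utils.push_metadata below).
```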
7 changes: 4 additions & 3 deletions tests/providers/osfstorage/test_provider.py
@@ -732,11 +732,12 @@ async def test_upload_and_tasks(self, monkeypatch, provider_and_mock, file_strea
inner_provider.upload.assert_called_once_with(file_stream, WaterButlerPath('/uniquepath'),
check_created=False, fetch_metadata=False)
complete_path = os.path.join(FILE_PATH_COMPLETE, file_stream.writers['sha256'].hexdigest)
mock_parity.assert_called_once_with(complete_path, credentials['parity'],
settings['parity'])
mock_parity.assert_called_once_with(complete_path, upload_response['version'],
'https://waterbutler.io/hooks/metadata/',
credentials['parity'], settings['parity'])
mock_backup.assert_called_once_with(complete_path, upload_response['version'],
'https://waterbutler.io/hooks/metadata/',
credentials['archive'], settings['parity'])
credentials['archive'], settings['archive'])
expected_path = WaterButlerPath('/' + file_stream.writers['sha256'].hexdigest)
inner_provider.metadata.assert_called_once_with(expected_path)
inner_provider.move.assert_called_once_with(inner_provider, WaterButlerPath('/uniquepath'),
91 changes: 79 additions & 12 deletions tests/providers/osfstorage/test_tasks.py
@@ -4,19 +4,21 @@
from unittest import mock

import pytest
import aiohttpretty
from boto.glacier.exceptions import UnexpectedHTTPResponseError

from tests import utils as test_utils

from boto.glacier.exceptions import UnexpectedHTTPResponseError

from waterbutler.core import signing
from waterbutler.core.path import WaterButlerPath
from waterbutler.providers.osfstorage import settings
from waterbutler.providers.osfstorage.tasks import utils
from waterbutler.providers.osfstorage.tasks import backup
from waterbutler.providers.osfstorage.tasks import parity
from waterbutler.providers.osfstorage.tasks import exceptions
from waterbutler.providers.osfstorage import settings as osf_settings

EMPTY_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'


@pytest.fixture
def credentials():
@@ -29,7 +31,7 @@ def settings():
return {
'storage': {
'provider': 'cloud',
'container': 'butt',
'container': 'foo',
},
}

@@ -55,29 +57,30 @@ def test_main_delays(self, monkeypatch, event_loop, credentials, settings):
task = mock.Mock()
monkeypatch.setattr(parity, '_parity_create_files', task)

fut = parity.main('The Best', credentials, settings)
fut = parity.main('The Best', 0, None, credentials, settings)
event_loop.run_until_complete(fut)

task.delay.assert_called_once_with('The Best', credentials, settings)
task.delay.assert_called_once_with('The Best', 0, None, credentials, settings)

def test_creates_upload_futures(self, monkeypatch, event_loop, credentials, settings):
paths = range(10)
paths = ['/foo/bar{}'.format(p) for p in range(10)]
future = asyncio.Future()
future.set_result(None)
future.set_result(('faake_name', 'fake_sha256'))
mock_upload_parity = mock.Mock()
mock_upload_parity.return_value = future
mock_create_parity = mock.Mock(return_value=paths)
monkeypatch.setattr(parity, '_upload_parity', mock_upload_parity)
monkeypatch.setattr(parity.utils, 'create_parity_files', mock_create_parity)
monkeypatch.setattr(parity, '_push_parity_complete', mock.Mock())

parity._parity_create_files('Triangles', credentials, settings)
parity._parity_create_files('Triangles', None, None, credentials, settings)

mock_create_parity.assert_called_once_with(
os.path.join(osf_settings.FILE_PATH_COMPLETE, 'Triangles'),
redundancy=osf_settings.PARITY_REDUNDANCY,
)
for num in reversed(range(10)):
mock_upload_parity.assert_any_call(num, credentials, settings)
for p in reversed(paths):
mock_upload_parity.assert_any_call(p, credentials, settings)

@pytest.mark.asyncio
async def test_uploads(self, monkeypatch, tmpdir, mock_provider):
@@ -97,6 +100,36 @@ async def test_uploads(self, monkeypatch, tmpdir, mock_provider):
WaterButlerPath('/' + os.path.split(path)[1])
)

def test_calls_complete(self, monkeypatch, event_loop, mock_provider, credentials, settings):
paths = ['/foo/bar{}'.format(p) for p in range(10)]
paths.sort(key=lambda datum: datum[0])
mock_complete = mock.Mock()
monkeypatch.setattr(parity.utils, 'create_parity_files', mock.Mock(return_value=paths))
monkeypatch.setattr(parity, '_push_parity_complete', mock_complete)

with mock.patch('waterbutler.providers.osfstorage.tasks.parity.open', mock.mock_open(),
create=False):
parity._parity_create_files('Triangles', 0, 'http://callbackurl', credentials, settings)

complete_call_args = mock_complete.delay.call_args[0]
complete_call_args[2]['parity']['files'].sort(key=lambda datum: datum['name'])
assert complete_call_args == (
0,
'http://callbackurl',
{
'parity': {
'redundancy': osf_settings.PARITY_REDUNDANCY,
'files': sorted([
{
'name': os.path.split(p)[1],
'sha256': EMPTY_SHA256,
}
for p in paths
], key=lambda datum: datum['name']),
}
},
)

def test_exceptions_get_raised(self, monkeypatch):
mock_sp_call = mock.Mock(return_value=7)
monkeypatch.setattr(utils.subprocess, 'call', mock_sp_call)
@@ -122,8 +155,25 @@ def test_skip_empty_files(self, monkeypatch):
assert paths == []
assert not mock_sp_call.called

@pytest.mark.aiohttpretty
def test_push_complete(self, event_loop):
callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/'
aiohttpretty.register_json_uri('PUT', callback_url, status=200, body={'status': 'success'})

parity._push_parity_complete(123, callback_url, {'some': 'metadata'})

assert aiohttpretty.has_call(method='PUT', uri=callback_url)

@pytest.mark.aiohttpretty
def test_push_complete_error(self, event_loop):
callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/'
aiohttpretty.register_json_uri('PUT', callback_url, status=500)

with pytest.raises(Exception):
parity._push_parity_complete(123, callback_url, {'some': 'metadata'})


class TestBackUpTask:
class TestBackupTask:

def test_main_delays(self, monkeypatch, event_loop):
task = mock.Mock()
@@ -211,3 +261,20 @@ def test_upload_error(self, monkeypatch):
with pytest.raises(UnexpectedHTTPResponseError):
backup._push_file_archive('Triangles', None, None, {}, {})
assert not mock_complete.called

@pytest.mark.aiohttpretty
def test_push_complete(self, event_loop):
callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/'
aiohttpretty.register_json_uri('PUT', callback_url, status=200, body={'status': 'success'})

backup._push_archive_complete(123, callback_url, {'some': 'metadata'})

assert aiohttpretty.has_call(method='PUT', uri=callback_url)

@pytest.mark.aiohttpretty
def test_push_complete_error(self, event_loop):
callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/'
aiohttpretty.register_json_uri('PUT', callback_url, status=500)

with pytest.raises(Exception):
backup._push_archive_complete(123, callback_url, {'some': 'metadata'})
2 changes: 1 addition & 1 deletion waterbutler/__init__.py
@@ -1,2 +1,2 @@
__version__ = '0.36.1'
__version__ = '0.36.2'
__import__('pkg_resources').declare_namespace(__name__)
2 changes: 1 addition & 1 deletion waterbutler/core/metadata.py
@@ -188,7 +188,7 @@ def materialized_path(self) -> str:

@property
def extra(self) -> dict:
"""A dict of optional metdata from the provider. Non-mandatory metadata should be stored
"""A dict of optional metadata from the provider. Non-mandatory metadata should be stored
in this property.
While this field is not explicitly structured, the `hashes` key should be reserved for the
2 changes: 2 additions & 0 deletions waterbutler/providers/osfstorage/provider.py
@@ -368,6 +368,8 @@ async def upload(self, stream, path, **kwargs):
if settings.RUN_TASKS and data.pop('archive', True):
parity.main(
local_complete_path,
data['version'],
self.build_url('hooks', 'metadata') + '/',
self.parity_credentials,
self.parity_settings,
)
24 changes: 2 additions & 22 deletions waterbutler/providers/osfstorage/tasks/backup.py
@@ -1,15 +1,11 @@
import os
import json
import asyncio
from http import HTTPStatus

import aiohttp
from boto.glacier.layer2 import Layer2
from boto.glacier.exceptions import UnexpectedHTTPResponseError

from waterbutler.core import signing
from waterbutler.core.utils import async_retry
from waterbutler.providers.osfstorage import settings
from waterbutler.providers.osfstorage.tasks import utils


@@ -46,26 +42,10 @@ def _push_file_archive(self, local_path, version_id, callback_url,

@utils.task
def _push_archive_complete(self, version_id, callback_url, metadata):
signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM)
with utils.RetryHook(self):
data = signing.sign_data(
signer,
{
'version': version_id,
'metadata': metadata,
},
)
future = aiohttp.request(
'PUT',
callback_url,
data=json.dumps(data),
headers={'Content-Type': 'application/json'},
)
future = utils.push_metadata(version_id, callback_url, metadata)
loop = asyncio.get_event_loop()
response = loop.run_until_complete(future)

if response.status != HTTPStatus.OK:
raise Exception('Failed to report archive completion, got status code {}'.format(response.status))
loop.run_until_complete(future)


@async_retry(retries=5, backoff=5)
33 changes: 28 additions & 5 deletions waterbutler/providers/osfstorage/tasks/parity.py
@@ -1,5 +1,6 @@
import os
import asyncio
import hashlib

from waterbutler.core import streams
from waterbutler.core.utils import async_retry
@@ -10,7 +11,7 @@


@utils.task
def _parity_create_files(self, name, credentials, settings):
def _parity_create_files(self, name, version_id, callback_url, credentials, settings):
path = os.path.join(osf_settings.FILE_PATH_COMPLETE, name)
loop = asyncio.get_event_loop()
with utils.RetryUpload(self):
@@ -21,28 +22,50 @@ def _parity_create_files(self, name, credentials, settings):
if not parity_paths:
# create_parity_files will return [] for empty files
return
futures = [asyncio.async(_upload_parity(each, credentials, settings)) for each in parity_paths]
results, _ = loop.run_until_complete(asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION))
futures = [asyncio.async(_upload_parity(each, credentials, settings))
for each in parity_paths]
results, _ = loop.run_until_complete(asyncio.wait(futures,
return_when=asyncio.FIRST_EXCEPTION))
# Errors are not raised in `wait`; explicitly check results for errors
# and raise if any found
for each in results:
error = each.exception()
if error:
raise error

metadata = {
'parity': {
'redundancy': osf_settings.PARITY_REDUNDANCY,
'files': [
(lambda r: {'name': r[0], 'sha256': r[1]})(r.result()) for r in results
],
},
}
_push_parity_complete.delay(version_id, callback_url, metadata)


async def _upload_parity(path, credentials, settings):
_, name = os.path.split(path)
provider_name = settings.get('provider')
provider = make_provider(provider_name, {}, credentials, settings)
with open(path, 'rb') as file_pointer:
stream = streams.FileStreamReader(file_pointer)
stream.add_writer('sha256', streams.HashStreamWriter(hashlib.sha256))
await provider.upload(
stream,
(await provider.validate_path('/' + name))
)
return (name, stream.writers['sha256'].hexdigest)


@utils.task
def _push_parity_complete(self, version_id, callback_url, metadata):
with utils.RetryHook(self):
future = utils.push_metadata(version_id, callback_url, metadata)
loop = asyncio.get_event_loop()
loop.run_until_complete(future)


@async_retry(retries=5, backoff=5)
def main(name, credentials, settings):
return _parity_create_files.delay(name, credentials, settings)
def main(name, version_id, callback_url, credentials, settings):
return _parity_create_files.delay(name, version_id, callback_url, credentials, settings)
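A self-contained sketch of the error-checking pattern used in `_parity_create_files` above: `asyncio.wait` does not raise exceptions from its tasks, so each finished future must be inspected explicitly. This example uses `asyncio.ensure_future` (the modern spelling of the `asyncio.async` call in the diff) and a dummy coroutine; it is illustrative only, not the commit's code.

```python
import asyncio

async def might_fail(n):
    # Stand-in for _upload_parity(); item 3 simulates a failed upload.
    if n == 3:
        raise ValueError('upload failed for item {}'.format(n))
    return n

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(might_fail(n)) for n in range(5)]
done, _ = loop.run_until_complete(
    asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION))

# wait() swallows task exceptions; check each completed future and re-raise the first error.
for each in done:
    error = each.exception()  # None if the task succeeded
    if error:
        raise error
```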
25 changes: 25 additions & 0 deletions waterbutler/providers/osfstorage/tasks/utils.py
@@ -1,13 +1,17 @@
import os
import glob
import json
import errno
import logging
import functools
import contextlib
import subprocess
from http import HTTPStatus

import aiohttp
from celery.utils.log import get_task_logger

from waterbutler.core import signing
from waterbutler.tasks.app import app, client
from waterbutler.providers.osfstorage import settings
from waterbutler.providers.osfstorage.tasks import exceptions
@@ -66,6 +70,27 @@ def create_parity_files(file_path, redundancy=5):
]


async def push_metadata(version_id, callback_url, metadata):
signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM)
data = signing.sign_data(
signer,
{
'version': version_id,
'metadata': metadata,
},
)
response = await aiohttp.request(
'PUT',
callback_url,
data=json.dumps(data),
headers={'Content-Type': 'application/json'},
)

if response.status != HTTPStatus.OK:
raise Exception('Failed to report archive completion, got status '
'code {}'.format(response.status))


def sanitize_request(request):
"""Return dictionary of request attributes, excluding args and kwargs. Used
to ensure that potentially sensitive values aren't logged or sent to Sentry.
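As a usage note, the new `push_metadata` helper above is a coroutine, so the synchronous Celery tasks in backup.py and parity.py drive it with the event loop. A rough sketch under those assumptions follows; the version id and payload are placeholders, and the callback URL simply mirrors the one asserted in test_provider.py.

```python
import asyncio

from waterbutler.providers.osfstorage.tasks import utils

loop = asyncio.get_event_loop()
loop.run_until_complete(
    utils.push_metadata(
        123,                                          # version_id (placeholder)
        'https://waterbutler.io/hooks/metadata/',     # callback_url, as in test_provider.py
        {'parity': {'redundancy': 5, 'files': []}},   # metadata payload (placeholder)
    )
)
# A non-200 response makes push_metadata raise; the calling tasks wrap the
# call in utils.RetryHook so a failed hook report can be retried.
```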
