From 06a5b6cf4699fcc28dd3ad5451c93336be241906 Mon Sep 17 00:00:00 2001 From: icereval Date: Tue, 19 Dec 2017 19:46:02 -0500 Subject: [PATCH 1/5] log osfstorage parity metadata to callback after upload * After uploading the parity files to remote storage, send a metadata payload to an OSF callback url. Metadata includes: version, parity redundancy, and names and sha256 of all files in the run. --- tests/providers/osfstorage/test_provider.py | 5 +- tests/providers/osfstorage/test_tasks.py | 48 ++++++++++++++---- waterbutler/providers/osfstorage/provider.py | 2 + .../providers/osfstorage/tasks/parity.py | 49 +++++++++++++++++-- 4 files changed, 89 insertions(+), 15 deletions(-) diff --git a/tests/providers/osfstorage/test_provider.py b/tests/providers/osfstorage/test_provider.py index e0e6d52da..bf177c66b 100644 --- a/tests/providers/osfstorage/test_provider.py +++ b/tests/providers/osfstorage/test_provider.py @@ -732,8 +732,9 @@ async def test_upload_and_tasks(self, monkeypatch, provider_and_mock, file_strea inner_provider.upload.assert_called_once_with(file_stream, WaterButlerPath('/uniquepath'), check_created=False, fetch_metadata=False) complete_path = os.path.join(FILE_PATH_COMPLETE, file_stream.writers['sha256'].hexdigest) - mock_parity.assert_called_once_with(complete_path, credentials['parity'], - settings['parity']) + mock_parity.assert_called_once_with(complete_path, upload_response['version'], + 'https://waterbutler.io/hooks/metadata/', + credentials['parity'], settings['parity']) mock_backup.assert_called_once_with(complete_path, upload_response['version'], 'https://waterbutler.io/hooks/metadata/', credentials['archive'], settings['parity']) diff --git a/tests/providers/osfstorage/test_tasks.py b/tests/providers/osfstorage/test_tasks.py index 5ed9ba568..e4a646e85 100644 --- a/tests/providers/osfstorage/test_tasks.py +++ b/tests/providers/osfstorage/test_tasks.py @@ -29,7 +29,7 @@ def settings(): return { 'storage': { 'provider': 'cloud', - 'container': 'butt', + 'container': 'foo', }, } @@ -55,29 +55,30 @@ def test_main_delays(self, monkeypatch, event_loop, credentials, settings): task = mock.Mock() monkeypatch.setattr(parity, '_parity_create_files', task) - fut = parity.main('The Best', credentials, settings) + fut = parity.main('The Best', 0, None, credentials, settings) event_loop.run_until_complete(fut) - task.delay.assert_called_once_with('The Best', credentials, settings) + task.delay.assert_called_once_with('The Best', 0, None, credentials, settings) def test_creates_upload_futures(self, monkeypatch, event_loop, credentials, settings): - paths = range(10) + paths = ['/foo/bar{}'.format(p) for p in range(10)] future = asyncio.Future() - future.set_result(None) + future.set_result(('faake_name', 'fake_sha256')) mock_upload_parity = mock.Mock() mock_upload_parity.return_value = future mock_create_parity = mock.Mock(return_value=paths) monkeypatch.setattr(parity, '_upload_parity', mock_upload_parity) monkeypatch.setattr(parity.utils, 'create_parity_files', mock_create_parity) + monkeypatch.setattr(parity, '_push_parity_complete', mock.Mock()) - parity._parity_create_files('Triangles', credentials, settings) + parity._parity_create_files('Triangles', None, None, credentials, settings) mock_create_parity.assert_called_once_with( os.path.join(osf_settings.FILE_PATH_COMPLETE, 'Triangles'), redundancy=osf_settings.PARITY_REDUNDANCY, ) - for num in reversed(range(10)): - mock_upload_parity.assert_any_call(num, credentials, settings) + for p in reversed(paths): + mock_upload_parity.assert_any_call(p, credentials, settings) @pytest.mark.asyncio async def test_uploads(self, monkeypatch, tmpdir, mock_provider): @@ -97,6 +98,35 @@ async def test_uploads(self, monkeypatch, tmpdir, mock_provider): WaterButlerPath('/' + os.path.split(path)[1]) ) + def test_calls_complete(self, monkeypatch, event_loop, mock_provider, credentials, settings): + paths = ['/foo/bar{}'.format(p) for p in range(10)] + paths.sort(key=lambda datum: datum[0]) + mock_complete = mock.Mock() + monkeypatch.setattr(parity.utils, 'create_parity_files', mock.Mock(return_value=paths)) + monkeypatch.setattr(parity, '_push_parity_complete', mock_complete) + + with mock.patch('waterbutler.providers.osfstorage.tasks.parity.open', mock.mock_open(), create=False): + parity._parity_create_files('Triangles', 0, 'http://callbackurl', credentials, settings) + + complete_call_args = mock_complete.delay.call_args[0] + complete_call_args[2]['parity']['files'].sort(key=lambda datum: datum['name']) + assert complete_call_args == ( + 0, + 'http://callbackurl', + { + 'parity': { + 'redundancy': osf_settings.PARITY_REDUNDANCY, + 'files': sorted([ + { + 'name': os.path.split(p)[1], + 'sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # mock_provider = /dev/null + } + for p in paths + ], key=lambda datum: datum['name']), + } + }, + ) + def test_exceptions_get_raised(self, monkeypatch): mock_sp_call = mock.Mock(return_value=7) monkeypatch.setattr(utils.subprocess, 'call', mock_sp_call) @@ -123,7 +153,7 @@ def test_skip_empty_files(self, monkeypatch): assert not mock_sp_call.called -class TestBackUpTask: +class TestBackupTask: def test_main_delays(self, monkeypatch, event_loop): task = mock.Mock() diff --git a/waterbutler/providers/osfstorage/provider.py b/waterbutler/providers/osfstorage/provider.py index 3ec6f2a07..66fdc6808 100644 --- a/waterbutler/providers/osfstorage/provider.py +++ b/waterbutler/providers/osfstorage/provider.py @@ -368,6 +368,8 @@ async def upload(self, stream, path, **kwargs): if settings.RUN_TASKS and data.pop('archive', True): parity.main( local_complete_path, + data['version'], + self.build_url('hooks', 'metadata') + '/', self.parity_credentials, self.parity_settings, ) diff --git a/waterbutler/providers/osfstorage/tasks/parity.py b/waterbutler/providers/osfstorage/tasks/parity.py index 43572a425..c0efd21b1 100644 --- a/waterbutler/providers/osfstorage/tasks/parity.py +++ b/waterbutler/providers/osfstorage/tasks/parity.py @@ -1,16 +1,22 @@ import os +import json import asyncio +import hashlib +from http import HTTPStatus -from waterbutler.core import streams +import aiohttp + +from waterbutler.core import streams, signing from waterbutler.core.utils import async_retry from waterbutler.core.utils import make_provider +from waterbutler.providers.osfstorage import settings from waterbutler.providers.osfstorage.tasks import utils from waterbutler.providers.osfstorage import settings as osf_settings @utils.task -def _parity_create_files(self, name, credentials, settings): +def _parity_create_files(self, name, version_id, callback_url, credentials, settings): path = os.path.join(osf_settings.FILE_PATH_COMPLETE, name) loop = asyncio.get_event_loop() with utils.RetryUpload(self): @@ -29,6 +35,15 @@ def _parity_create_files(self, name, credentials, settings): error = each.exception() if error: raise error + metadata = { + 'parity': { + 'redundancy': osf_settings.PARITY_REDUNDANCY, + 'files': [ + (lambda r: {'name': r[0], 'sha256': r[1]})(r.result()) for r in results + ], + }, + } + _push_parity_complete.delay(version_id, callback_url, metadata) async def _upload_parity(path, credentials, settings): @@ -37,12 +52,38 @@ async def _upload_parity(path, credentials, settings): provider = make_provider(provider_name, {}, credentials, settings) with open(path, 'rb') as file_pointer: stream = streams.FileStreamReader(file_pointer) + stream.add_writer('sha256', streams.HashStreamWriter(hashlib.sha256)) await provider.upload( stream, (await provider.validate_path('/' + name)) ) + return (name, stream.writers['sha256'].hexdigest) + + +@utils.task +def _push_parity_complete(self, version_id, callback_url, metadata): + signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM) + with utils.RetryHook(self): + data = signing.sign_data( + signer, + { + 'version': version_id, + 'metadata': metadata, + }, + ) + future = aiohttp.request( + 'PUT', + callback_url, + data=json.dumps(data), + headers={'Content-Type': 'application/json'}, + ) + loop = asyncio.get_event_loop() + response = loop.run_until_complete(future) + + if response.status != HTTPStatus.OK: + raise Exception('Failed to report parity completion, got status code {}'.format(response.status)) @async_retry(retries=5, backoff=5) -def main(name, credentials, settings): - return _parity_create_files.delay(name, credentials, settings) +def main(name, version_id, callback_url, credentials, settings): + return _parity_create_files.delay(name, version_id, callback_url, credentials, settings) From 4d7e2ea79b3c8de204e11c9884c39aa9b44e2e65 Mon Sep 17 00:00:00 2001 From: icereval Date: Tue, 19 Dec 2017 23:31:12 -0500 Subject: [PATCH 2/5] fix typo in .core.metadata docstring --- waterbutler/core/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waterbutler/core/metadata.py b/waterbutler/core/metadata.py index 8e92ceb12..97f3c3856 100644 --- a/waterbutler/core/metadata.py +++ b/waterbutler/core/metadata.py @@ -188,7 +188,7 @@ def materialized_path(self) -> str: @property def extra(self) -> dict: - """A dict of optional metdata from the provider. Non-mandatory metadata should be stored + """A dict of optional metadata from the provider. Non-mandatory metadata should be stored in this property. While this field is not explicitly structured, the `hashes` key should be reserved for the From 2eb78fd9b5edc16dc497f45066cc8fe786fc3827 Mon Sep 17 00:00:00 2001 From: icereval Date: Wed, 20 Dec 2017 11:16:03 -0500 Subject: [PATCH 3/5] refactor task metadata reporting into a utility function * Both the parity and backups tasks need to log metadata payloads to the OSF, so factor out the common code into a function. * Expand test coverage while we're at it. --- tests/providers/osfstorage/test_tasks.py | 47 +++++++++++++++++-- .../providers/osfstorage/tasks/backup.py | 24 +--------- .../providers/osfstorage/tasks/parity.py | 34 ++++---------- .../providers/osfstorage/tasks/utils.py | 25 ++++++++++ 4 files changed, 77 insertions(+), 53 deletions(-) diff --git a/tests/providers/osfstorage/test_tasks.py b/tests/providers/osfstorage/test_tasks.py index e4a646e85..f68e3eefd 100644 --- a/tests/providers/osfstorage/test_tasks.py +++ b/tests/providers/osfstorage/test_tasks.py @@ -4,19 +4,21 @@ from unittest import mock import pytest +import aiohttpretty +from boto.glacier.exceptions import UnexpectedHTTPResponseError from tests import utils as test_utils -from boto.glacier.exceptions import UnexpectedHTTPResponseError - +from waterbutler.core import signing from waterbutler.core.path import WaterButlerPath -from waterbutler.providers.osfstorage import settings from waterbutler.providers.osfstorage.tasks import utils from waterbutler.providers.osfstorage.tasks import backup from waterbutler.providers.osfstorage.tasks import parity from waterbutler.providers.osfstorage.tasks import exceptions from waterbutler.providers.osfstorage import settings as osf_settings +EMPTY_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' + @pytest.fixture def credentials(): @@ -105,7 +107,8 @@ def test_calls_complete(self, monkeypatch, event_loop, mock_provider, credential monkeypatch.setattr(parity.utils, 'create_parity_files', mock.Mock(return_value=paths)) monkeypatch.setattr(parity, '_push_parity_complete', mock_complete) - with mock.patch('waterbutler.providers.osfstorage.tasks.parity.open', mock.mock_open(), create=False): + with mock.patch('waterbutler.providers.osfstorage.tasks.parity.open', mock.mock_open(), + create=False): parity._parity_create_files('Triangles', 0, 'http://callbackurl', credentials, settings) complete_call_args = mock_complete.delay.call_args[0] @@ -119,7 +122,7 @@ def test_calls_complete(self, monkeypatch, event_loop, mock_provider, credential 'files': sorted([ { 'name': os.path.split(p)[1], - 'sha256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # mock_provider = /dev/null + 'sha256': EMPTY_SHA256, } for p in paths ], key=lambda datum: datum['name']), @@ -152,6 +155,23 @@ def test_skip_empty_files(self, monkeypatch): assert paths == [] assert not mock_sp_call.called + @pytest.mark.aiohttpretty + def test_push_complete(self, event_loop): + callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/' + aiohttpretty.register_json_uri('PUT', callback_url, status=200, body={'status': 'success'}) + + parity._push_parity_complete(123, callback_url, {'some': 'metadata'}) + + assert aiohttpretty.has_call(method='PUT', uri=callback_url) + + @pytest.mark.aiohttpretty + def test_push_complete_error(self, event_loop): + callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/' + aiohttpretty.register_json_uri('PUT', callback_url, status=500) + + with pytest.raises(Exception): + parity._push_parity_complete(123, callback_url, {'some': 'metadata'}) + class TestBackupTask: @@ -241,3 +261,20 @@ def test_upload_error(self, monkeypatch): with pytest.raises(UnexpectedHTTPResponseError): backup._push_file_archive('Triangles', None, None, {}, {}) assert not mock_complete.called + + @pytest.mark.aiohttpretty + def test_push_complete(self, event_loop): + callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/' + aiohttpretty.register_json_uri('PUT', callback_url, status=200, body={'status': 'success'}) + + backup._push_archive_complete(123, callback_url, {'some': 'metadata'}) + + assert aiohttpretty.has_call(method='PUT', uri=callback_url) + + @pytest.mark.aiohttpretty + def test_push_complete_error(self, event_loop): + callback_url = 'https://fakeosf.io/guidz/osfstorage/hooks/metadata/' + aiohttpretty.register_json_uri('PUT', callback_url, status=500) + + with pytest.raises(Exception): + backup._push_archive_complete(123, callback_url, {'some': 'metadata'}) diff --git a/waterbutler/providers/osfstorage/tasks/backup.py b/waterbutler/providers/osfstorage/tasks/backup.py index 092afee62..36cc6d424 100644 --- a/waterbutler/providers/osfstorage/tasks/backup.py +++ b/waterbutler/providers/osfstorage/tasks/backup.py @@ -1,15 +1,11 @@ import os import json import asyncio -from http import HTTPStatus -import aiohttp from boto.glacier.layer2 import Layer2 from boto.glacier.exceptions import UnexpectedHTTPResponseError -from waterbutler.core import signing from waterbutler.core.utils import async_retry -from waterbutler.providers.osfstorage import settings from waterbutler.providers.osfstorage.tasks import utils @@ -46,26 +42,10 @@ def _push_file_archive(self, local_path, version_id, callback_url, @utils.task def _push_archive_complete(self, version_id, callback_url, metadata): - signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM) with utils.RetryHook(self): - data = signing.sign_data( - signer, - { - 'version': version_id, - 'metadata': metadata, - }, - ) - future = aiohttp.request( - 'PUT', - callback_url, - data=json.dumps(data), - headers={'Content-Type': 'application/json'}, - ) + future = utils.push_metadata(version_id, callback_url, metadata) loop = asyncio.get_event_loop() - response = loop.run_until_complete(future) - - if response.status != HTTPStatus.OK: - raise Exception('Failed to report archive completion, got status code {}'.format(response.status)) + loop.run_until_complete(future) @async_retry(retries=5, backoff=5) diff --git a/waterbutler/providers/osfstorage/tasks/parity.py b/waterbutler/providers/osfstorage/tasks/parity.py index c0efd21b1..865c47f7d 100644 --- a/waterbutler/providers/osfstorage/tasks/parity.py +++ b/waterbutler/providers/osfstorage/tasks/parity.py @@ -1,16 +1,11 @@ import os -import json import asyncio import hashlib -from http import HTTPStatus -import aiohttp - -from waterbutler.core import streams, signing +from waterbutler.core import streams from waterbutler.core.utils import async_retry from waterbutler.core.utils import make_provider -from waterbutler.providers.osfstorage import settings from waterbutler.providers.osfstorage.tasks import utils from waterbutler.providers.osfstorage import settings as osf_settings @@ -27,14 +22,17 @@ def _parity_create_files(self, name, version_id, callback_url, credentials, sett if not parity_paths: # create_parity_files will return [] for empty files return - futures = [asyncio.async(_upload_parity(each, credentials, settings)) for each in parity_paths] - results, _ = loop.run_until_complete(asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)) + futures = [asyncio.async(_upload_parity(each, credentials, settings)) + for each in parity_paths] + results, _ = loop.run_until_complete(asyncio.wait(futures, + return_when=asyncio.FIRST_EXCEPTION)) # Errors are not raised in `wait`; explicitly check results for errors # and raise if any found for each in results: error = each.exception() if error: raise error + metadata = { 'parity': { 'redundancy': osf_settings.PARITY_REDUNDANCY, @@ -62,26 +60,10 @@ async def _upload_parity(path, credentials, settings): @utils.task def _push_parity_complete(self, version_id, callback_url, metadata): - signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM) with utils.RetryHook(self): - data = signing.sign_data( - signer, - { - 'version': version_id, - 'metadata': metadata, - }, - ) - future = aiohttp.request( - 'PUT', - callback_url, - data=json.dumps(data), - headers={'Content-Type': 'application/json'}, - ) + future = utils.push_metadata(version_id, callback_url, metadata) loop = asyncio.get_event_loop() - response = loop.run_until_complete(future) - - if response.status != HTTPStatus.OK: - raise Exception('Failed to report parity completion, got status code {}'.format(response.status)) + loop.run_until_complete(future) @async_retry(retries=5, backoff=5) diff --git a/waterbutler/providers/osfstorage/tasks/utils.py b/waterbutler/providers/osfstorage/tasks/utils.py index 111c2c5d6..f57d716c9 100644 --- a/waterbutler/providers/osfstorage/tasks/utils.py +++ b/waterbutler/providers/osfstorage/tasks/utils.py @@ -1,13 +1,17 @@ import os import glob +import json import errno import logging import functools import contextlib import subprocess +from http import HTTPStatus +import aiohttp from celery.utils.log import get_task_logger +from waterbutler.core import signing from waterbutler.tasks.app import app, client from waterbutler.providers.osfstorage import settings from waterbutler.providers.osfstorage.tasks import exceptions @@ -66,6 +70,27 @@ def create_parity_files(file_path, redundancy=5): ] +async def push_metadata(version_id, callback_url, metadata): + signer = signing.Signer(settings.HMAC_SECRET, settings.HMAC_ALGORITHM) + data = signing.sign_data( + signer, + { + 'version': version_id, + 'metadata': metadata, + }, + ) + response = await aiohttp.request( + 'PUT', + callback_url, + data=json.dumps(data), + headers={'Content-Type': 'application/json'}, + ) + + if response.status != HTTPStatus.OK: + raise Exception('Failed to report archive completion, got status ' + 'code {}'.format(response.status)) + + def sanitize_request(request): """Return dictionary of request attributes, excluding args and kwargs. Used to ensure that potentially sensitive values aren't logged or sent to Sentry. From 47922e36e48f04dce634ea57d4482ad59e2d09f3 Mon Sep 17 00:00:00 2001 From: Fitz Elliott Date: Wed, 20 Dec 2017 11:41:59 -0500 Subject: [PATCH 4/5] fix osfstorage test to use correct config key --- tests/providers/osfstorage/test_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/osfstorage/test_provider.py b/tests/providers/osfstorage/test_provider.py index bf177c66b..68b8d2376 100644 --- a/tests/providers/osfstorage/test_provider.py +++ b/tests/providers/osfstorage/test_provider.py @@ -737,7 +737,7 @@ async def test_upload_and_tasks(self, monkeypatch, provider_and_mock, file_strea credentials['parity'], settings['parity']) mock_backup.assert_called_once_with(complete_path, upload_response['version'], 'https://waterbutler.io/hooks/metadata/', - credentials['archive'], settings['parity']) + credentials['archive'], settings['archive']) expected_path = WaterButlerPath('/' + file_stream.writers['sha256'].hexdigest) inner_provider.metadata.assert_called_once_with(expected_path) inner_provider.move.assert_called_once_with(inner_provider, WaterButlerPath('/uniquepath'), From d252d2d8372ba7bb6914169a8136813bdc26ea7b Mon Sep 17 00:00:00 2001 From: Fitz Elliott Date: Wed, 20 Dec 2017 11:58:50 -0500 Subject: [PATCH 5/5] bump version & update changelog --- CHANGELOG | 6 +++++- waterbutler/__init__.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index b049273e7..6d9ee53d5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,10 @@ ChangeLog ********* +0.36.2 (2017-12-20) +=================== +- Feature: Log osfstorage parity file metadata back to the OSF after upload. + 0.36.1 (2017-12-11) =================== - Fix: Update OneDrive metadata to report the correct materialized name. @@ -53,7 +57,7 @@ This bypasses the 10 second wait for shutdown when running it in Docker. 0.33.1 (2017-09-05) =================== -- Reject requests for Box IDs if the ID is valid, but the file or folder is outside of the +- Fix: Reject requests for Box IDs if the ID is valid, but the file or folder is outside of the project root. (thanks, @AddisonSchiller!) 0.33.0 (2017-08-09) diff --git a/waterbutler/__init__.py b/waterbutler/__init__.py index bfc7521f1..6d86d2fa5 100644 --- a/waterbutler/__init__.py +++ b/waterbutler/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.36.1' +__version__ = '0.36.2' __import__('pkg_resources').declare_namespace(__name__)