From bfe239b4f482766a931b8fc71467f000d39a0f7e Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 21 Jul 2016 11:24:52 +0200 Subject: [PATCH] Change strategy, use notify_resource_event --- kinto_signer/tests/test_updater.py | 31 ++-- kinto_signer/updater.py | 234 ++++++++++++++++++++++------- 2 files changed, 198 insertions(+), 67 deletions(-) diff --git a/kinto_signer/tests/test_updater.py b/kinto_signer/tests/test_updater.py index 0c6718c4..110ccc06 100644 --- a/kinto_signer/tests/test_updater.py +++ b/kinto_signer/tests/test_updater.py @@ -1,8 +1,10 @@ import mock import pytest + from kinto.core.storage import Filter from kinto.core.storage.exceptions import UnicityError, RecordNotFoundError from kinto.core.utils import COMPARISON +from kinto.tests.core.support import DummyRequest from kinto_signer.updater import LocalUpdater from .support import unittest @@ -25,6 +27,11 @@ def setUp(self): storage=self.storage, permission=self.permission) + # Resource events are bypassed completely in this test suite. + patcher = mock.patch('kinto_signer.updater.build_request') + self.addCleanup(patcher.stop) + patcher.start() + def patch(self, obj, *args, **kwargs): patcher = mock.patch.object(obj, *args, **kwargs) self.addCleanup(patcher.stop) @@ -82,7 +89,7 @@ def test_push_records_to_destination(self): records = [{'id': idx, 'foo': 'bar %s' % idx} for idx in range(1, 4)] self.patch(self.updater, 'get_source_records', return_value=(records, '42')) - self.updater.push_records_to_destination() + self.updater.push_records_to_destination(DummyRequest()) assert self.storage.update.call_count == 3 def test_push_records_removes_deleted_records(self): @@ -93,7 +100,7 @@ def test_push_records_removes_deleted_records(self): for idx in range(3, 5)]) self.patch(self.updater, 'get_source_records', return_value=(records, '42')) - self.updater.push_records_to_destination() + self.updater.push_records_to_destination(DummyRequest()) self.updater.get_source_records.assert_called_with( 1324, include_deleted=True) assert self.storage.update.call_count == 2 @@ -111,7 +118,7 @@ def test_push_records_skip_already_deleted_records(self): self.patch(self.updater, 'get_source_records', return_value=(records, '42')) # Calling the updater should not raise the RecordNotFoundError. - self.updater.push_records_to_destination() + self.updater.push_records_to_destination(DummyRequest()) def test_push_records_to_destination_with_no_destination_changes(self): self.patch(self.updater, 'get_destination_last_modified', @@ -119,14 +126,15 @@ def test_push_records_to_destination_with_no_destination_changes(self): records = [{'id': idx, 'foo': 'bar %s' % idx} for idx in range(1, 4)] self.patch(self.updater, 'get_source_records', return_value=(records, '42')) - self.updater.push_records_to_destination() + self.updater.push_records_to_destination(DummyRequest()) self.updater.get_source_records.assert_called_with( None, include_deleted=True) assert self.storage.update.call_count == 3 def test_set_destination_signature_modifies_the_source_collection(self): self.storage.get.return_value = {'id': 1234, 'last_modified': 1234} - self.updater.set_destination_signature(mock.sentinel.signature) + self.updater.set_destination_signature(mock.sentinel.signature, + DummyRequest()) self.storage.update.assert_called_with( collection_id='collection', @@ -140,7 +148,7 @@ def test_set_destination_signature_modifies_the_source_collection(self): def test_update_source_status_modifies_the_source_collection(self): self.storage.get.return_value = {'id': 1234, 'last_modified': 1234, 'status': 'to-sign'} - self.updater.update_source_status("signed") + self.updater.update_source_status("signed", DummyRequest()) self.storage.update.assert_called_with( collection_id='collection', @@ -153,13 +161,13 @@ def test_update_source_status_modifies_the_source_collection(self): def test_create_destination_updates_collection_permissions(self): collection_id = '/buckets/destbucket/collections/destcollection' - self.updater.create_destination() + self.updater.create_destination(DummyRequest()) self.permission.replace_object_permissions.assert_called_with( collection_id, {"read": ("system.Everyone",)}) def test_create_destination_creates_bucket(self): - self.updater.create_destination() + self.updater.create_destination(DummyRequest()) self.storage.create.assert_any_call( collection_id='bucket', parent_id='', @@ -167,7 +175,7 @@ def test_create_destination_creates_bucket(self): def test_create_destination_creates_collection(self): bucket_id = '/buckets/destbucket' - self.updater.create_destination() + self.updater.create_destination(DummyRequest()) self.storage.create.assert_any_call( collection_id='collection', parent_id=bucket_id, @@ -175,7 +183,8 @@ def test_create_destination_creates_collection(self): def test_ensure_resource_exists_handles_uniticy_errors(self): self.storage.create.side_effect = UnicityError('id', 'record') - self.updater._ensure_resource_exists('bucket', '', 'abcd') + self.updater._ensure_resource_exists('bucket', '', 'abcd', + DummyRequest()) def test_sign_and_update_destination(self): records = [{'id': idx, 'foo': 'bar %s' % idx, 'last_modified': idx} @@ -186,7 +195,7 @@ def test_sign_and_update_destination(self): self.patch(self.updater, 'get_source_records', return_value=([], '0')) self.patch(self.updater, 'push_records_to_destination') self.patch(self.updater, 'set_destination_signature') - self.updater.sign_and_update_destination() + self.updater.sign_and_update_destination(DummyRequest()) assert self.updater.get_source_records.call_count == 1 assert self.updater.push_records_to_destination.call_count == 1 diff --git a/kinto_signer/updater.py b/kinto_signer/updater.py index f26960e0..24eef298 100644 --- a/kinto_signer/updater.py +++ b/kinto_signer/updater.py @@ -1,25 +1,8 @@ -from pyramid import httpexceptions - +from kinto.core.events import ACTIONS from kinto.core.utils import COMPARISON, build_request, instance_uri from kinto_signer.serializer import canonical_json from kinto.core.storage import Filter - - -def _invoke_subrequest(request, params): - subrequest = build_request(request, params) - subrequest.bound_data = request.bound_data # Contains resource events. - return request.invoke_subrequest(subrequest) - - -def _ensure_resource_exists(request, uri): - try: - _invoke_subrequest(request, { - 'method': 'PUT', - 'path': uri, - 'headers': {'If-None-Match': '*'} - }) - except httpexceptions.HTTPPreconditionFailed: - pass +from kinto.core.storage.exceptions import UnicityError, RecordNotFoundError class LocalUpdater(object): @@ -59,6 +42,18 @@ def _ensure_resource(resource): self.storage = storage self.permission = permission + # Define resource IDs. + + self.destination_bucket_uri = '/buckets/%s' % self.destination['bucket'] + self.destination_collection_uri = '/buckets/%s/collections/%s' % ( + self.destination['bucket'], + self.destination['collection']) + + self.source_bucket_uri = '/buckets/%s' % self.source['bucket'] + self.source_collection_uri = '/buckets/%s/collections/%s' % ( + self.source['bucket'], + self.source['collection']) + def sign_and_update_destination(self, request): """Sign the specified collection. @@ -85,25 +80,66 @@ def sign_and_update_destination(self, request): for event in request.get_resource_events()[before:]: request.registry.notify(event) + def _ensure_resource_exists(self, resource_type, parent_id, record_id, request): + try: + created = self.storage.create( + collection_id=resource_type, + parent_id=parent_id, + record={'id': record_id}) + except UnicityError: + created = None + return created + def create_destination(self, request): # Create the destination bucket/collection if they don't already exist. - bucket_uri = instance_uri(request, - 'bucket', - id=self.destination['bucket']) - _ensure_resource_exists(request, bucket_uri) - - collection_uri = instance_uri(request, - 'collection', - bucket_id=self.destination['bucket'], - id=self.destination['collection']) - _ensure_resource_exists(request, collection_uri) + bucket_name = self.destination['bucket'] + collection_name = self.destination['collection'] + + created = self._ensure_resource_exists('bucket', '', bucket_name, request) + if created: + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(id=self.destination['bucket']) + fakerequest = build_request(request, { + 'method': 'PUT', + 'path': self.destination_bucket_uri + }) + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "bucket" + fakerequest.notify_resource_event(parent_id='', + timestamp=created['last_modified'], + data=created, + action=ACTIONS.CREATE) + created = self._ensure_resource_exists( + 'collection', + self.destination_bucket_uri, + collection_name, + request) + if created: + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(bucket_id=self.destination['bucket'], + id=self.destination['collection']) + fakerequest = build_request(request, { + 'method': 'PUT', + 'path': self.destination_collection_uri + }) + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "collection" + fakerequest.notify_resource_event(parent_id=self.destination_bucket_uri, + timestamp=created['last_modified'], + data=created, + action=ACTIONS.CREATE) # Set the permissions on the destination collection. # With the current implementation, the destination is not writable by # anyone and readable by everyone. # https://github.com/Kinto/kinto-signer/issues/55 permissions = {'read': ("system.Everyone",)} - self.permission.replace_object_permissions(collection_uri, permissions) + self.permission.replace_object_permissions( + self.destination_collection_uri, permissions) def get_source_records(self, last_modified=None, include_deleted=False): # If last_modified was specified, only retrieve items since then. @@ -150,46 +186,132 @@ def push_records_to_destination(self, request): # Update the destination collection. for record in new_records: - uri = instance_uri(request, 'record', - bucket_id=self.destination['bucket'], - collection_id=self.destination['collection'], - id=record['id']) - if record.get('deleted', False): - uri += "?last_modified=%s" % record['last_modified'] try: - _invoke_subrequest(request, { + deleted = self.storage.delete( + parent_id=self.destination_collection_uri, + collection_id='record', + object_id=record['id'], + last_modified=record['last_modified'] + ) + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(bucket_id=self.destination['bucket'], + collection_id=self.destination['collection'], + id=record['id']) + record_uri = instance_uri(request, + 'record', + **matchdict) + fakerequest = build_request(request, { 'method': 'DELETE', - 'path': uri + 'path': record_uri }) - except httpexceptions.HTTPNotFound: + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "record" + fakerequest.notify_resource_event(parent_id=self.destination_collection_uri, + timestamp=deleted['last_modified'], + data=deleted, + action=ACTIONS.DELETE) + except RecordNotFoundError: # If the record doesn't exists in the destination # we are good and can ignore it. pass else: - _invoke_subrequest(request, { + updated = self.storage.update( + parent_id=self.destination_collection_uri, + collection_id='record', + object_id=record['id'], + record=record) + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(bucket_id=self.destination['bucket'], + collection_id=self.destination['collection'], + id=record['id']) + record_uri = instance_uri(request, + 'record', + **matchdict) + fakerequest = build_request(request, { 'method': 'PUT', - 'path': uri, - 'body': {'data': record} + 'path': record_uri }) + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "record" + fakerequest.notify_resource_event(parent_id=self.destination_collection_uri, + timestamp=updated['last_modified'], + data=updated, + action=ACTIONS.UPDATE) def set_destination_signature(self, signature, request): # Push the new signature to the destination collection. - uri = instance_uri(request, 'collection', - bucket_id=self.destination['bucket'], - id=self.destination['collection']) - _invoke_subrequest(request, { - 'method': 'PATCH', - 'path': uri, - 'body': {'data': {'signature': signature}} + parent_id = '/buckets/%s' % self.destination['bucket'] + collection_id = 'collection' + + collection_record = self.storage.get( + parent_id=parent_id, + collection_id=collection_id, + object_id=self.destination['collection']) + + # Update the collection_record + new_collection = dict(**collection_record) + new_collection.pop('last_modified', None) + new_collection['signature'] = signature + + updated = self.storage.update( + parent_id=parent_id, + collection_id=collection_id, + object_id=self.destination['collection'], + record=new_collection) + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(bucket_id=self.destination['bucket'], + id=self.destination['collection']) + fakerequest = build_request(request, { + 'method': 'PUT', + 'path': self.destination_collection_uri }) + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "collection" + fakerequest.notify_resource_event(parent_id=self.destination_bucket_uri, + timestamp=updated['last_modified'], + data=updated, + action=ACTIONS.UPDATE, + old=collection_record) def update_source_status(self, status, request): - uri = instance_uri(request, 'collection', - bucket_id=self.source['bucket'], - id=self.source['collection']) - _invoke_subrequest(request, { - 'method': 'PATCH', - 'path': uri, - 'body': {'data': {'status': 'signed'}} + parent_id = '/buckets/%s' % self.source['bucket'] + collection_id = 'collection' + + collection_record = self.storage.get( + parent_id=parent_id, + collection_id=collection_id, + object_id=self.source['collection']) + + # Update the collection_record + new_collection = dict(**collection_record) + new_collection.pop('last_modified', None) + new_collection['status'] = status + + updated = self.storage.update( + parent_id=parent_id, + collection_id=collection_id, + object_id=self.source['collection'], + record=new_collection) + # Current request is updating collection metadata. + # We need a fake request on destination records + matchdict = dict(bucket_id=self.source['bucket'], + id=self.source['collection']) + fakerequest = build_request(request, { + 'method': 'PUT', + 'path': self.source_collection_uri }) + fakerequest.matchdict = matchdict + fakerequest.bound_data = request.bound_data + fakerequest.current_resource_name = "collection" + fakerequest.notify_resource_event(parent_id=self.source_bucket_uri, + timestamp=updated['last_modified'], + data=updated, + action=ACTIONS.UPDATE, + old=collection_record)