Skip to content
This repository has been archived by the owner on Jan 25, 2022. It is now read-only.

Commit

Permalink
Merge 52b1ba9 into 40736c5
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed Jul 20, 2016
2 parents 40736c5 + 52b1ba9 commit 9740f85
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 88 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ This document describes changes between each past release.
- Update the `last_modified` value when updating the collection status and signature (#97)
- Prevents crash with events on ``default`` bucket on Kinto < 3.3

**New features**

- Trigger ``ResourceChanged`` events when the destination collection and records are updated
during signing. This allows plugins like ``kinto-changes`` and ``kinto.plugins.history``
to catch the changes.


0.7.0 (2016-06-28)
------------------
Expand Down
2 changes: 1 addition & 1 deletion kinto_signer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def on_collection_changed(event, resources):
destination=resource['destination'])

try:
updater.sign_and_update_destination()
updater.sign_and_update_destination(event.request)
except Exception:
logger.exception("Could not sign '{0}'".format(key))
event.request.response.status = 503
Expand Down
135 changes: 135 additions & 0 deletions kinto_signer/tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os

from kinto.tests.support import BaseWebTest, unittest

here = os.path.abspath(os.path.dirname(__file__))


class Listener(object):
def __init__(self):
self.received = []

def __call__(self, event):
self.received.append(event)


listener = Listener()


def load_from_config(config, prefix):
return listener


class ResourceEventsTest(BaseWebTest, unittest.TestCase):
def get_app_settings(self, extra=None):
settings = super(ResourceEventsTest, self).get_app_settings(extra)

settings['storage_backend'] = 'kinto.core.storage.memory'
settings['cache_backend'] = 'kinto.core.cache.memory'
settings['permission_backend'] = 'kinto.core.permission.memory'

settings['includes'] = 'kinto_signer'
settings['signer.ecdsa.private_key'] = os.path.join(
here, 'config', 'ecdsa.private.pem')

self.source_collection = "/buckets/alice/collections/scid"
self.destination_collection = "/buckets/destination/collections/dcid"

settings['signer.resources'] = '%s;%s' % (
self.source_collection,
self.destination_collection)

settings['event_listeners'] = 'ks'
settings['event_listeners.ks.use'] = 'kinto_signer.tests.test_events'
return settings

def setUp(self):
super(ResourceEventsTest, self).setUp()
self.app.put_json("/buckets/alice", headers=self.headers)
self.app.put_json(self.source_collection, headers=self.headers)
self.app.post_json(self.source_collection + "/records",
{"data": {"title": "hello"}},
headers=self.headers)
self.app.post_json(self.source_collection + "/records",
{"data": {"title": "bonjour"}},
headers=self.headers)

def _sign(self):
self.app.patch_json(self.source_collection,
{"data": {"status": "to-sign"}},
headers=self.headers)

resp = self.app.get(self.source_collection, headers=self.headers)
data = resp.json["data"]
self.assertEqual(data["status"], "signed")

resp = self.app.get(self.destination_collection, headers=self.headers)
data = resp.json["data"]
self.assertIn("signature", data)

def test_resource_changed_is_triggered_for_destination_creation(self):
self._sign()
event = [e for e in listener.received
if e.payload["uri"] == "/buckets/destination"
and e.payload["action"] == "create"][0]
self.assertEqual(len(event.impacted_records), 1)

event = [e for e in listener.received
if e.payload["uri"] == self.destination_collection
and e.payload["action"] == "create"][0]
self.assertEqual(len(event.impacted_records), 1)

def test_resource_changed_is_triggered_for_source_collection(self):
before = len(listener.received)

self._sign()
event = [e for e in listener.received[before:]
if e.payload["resource_name"] == "collection"
and e.payload["collection_id"] == "scid"
and e.payload["action"] == "update"][0]
self.assertEqual(len(event.impacted_records), 2)
self.assertEqual(event.impacted_records[0]["new"]["status"], "to-sign")
self.assertEqual(event.impacted_records[1]["new"]["status"], "signed")

def test_resource_changed_is_triggered_for_destination_collection(self):
before = len(listener.received)

self._sign()
event = [e for e in listener.received[before:]
if e.payload["resource_name"] == "collection"
and e.payload.get("collection_id") == "dcid"
and e.payload["action"] == "update"][0]

self.assertEqual(len(event.impacted_records), 1)
self.assertNotEqual(event.impacted_records[0]["old"].get("signature"),
event.impacted_records[0]["new"]["signature"])

def test_resource_changed_is_triggered_for_destination_records(self):
before = len(listener.received)

self._sign()
events = [e for e in listener.received[before:]
if e.payload["resource_name"] == "record"
and e.payload["collection_id"] == "dcid"]

self.assertEqual(len(events), 1)
self.assertEqual(len(events[0].impacted_records), 2)

def test_resource_changed_is_triggered_for_destination_removal(self):
record_uri = self.source_collection + "/records/xyz"
self.app.put_json(record_uri,
{"data": {"title": "servus"}},
headers=self.headers)
self._sign()
self.app.delete(record_uri, headers=self.headers)

before = len(listener.received)
self._sign()

events = [e for e in listener.received[before:]
if e.payload["resource_name"] == "record"]

self.assertEqual(len(events), 1)
self.assertEqual(events[0].payload["action"], "delete")
self.assertEqual(events[0].payload["uri"],
self.destination_collection + "/records/xyz")
164 changes: 77 additions & 87 deletions kinto_signer/updater.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
from kinto.core.utils import COMPARISON
from pyramid import httpexceptions

from kinto.core.utils import COMPARISON, build_request, instance_uri
from kinto_signer.serializer import canonical_json
from kinto.core.storage import Filter
from kinto.core.storage.exceptions import UnicityError, RecordNotFoundError


def _invoke_subrequest(request, params):
subrequest = build_request(request, params)
subrequest.bound_data = request.bound_data # Contains resource events.
return request.invoke_subrequest(subrequest)


def _ensure_resource_exists(request, uri):
try:
_invoke_subrequest(request, {
'method': 'PUT',
'path': uri,
'headers': {'If-None-Match': '*'}
})
except httpexceptions.HTTPPreconditionFailed:
pass


class LocalUpdater(object):
Expand Down Expand Up @@ -41,19 +59,7 @@ def _ensure_resource(resource):
self.storage = storage
self.permission = permission

# Define resource IDs.

self.destination_bucket_id = '/buckets/%s' % self.destination['bucket']
self.destination_collection_id = '/buckets/%s/collections/%s' % (
self.destination['bucket'],
self.destination['collection'])

self.source_bucket_id = '/buckets/%s' % self.source['bucket']
self.source_collection_id = '/buckets/%s/collections/%s' % (
self.source['bucket'],
self.source['collection'])

def sign_and_update_destination(self):
def sign_and_update_destination(self, request):
"""Sign the specified collection.
0. Create the destination bucket / collection
Expand All @@ -64,42 +70,40 @@ def sign_and_update_destination(self):
server
5. Send the signature to the Authoritative server.
"""
self.create_destination()
before = len(request.get_resource_events())

self.create_destination(request)
records, last_modified = self.get_source_records()
serialized_records = canonical_json(records, last_modified)
signature = self.signer.sign(serialized_records)

self.push_records_to_destination()
self.set_destination_signature(signature)
self.update_source_status("signed")
self.push_records_to_destination(request)
self.set_destination_signature(signature, request)
self.update_source_status("signed", request)

def _ensure_resource_exists(self, resource_type, parent_id, record_id):
try:
self.storage.create(
collection_id=resource_type,
parent_id=parent_id,
record={'id': record_id})
except UnicityError:
pass
# Re-trigger events from event listener \o/
for event in request.get_resource_events()[before:]:
request.registry.notify(event)

def create_destination(self):
def create_destination(self, request):
# Create the destination bucket/collection if they don't already exist.
bucket_name = self.destination['bucket']
collection_name = self.destination['collection']
bucket_uri = instance_uri(request,
'bucket',
id=self.destination['bucket'])
_ensure_resource_exists(request, bucket_uri)

self._ensure_resource_exists('bucket', '', bucket_name)
self._ensure_resource_exists(
'collection',
self.destination_bucket_id,
collection_name)
collection_uri = instance_uri(request,
'collection',
bucket_id=self.destination['bucket'],
id=self.destination['collection'])
_ensure_resource_exists(request, collection_uri)

# Set the permissions on the destination collection.
# With the current implementation, the destination is not writable by
# anyone and readable by everyone.
# https://github.com/Kinto/kinto-signer/issues/55
permissions = {'read': ("system.Everyone",)}
self.permission.replace_object_permissions(
self.destination_collection_id, permissions)
self.permission.replace_object_permissions(collection_uri, permissions)

def get_source_records(self, last_modified=None, include_deleted=False):
# If last_modified was specified, only retrieve items since then.
Expand Down Expand Up @@ -136,7 +140,7 @@ def get_destination_last_modified(self):

return collection_timestamp, records_count

def push_records_to_destination(self):
def push_records_to_destination(self, request):
last_modified, records_count = self.get_destination_last_modified()
if records_count == 0:
last_modified = None
Expand All @@ -146,60 +150,46 @@ def push_records_to_destination(self):

# Update the destination collection.
for record in new_records:
uri = instance_uri(request, 'record',
bucket_id=self.destination['bucket'],
collection_id=self.destination['collection'],
id=record['id'])

if record.get('deleted', False):
uri += "?last_modified=%s" % record['last_modified']
try:
self.storage.delete(
parent_id=self.destination_collection_id,
collection_id='record',
object_id=record['id'],
last_modified=record['last_modified']
)
except RecordNotFoundError:
_invoke_subrequest(request, {
'method': 'DELETE',
'path': uri
})
except httpexceptions.HTTPNotFound:
# If the record doesn't exists in the destination
# we are good and can ignore it.
pass
else:
self.storage.update(
parent_id=self.destination_collection_id,
collection_id='record',
object_id=record['id'],
record=record)
_invoke_subrequest(request, {
'method': 'PUT',
'path': uri,
'body': {'data': record}
})

def set_destination_signature(self, signature):
def set_destination_signature(self, signature, request):
# Push the new signature to the destination collection.
parent_id = '/buckets/%s' % self.destination['bucket']
collection_id = 'collection'

collection_record = self.storage.get(
parent_id=parent_id,
collection_id=collection_id,
object_id=self.destination['collection'])

# Update the collection_record
del collection_record['last_modified']
collection_record['signature'] = signature

self.storage.update(
parent_id=parent_id,
collection_id=collection_id,
object_id=self.destination['collection'],
record=collection_record)

def update_source_status(self, status):
parent_id = '/buckets/%s' % self.source['bucket']
collection_id = 'collection'

collection_record = self.storage.get(
parent_id=parent_id,
collection_id=collection_id,
object_id=self.source['collection'])

# Update the collection_record
del collection_record['last_modified']
collection_record['status'] = status

self.storage.update(
parent_id=parent_id,
collection_id=collection_id,
object_id=self.source['collection'],
record=collection_record)
uri = instance_uri(request, 'collection',
bucket_id=self.destination['bucket'],
id=self.destination['collection'])
_invoke_subrequest(request, {
'method': 'PATCH',
'path': uri,
'body': {'data': {'signature': signature}}
})

def update_source_status(self, status, request):
uri = instance_uri(request, 'collection',
bucket_id=self.source['bucket'],
id=self.source['collection'])
_invoke_subrequest(request, {
'method': 'PATCH',
'path': uri,
'body': {'data': {'status': 'signed'}}
})

0 comments on commit 9740f85

Please sign in to comment.