Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication #26

Merged
merged 10 commits into from Feb 1, 2016
59 changes: 34 additions & 25 deletions kinto_client/__init__.py
Expand Up @@ -117,8 +117,12 @@ def request(self, method, endpoint, data=None, permissions=None,
exception.response = resp
raise exception

if resp.status_code == 304:
body = None
else:
body = resp.json()
# XXX Add the status code.
return resp.json(), resp.headers
return body, resp.headers


class Client(object):
Expand Down Expand Up @@ -166,35 +170,31 @@ def _get_endpoint(self, name, bucket=None, collection=None, id=None):
}
return self.endpoints.get(name, **kwargs)

def _paginated(self, endpoint, records=None, visited=None):
def _paginated(self, endpoint, records=None, if_none_match=None, **kwargs):
if records is None:
records = collections.OrderedDict()
if visited is None:
visited = set()

record_resp, headers = self.session.request('get', endpoint)
records.update(collections.OrderedDict(
[(r['id'], r) for r in record_resp['data']]))

visited.add(endpoint)

if 'next-page' in map(str.lower, headers.keys()):
# Paginated wants a relative URL, but the returned one is absolute.
next_page = headers['Next-Page']
# Due to bug mozilla-services/cliquet#366, check for recursion:
if next_page not in visited:
return self._paginated(next_page, records, visited)

headers = {}
if if_none_match is not None:
headers['If-None-Match'] = utils.quote(if_none_match)

record_resp, headers = self.session.request(
'get', endpoint, headers=headers, params=kwargs)
if record_resp:
records_tuples = [(r['id'], r) for r in record_resp['data']]
records.update(collections.OrderedDict(records_tuples))

if 'next-page' in map(str.lower, headers.keys()):
# Paginated wants a relative URL, but the returned one is
# absolute.
next_page = headers['Next-Page']
return self._paginated(next_page, records,
if_none_match=if_none_match)
return records.values()

def _get_cache_headers(self, safe, data=None, last_modified=None):
has_data = data is not None and data.get('last_modified')

if (last_modified is None and has_data):
# Drop the last_modified field since it will be dropped on the
# server anyway.
last_modified = data.pop('last_modified')

last_modified = data['last_modified']
if safe and last_modified is not None:
return {'If-Match': utils.quote(last_modified)}
# else return None
Expand Down Expand Up @@ -277,11 +277,11 @@ def delete_collection(self, collection=None, bucket=None,

# Records

def get_records(self, collection=None, bucket=None):
def get_records(self, collection=None, bucket=None, **kwargs):
"""Returns all the records"""
# XXX Add filter and sorting.
endpoint = self._get_endpoint('records', bucket, collection)
return self._paginated(endpoint)
return self._paginated(endpoint, **kwargs)

def get_record(self, id, collection=None, bucket=None):
endpoint = self._get_endpoint('record', bucket, collection, id)
Expand Down Expand Up @@ -328,3 +328,12 @@ def delete_record(self, id, collection=None, bucket=None,
def delete_records(self, records):
    """Placeholder for bulk record deletion; currently does nothing."""
    # XXX To be done with a BATCH operation

def __repr__(self):
    """Return ``<KintoClient URL>``.

    URL is the collection endpoint built from the client's bucket and
    collection names, joined onto the session's server URL.
    """
    relative = self._get_endpoint('collection',
                                  self._bucket_name,
                                  self._collection_name)
    return "<KintoClient %s>" % utils.urljoin(self.session.server_url,
                                              relative)
6 changes: 4 additions & 2 deletions kinto_client/batch.py
@@ -1,4 +1,5 @@
from . import utils
from collections import defaultdict


class Batch(object):
Expand All @@ -15,7 +16,7 @@ def request(self, method, endpoint, data=None, permissions=None,
# is called.
self.requests.append((method, endpoint, data, permissions, headers))
# This is the signature of the session request.
return None, None
return defaultdict(dict), defaultdict(dict)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this conflicts with #34. I prefer the other approach, returning constants for batched responses here.


def reset(self):
# Reinitialize the batch.
Expand All @@ -24,9 +25,10 @@ def reset(self):
def _build_requests(self):
requests = []
for (method, url, data, permissions, headers) in self.requests:
# Strip the prefix in batch requests.
request = {
'method': method.upper(),
'path': url}
'path': url.replace('v1/', '')}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have a test somewhere for this one?


request['body'] = {}
if data is not None:
Expand Down
10 changes: 7 additions & 3 deletions kinto_client/exceptions.py
Expand Up @@ -3,8 +3,12 @@ class KintoException(Exception):


class BucketNotFound(KintoException):
    """Error signalling that a bucket could not be found.

    :param message: optional human-readable description; also stored on
        ``self.message``.
    :param exception: optional originating exception. When given, its
        ``request`` and ``response`` attributes are copied onto this
        instance; otherwise both attributes default to ``None``.
    """

    def __init__(self, message=None, exception=None):
        # Only the message goes to the base class. Passing ``self`` as well
        # (as the previous code did) stores the exception inside its own
        # ``args`` tuple.
        super(BucketNotFound, self).__init__(message)
        self.message = message
        if exception is not None:
            self.request = exception.request
            self.response = exception.response
        else:
            # Always define the attributes so callers can read them safely.
            self.request = None
            self.response = None
110 changes: 110 additions & 0 deletions kinto_client/replication.py
@@ -0,0 +1,110 @@
import argparse
import logging

from kinto_client import Client
from kinto_client import exceptions

logger = logging.getLogger(__name__)


def replicate(origin, destination):
    """Copy the records of the origin collection into the destination.

    The destination bucket is created when fetching it raises
    ``BucketNotFound``; the destination collection is created from the
    origin collection's data and permissions when fetching it raises a
    ``KintoException``.

    All records are replicated, not only the ones that changed. Records
    flagged as deleted are deleted on the destination; every other record
    is written with ``update_record``.
    """
    logger.info('Replication from {0} to {1}'.format(origin, destination))

    try:
        destination.get_bucket()
    except exceptions.BucketNotFound:
        destination.create_bucket()

    try:
        destination.get_collection()
    except exceptions.KintoException:
        source_collection = origin.get_collection()
        destination.create_collection(
            data=source_collection['data'],
            permissions=source_collection['permissions'],
            safe=False)

    records = origin.get_records()
    logger.info('replication of {0} records'.format(len(records)))

    # All writes go through one batch on the destination.
    with destination.batch() as batch:
        for record in records:
            is_deleted = record.get('deleted', False) is True
            if not is_deleted:
                batch.update_record(data=record, safe=False)
                continue
            batch.delete_record(record['id'],
                                last_modified=record['last_modified'])


def get_arguments(argv=None):  # pragma: nocover
    """Parse the command-line arguments of the replication script.

    :param argv: optional list of argument strings. When ``None`` (the
        default) argparse reads the process arguments, as before.
    :returns: the ``argparse.Namespace`` with ``origin_server``,
        ``destination_server``, ``bucket``, ``collection``, ``auth``,
        ``destination_bucket``, ``destination_collection`` and
        ``verbosity``.
    """
    description = 'Migrate data from one kinto instance to another one.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('origin_server',
                        help='The location of the origin server (with prefix)')
    parser.add_argument('destination_server',
                        help=('The location of the destination server '
                              '(with prefix)'))
    parser.add_argument('bucket', help='The name of the bucket')
    parser.add_argument('collection', help='The name of the collection')

    # Auth: XXX improve later. For now only support Basic Auth.
    parser.add_argument('-a', '--auth', dest='auth',
                        help='Authentication, in the form "username:password"')

    # Optional arguments. When omitted, they are derived from the "bucket"
    # and "collection" ones.
    parser.add_argument('--destination-bucket', dest='destination_bucket',
                        help='The name of the destination bucket',
                        default=None)
    parser.add_argument('--destination-collection',
                        dest='destination_collection',
                        help='The name of the destination collection',
                        default=None)

    # Verbosity flags all write to the same destination; when none is
    # given, ``verbosity`` stays ``None``.
    parser.add_argument('-v', '--verbose', action='store_const',
                        const=logging.INFO, dest='verbosity',
                        help='Show all messages.')

    parser.add_argument('-q', '--quiet', action='store_const',
                        const=logging.CRITICAL, dest='verbosity',
                        help='Show only critical errors.')

    parser.add_argument('-D', '--debug', action='store_const',
                        const=logging.DEBUG, dest='verbosity',
                        help='Show all messages, including debug messages.')
    return parser.parse_args(argv)


def setup_logger(args):  # pragma: nocover
    """Attach a stream handler to this module's logger.

    The logger level is changed only when ``args.verbosity`` is truthy.
    """
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    if not args.verbosity:
        return
    logger.setLevel(args.verbosity)


def main():  # pragma: nocover
    """Command-line entry point: replicate one collection into another.

    Builds an origin and a destination ``Client`` from the parsed
    arguments (the destination bucket/collection fall back to the origin
    ones when not given) and runs ``replicate`` on them. The same
    credentials are used for both servers.
    """
    args = get_arguments()
    setup_logger(args)

    # Split on the first colon only: a password may itself contain ":",
    # and the auth value must stay a (username, password) pair.
    auth = tuple(args.auth.split(':', 1)) if args.auth else None

    origin = Client(
        server_url=args.origin_server,
        auth=auth,
        bucket=args.bucket,
        collection=args.collection
    )
    destination = Client(
        server_url=args.destination_server,
        auth=auth,
        bucket=args.destination_bucket or args.bucket,
        collection=args.destination_collection or args.collection
    )

    replicate(origin, destination)


if __name__ == "__main__": # pragma: nocover
main()
27 changes: 27 additions & 0 deletions kinto_client/tests/functional.py
Expand Up @@ -8,6 +8,7 @@
from cliquet import utils as cliquet_utils

from kinto_client import Client, BucketNotFound, KintoException
from kinto_client import replication

__HERE__ = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -251,6 +252,32 @@ def test_request_batching(self):
records = self.client.get_records(bucket='mozilla', collection='fonts')
assert len(records) == 2

def test_replication(self):
    """Replicating a 10-record collection yields 10 destination records."""
    # Seed the origin collection in a single batch.
    with self.client.batch(bucket='origin', collection='coll') as batch:
        batch.create_bucket()
        batch.create_collection()
        for n in range(10):
            batch.create_record(data={'foo': 'bar', 'n': n})

    # Both clients talk to the same server; only the bucket differs.
    shared = dict(server_url=self.server_url, auth=self.auth,
                  collection='coll')
    origin = Client(bucket='origin', **shared)
    destination = Client(bucket='destination', **shared)

    replication.replicate(origin, destination)

    replicated = self.client.get_records(bucket='destination',
                                         collection='coll')
    assert len(replicated) == 10


if __name__ == '__main__':
unittest2.main()
9 changes: 9 additions & 0 deletions kinto_client/tests/test_batch.py
Expand Up @@ -84,3 +84,12 @@ def test_reset_empties_the_requests_cache(self):
assert len(batch.requests) == 1
batch.reset()
assert len(batch.requests) == 0

def test_prefix_is_removed_from_batch_requests(self):
    """The ``v1/`` prefix is stripped from paths sent in a batch."""
    batch = Batch(self.client)
    batch.request('GET', '/v1/foobar')
    batch.send()

    first_call = self.client.session.request.call_args_list[0]
    _, sent_kwargs = first_call
    sent_path = sent_kwargs['payload']['requests'][0]['path']
    assert sent_path == '/foobar'
40 changes: 38 additions & 2 deletions kinto_client/tests/test_client.py
Expand Up @@ -65,6 +65,16 @@ def test_batch_raises_exception_if_subrequest_failed(self):
batch.create_record(id=1234, data={'foo': 'bar'})
batch.create_record(id=5678, data={'tutu': 'toto'})

def test_client_is_represented_properly(self):
    """The client's string form shows the absolute collection URL."""
    settings = {
        'server_url': "https://kinto.notmyidea.org/v1",
        'bucket': "homebrewing",
        'collection': "recipes",
    }
    client = Client(**settings)
    expected_url = ("https://kinto.notmyidea.org/v1"
                    "/buckets/homebrewing/collections/recipes")
    assert str(client) == "<KintoClient %s>" % expected_url


class BucketTest(unittest.TestCase):

Expand All @@ -87,7 +97,7 @@ def test_patch_is_issued_on_update(self):
self.session.request.assert_called_with(
'patch',
'/buckets/testbucket',
data={'foo': 'bar'},
data={'foo': 'bar', 'last_modified': '1234'},
permissions={'read': ['natim']},
headers={'If-Match': '"1234"'})

Expand Down Expand Up @@ -227,7 +237,7 @@ def test_collection_update_use_an_if_match_header(self):

url = '/buckets/mybucket/collections/mycollection'
self.session.request.assert_called_with(
'put', url, data={'foo': 'bar'},
'put', url, data={'foo': 'bar', 'last_modified': '1234'},
permissions=mock.sentinel.permissions,
headers={'If-Match': '"1234"'})

Expand Down Expand Up @@ -444,6 +454,32 @@ def test_pagination_is_followed(self):
{'id': '4', 'value': 'item4'},
]

def test_pagination_supports_if_none_match(self):
    """``If-None-Match`` is sent on every page request."""
    link = ('http://example.org/buckets/buck/collections/coll/records/'
            '?token=1234')

    # First page carries a pagination token, the last one does not.
    first_page = build_response(
        [{'id': '1', 'value': 'item1'},
         {'id': '2', 'value': 'item2'}],
        {'Next-Page': link})
    last_page = build_response(
        [{'id': '3', 'value': 'item3'},
         {'id': '4', 'value': 'item4'}])
    self.session.request.side_effect = [first_page, last_page]

    self.client.get_records('bucket', 'collection',
                            if_none_match="1234")

    # Check that the If-None-Match header is present in both requests.
    expected_headers = {'If-None-Match': '"1234"'}
    requested_urls = (
        '/buckets/collection/collections/bucket/records',
        link,
    )
    for url in requested_urls:
        self.session.request.assert_any_call(
            'get', url, headers=expected_headers, params={})

def test_collection_can_delete_a_record(self):
mock_response(self.session, data={'id': 1234})
resp = self.client.delete_record(id=1234)
Expand Down