Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry batch requests (fixes #39) #51

Merged
merged 11 commits into from Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion README.rst
Expand Up @@ -211,12 +211,39 @@ It is possible to do batch requests using a Python context manager (``with``):
.. code-block:: python

with client.batch() as batch:
for idx in range(0,100):
for idx in range(0,100):
batch.update_record(data={'id': idx})

A batch object shares the same methods as another client.


Retry on error
--------------

When the server is throttled, under heavy load, or maintenance, it can
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider When the server is throttled (under heavy load or maintenance), it can return error responses.

return error responses.

The client can hence retry to send the same request until it succeeds.
To enable this, specify the number of retries on the client:

.. code-block:: python

client = Client(server_url='http://localhost:8888/v1',
auth=credentials,
retry=10)

In the Kinto protocol, it is specified that the server `tells the duration in seconds between retries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The Kinto protocol let the server define the duration between retries"

<http://kinto.readthedocs.org/en/latest/api/1.x/cliquet/backoff.html#retry-after-indicators>`_.
It is possible to force this value:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible (but not recommended) to force this value in the client:


.. code-block:: python

client = Client(server_url='http://localhost:8888/v1',
auth=credentials,
retry=10,
retry_after=5)


Run tests
=========

Expand Down
90 changes: 12 additions & 78 deletions kinto_client/__init__.py
@@ -1,13 +1,12 @@
import collections
import requests
import uuid
from six import iteritems
from six.moves.urllib.parse import urlparse

from contextlib import contextmanager


from kinto_client import utils
from kinto_client.session import create_session, Session
from kinto_client.batch import Batch
from kinto_client.exceptions import BucketNotFound, KintoException

Expand All @@ -27,31 +26,6 @@
DO_NOT_OVERWRITE = {'If-None-Match': '*'}


def create_session(server_url=None, auth=None, session=None):
"""Returns a session from the passed arguments.

:param server_url:
The URL of the server to use, with the prefix.
:param auth:
A requests authentication policy object.
:param session:
An optional session object to use, rather than creating a new one.
"""
# XXX Refactor the create_session to take place in the caller objects.
# E.g. test if the session exists before calling create_session.
if session is not None and (
server_url is not None or auth is not None):
msg = ("You cannot specify session and server_url or auth. "
"Chose either session or (auth + server_url).")
raise AttributeError(msg)
if session is None and server_url is None and auth is None:
msg = ("You need to either set session or auth + server_url")
raise AttributeError(msg)
if session is None:
session = Session(server_url=server_url, auth=auth)
return session


class Endpoints(object):
endpoints = {
'root': '{root}/',
Expand Down Expand Up @@ -81,66 +55,26 @@ def get(self, endpoint, **kwargs):
field=','.join(e.args)))


class Session(object):
"""Handles all the interactions with the network.
"""
def __init__(self, server_url, auth=None):
self.server_url = server_url
self.auth = auth

def request(self, method, endpoint, data=None, permissions=None,
payload=None, **kwargs):
parsed = urlparse(endpoint)
if not parsed.scheme:
actual_url = utils.urljoin(self.server_url, endpoint)
else:
actual_url = endpoint

if self.auth is not None:
kwargs.setdefault('auth', self.auth)

payload = payload or {}
# if data is not None:
payload['data'] = data or {}
if permissions is not None:
if hasattr(permissions, 'as_dict'):
permissions = permissions.as_dict()
payload['permissions'] = permissions
if payload:
payload_kwarg = 'data' if 'files' in kwargs else 'json'
kwargs.setdefault(payload_kwarg, payload)
resp = requests.request(method, actual_url, **kwargs)
if not (200 <= resp.status_code < 400):
message = '{0} - {1}'.format(resp.status_code, resp.json())
exception = KintoException(message)
exception.request = resp.request
exception.response = resp
raise exception

if resp.status_code == 304:
body = None
else:
body = resp.json()
# XXX Add the status code.
return body, resp.headers


class Client(object):

def __init__(self, server_url=None, session=None, auth=None,
bucket="default", collection=None):
bucket="default", collection=None, retry=0, retry_after=None):
self.endpoints = Endpoints()
self.session = create_session(server_url, auth, session)
self.session_kwargs = dict(server_url=server_url,
auth=auth,
session=session,
retry=retry,
retry_after=retry_after)
self.session = create_session(**self.session_kwargs)
self._bucket_name = bucket
self._collection_name = collection
self._server_settings = None

def clone(self, **kwargs):
return Client(**{
'session': kwargs.get('session', self.session),
'bucket': kwargs.get('bucket', self._bucket_name),
'collection': kwargs.get('collection', self._collection_name),
})
kwargs.setdefault('session', self.session)
kwargs.setdefault('bucket', self._bucket_name)
kwargs.setdefault('collection', self._collection_name)
return Client(**kwargs)

@contextmanager
def batch(self, **kwargs):
Expand Down
9 changes: 4 additions & 5 deletions kinto_client/batch.py
Expand Up @@ -44,10 +44,9 @@ def send(self):
result = []
requests = self._build_requests()
for chunk in utils.chunks(requests, self.batch_max_requests):
resp, headers = self.session.request(
'POST',
self.endpoints.get('batch'),
payload={'requests': chunk}
)
kwargs = dict(method='POST',
endpoint=self.endpoints.get('batch'),
payload={'requests': chunk})
resp, headers = self.session.request(**kwargs)
result.append((resp, headers))
return result
95 changes: 95 additions & 0 deletions kinto_client/session.py
@@ -0,0 +1,95 @@
import time

import requests
from six.moves.urllib.parse import urlparse

from kinto_client import utils
from kinto_client.exceptions import KintoException


def create_session(server_url=None, auth=None, session=None, retry=0,
retry_after=None):
"""Returns a session from the passed arguments.

:param server_url:
The URL of the server to use, with the prefix.
:param auth:
A requests authentication policy object.
:param session:
An optional session object to use, rather than creating a new one.
"""
# XXX Refactor the create_session to take place in the caller objects.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the purposes of this function was to return the session, rather than doing the check in all the callers. Seeing this note, I wonder what's the best approach.

# E.g. test if the session exists before calling create_session.
if session is not None and (
server_url is not None or auth is not None):
msg = ("You cannot specify session and server_url or auth. "
"Chose either session or (auth + server_url).")
raise AttributeError(msg)
if session is None and server_url is None and auth is None:
msg = ("You need to either set session or auth + server_url")
raise AttributeError(msg)
if session is None:
session = Session(server_url=server_url, auth=auth, retry=retry,
retry_after=retry_after)
return session


class Session(object):
"""Handles all the interactions with the network.
"""
def __init__(self, server_url, auth=None, retry=0, retry_after=None):
self.server_url = server_url
self.auth = auth
self.nb_retry = retry
self.retry_after = retry_after

def request(self, method, endpoint, data=None, permissions=None,
payload=None, **kwargs):
parsed = urlparse(endpoint)
if not parsed.scheme:
actual_url = utils.urljoin(self.server_url, endpoint)
else:
actual_url = endpoint

if self.auth is not None:
kwargs.setdefault('auth', self.auth)

payload = payload or {}
# if data is not None:
payload['data'] = data or {}
if permissions is not None:
if hasattr(permissions, 'as_dict'):
permissions = permissions.as_dict()
payload['permissions'] = permissions
if payload:
payload_kwarg = 'data' if 'files' in kwargs else 'json'
kwargs.setdefault(payload_kwarg, payload)

retry = self.nb_retry
while retry >= 0:
resp = requests.request(method, actual_url, **kwargs)
retry = retry - 1
if not (200 <= resp.status_code < 400):
if resp.status_code >= 500 and retry >= 0:
# Wait and try again.
# If not forced, use retry-after header and wait.
if self.retry_after is None:
retry_after = resp.headers.get("Retry-After", 0)
else:
retry_after = self.retry_after
time.sleep(retry_after)
continue

# Retries exhausted, raise expection.
message = '{0} - {1}'.format(resp.status_code, resp.json())
exception = KintoException(message)
exception.request = resp.request
exception.response = resp
raise exception

if resp.status_code == 304:
body = None
else:
body = resp.json()
# XXX Add the status code.
return body, resp.headers
12 changes: 6 additions & 6 deletions kinto_client/tests/test_batch.py
Expand Up @@ -23,8 +23,8 @@ def test_send_adds_data_attribute(self):
batch.send()

self.client.session.request.assert_called_with(
'POST',
self.client.endpoints.get('batch'),
method='POST',
endpoint=self.client.endpoints.get('batch'),
payload={'requests': [{
'method': 'GET',
'path': '/foobar/baz',
Expand All @@ -39,8 +39,8 @@ def test_send_adds_permissions_attribute(self):
batch.send()

self.client.session.request.assert_called_with(
'POST',
self.client.endpoints.get('batch'),
method='POST',
endpoint=self.client.endpoints.get('batch'),
payload={'requests': [{
'method': 'GET',
'path': '/foobar/baz',
Expand All @@ -54,8 +54,8 @@ def test_send_adds_headers_if_specified(self):
batch.send()

self.client.session.request.assert_called_with(
'POST',
self.client.endpoints.get('batch'),
method='POST',
endpoint=self.client.endpoints.get('batch'),
payload={'requests': [{
'method': 'GET',
'path': '/foobar/baz',
Expand Down
14 changes: 12 additions & 2 deletions kinto_client/tests/test_client.py
Expand Up @@ -22,8 +22,8 @@ def test_context_manager_works_as_expected(self):
batch.create_record(id=5678, data={'bar': 'baz'})

self.session.request.assert_called_with(
'POST',
'/batch',
method='POST',
endpoint='/batch',
payload={'requests': [
{'body': {'data': {'foo': 'bar'}},
'path': '/buckets/mozilla/collections/test/records/1234',
Expand Down Expand Up @@ -65,6 +65,16 @@ def test_batch_raises_exception_if_subrequest_failed(self):
batch.create_record(id=1234, data={'foo': 'bar'})
batch.create_record(id=5678, data={'tutu': 'toto'})

def test_batch_options_are_transmitted(self):
settings = {"batch_max_requests": 25}
self.session.request.side_effect = [({"settings": settings}, [])]
with mock.patch('kinto_client.create_session') as create_session:
with self.client.batch(bucket='moz', collection='test', retry=12,
retry_after=20):
_, last_call_kwargs = create_session.call_args_list[-1]
self.assertEqual(last_call_kwargs['retry'], 12)
self.assertEqual(last_call_kwargs['retry_after'], 20)

def test_client_is_represented_properly(self):
client = Client(
server_url="https://kinto.notmyidea.org/v1",
Expand Down