From f14a6b74e2df5e40939ff34e5429e3bbc751af8c Mon Sep 17 00:00:00 2001 From: Daniel Brotsky Date: Sun, 26 Mar 2017 23:54:57 -0700 Subject: [PATCH 1/2] Accept the user-sync key names in the authentication dictionary. Since User sync uses the names "tech_acct" and "priv_key_file" rather than "tech_acct_id" and "private_key_file", we accept those as well so clients can use the User Sync config files directly. --- HISTORY.md | 7 +++++++ umapi_client/connection.py | 2 ++ 2 files changed, 9 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 31d9b5b2..8d852ed5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -42,3 +42,10 @@ Server-compatibility release: * fix typos in docs * fix param documentation in functional API * update wire protocol for remove_from_organization with deletion of account to match server changes + +### Version TBD + +Enhancement release: + +* (No Issue) + * allow User Sync config key names in the connection `auth_dict` diff --git a/umapi_client/connection.py b/umapi_client/connection.py index 43faae5b..a36c6481 100644 --- a/umapi_client/connection.py +++ b/umapi_client/connection.py @@ -123,6 +123,8 @@ def __init__(self, def _get_auth(self, ims_host, ims_endpoint_jwt, tech_acct_id=None, api_key=None, client_secret=None, private_key_file=None, **kwargs): + tech_acct_id = tech_acct_id or kwargs.get("tech_acct") + private_key_file = private_key_file or kwargs.get("priv_key_path") if not (tech_acct_id and api_key and client_secret and private_key_file): raise ValueError("Connector create: not all required auth parameters were supplied; please see docs") with open(private_key_file, 'r') as private_key_stream: From 4a7a740beb9ec1591b1d8a31a26b86fc963ca09b Mon Sep 17 00:00:00 2001 From: Daniel Brotsky Date: Wed, 29 Mar 2017 01:29:53 -0700 Subject: [PATCH 2/2] Fix #36: handle server and client errors in batches gracefully * catch errors during batch processing * return a new BatchError that has caught exceptions and batch statistics --- HISTORY.md | 5 ++++- tests/test_actions.py | 18 +++++---------- umapi_client/__init__.py | 2 +- umapi_client/connection.py | 46 +++++++++++++++++++------------------- umapi_client/error.py | 9 ++++++++ umapi_client/version.py | 2 +- 6 files changed, 44 insertions(+), 38 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 8d852ed5..0ef3a423 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -43,9 +43,12 @@ Server-compatibility release: * fix param documentation in functional API * update wire protocol for remove_from_organization with deletion of account to match server changes -### Version TBD +### Version 2.2 Enhancement release: +* [Issue 36](https://github.com/adobe-apiplatform/umapi-client.py/issues/36) + * catch errors during batch processing + * return a new BatchError that has caught exceptions and batch statistics * (No Issue) * allow User Sync config key names in the connection `auth_dict` diff --git a/tests/test_actions.py b/tests/test_actions.py index c932a77b..da9773ba 100644 --- a/tests/test_actions.py +++ b/tests/test_actions.py @@ -22,9 +22,9 @@ import mock import pytest - from conftest import mock_connection_params, MockResponse -from umapi_client import Connection, Action, ServerError + +from umapi_client import Connection, Action, BatchError def test_action_create(): @@ -292,7 +292,9 @@ def test_execute_multiple_single_queued_throttle_actions(): def test_execute_multiple_queued_throttle_actions_error(): with mock.patch("umapi_client.connection.requests.Session.post") as mock_post: - mock_post.return_value = MockResponse(500) + mock_post.side_effect = [MockResponse(500), + MockResponse(200, {"result": "success"}), + MockResponse(200, {"result": "success"})] conn = Connection(throttle_actions=2, **mock_connection_params) action0 = Action(top="top0").append(a="a0") action1 = Action(top="top1").append(a="a1") @@ -300,17 +302,9 @@ def test_execute_multiple_queued_throttle_actions_error(): action3 = Action(top="top3").append(a="a3") action4 = Action(top="top4").append(a="a4") action5 = Action(top="top5").append(a="a5") - pytest.raises(ServerError, conn.execute_multiple, + pytest.raises(BatchError, conn.execute_multiple, [action0, action1, action2, action3, action4, action5], immediate=False) local_status, _ = conn.status(remote=False) - assert local_status == {"multiple-query-count": 0, - "single-query-count": 0, - "actions-sent": 2, - "actions-completed": 0, - "actions-queued": 4} - mock_post.return_value = MockResponse(200, {"result": "success"}) - assert conn.execute_queued() == (0, 4, 4) - local_status, _ = conn.status(remote=False) assert local_status == {"multiple-query-count": 0, "single-query-count": 0, "actions-sent": 6, diff --git a/umapi_client/__init__.py b/umapi_client/__init__.py index c4419a78..3d0c84cd 100644 --- a/umapi_client/__init__.py +++ b/umapi_client/__init__.py @@ -20,7 +20,7 @@ from .connection import Connection from .api import Action, QuerySingle, QueryMultiple -from .error import ClientError, RequestError, ServerError, UnavailableError +from .error import BatchError, ClientError, RequestError, ServerError, UnavailableError from .functional import IdentityTypes, GroupTypes, RoleTypes, IfAlreadyExistsOptions from .functional import UserAction, UserQuery, UsersQuery from .functional import UserGroupAction, GroupsQuery diff --git a/umapi_client/connection.py b/umapi_client/connection.py index a36c6481..8a018a94 100644 --- a/umapi_client/connection.py +++ b/umapi_client/connection.py @@ -29,7 +29,7 @@ import six.moves.urllib.parse as urlparse from .auth import JWT, Auth, AccessRequest -from .error import UnavailableError, ClientError, RequestError, ServerError +from .error import BatchError, UnavailableError, ClientError, RequestError, ServerError from .version import __version__ as umapi_version @@ -285,6 +285,10 @@ def execute_multiple(self, actions, immediate=True): NOTE: This is where we throttle the number of commands per action. So the number of actions we were given may not be the same as the number we queue or send to the server. + + NOTE: If the server gives us a response we don't understand, we note that and continue + processing as usual. Then, at the end of the batch, we throw in order to warn the client + that we had a problem understanding the server. :param actions: the list of Action objects to be executed :param immediate: whether to immediately send them to the server @@ -292,6 +296,7 @@ def execute_multiple(self, actions, immediate=True): """ # throttling part 1: split up each action into smaller actions, as needed split_actions = [] + exceptions = [] for a in actions: if len(a.commands) == 0: if self.logger: self.logger.warning("Sending action with no commands: %s", a.frame) @@ -303,29 +308,24 @@ def execute_multiple(self, actions, immediate=True): split_actions.append(a) actions = self.action_queue + split_actions # throttling part 2: execute the action list in batches, as needed - sent = completed = last_batch_sent = last_batch_completed = 0 - try: - while len(actions) >= self.throttle_actions: - batch, actions = actions[0:self.throttle_actions], actions[self.throttle_actions:] - if self.logger: self.logger.debug("Executing %d actions (%d remaining).", len(batch), len(actions)) - sent += len(batch) - completed += self._execute_batch(batch) - finally: - self.action_queue = actions - self.local_status["actions-queued"] = len(actions) - self.local_status["actions-sent"] += sent - self.local_status["actions-completed"] += completed - # there may be actions left over - if actions and immediate: + sent = completed = 0 + batch_size = self.throttle_actions + min_size = 1 if immediate else batch_size + while len(actions) >= min_size: + batch, actions = actions[0:batch_size], actions[batch_size:] + if self.logger: self.logger.debug("Executing %d actions (%d remaining).", len(batch), len(actions)) + sent += len(batch) try: - last_batch_sent = len(actions) - last_batch_completed += self._execute_batch(actions) - finally: - self.action_queue = [] - self.local_status["actions-queued"] = 0 - self.local_status["actions-sent"] += last_batch_sent - self.local_status["actions-completed"] += last_batch_completed - return len(self.action_queue), sent + last_batch_sent, completed + last_batch_completed + completed += self._execute_batch(batch) + except Exception as e: + exceptions.append(e) + self.action_queue = actions + self.local_status["actions-queued"] = queued = len(actions) + self.local_status["actions-sent"] += sent + self.local_status["actions-completed"] += completed + if exceptions: + raise BatchError(exceptions, queued, sent, completed) + return queued, sent, completed def _execute_batch(self, actions): """ diff --git a/umapi_client/error.py b/umapi_client/error.py index 1446857f..1f8aff22 100644 --- a/umapi_client/error.py +++ b/umapi_client/error.py @@ -43,3 +43,12 @@ class ClientError(Exception): def __init__(self, message, result): Exception.__init__(self, "Server response not understood: " + message) self.result = result + + +class BatchError(Exception): + def __init__(self, causes, queued, sent, completed): + prefix = "Exception{} during batch processing: ".format("s" if len(causes) > 1 else "") + tail = ", ".join([str(cause) for cause in causes]) + Exception.__init__(self, prefix + tail) + self.causes = causes + self.statistics = (queued, sent, completed) diff --git a/umapi_client/version.py b/umapi_client/version.py index 9992bba5..9b889c90 100644 --- a/umapi_client/version.py +++ b/umapi_client/version.py @@ -18,4 +18,4 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -__version__ = "2.1" +__version__ = "2.2"