diff --git a/CHANGES.txt b/CHANGES.txt index 3302b3202..d8ddbbe81 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,9 @@ Changes for crate ================= + - drop server from active list if it responds + a valid 50x error and retry with next server + - use bulk_args in executemany to increase performance: With crate server >= 0.42.0 executemany uses bulk_args and returns a list of results. diff --git a/src/crate/client/http.py b/src/crate/client/http.py index 2c2d9811b..bc7b4d9f1 100644 --- a/src/crate/client/http.py +++ b/src/crate/client/http.py @@ -288,6 +288,10 @@ def _request(self, method, path, server=None, **kwargs): self._add_server(redirect_server) return self._request( method, path, server=redirect_server, **kwargs) + if not server and (500 <= response.status < 600): + with self._lock: + # drop server from active ones + self._drop_server(next_server, response.reason) return response except (urllib3.exceptions.MaxRetryError, urllib3.exceptions.ReadTimeoutError, @@ -295,18 +299,14 @@ def _request(self, method, path, server=None, **kwargs): urllib3.exceptions.HTTPError, urllib3.exceptions.ProxyError, ) as ex: - # drop server from active ones ex_message = hasattr(ex, 'message') and ex.message or str(ex) if server: raise ConnectionError( "Server not available, exception: %s" % ex_message ) - self._drop_server(next_server, ex_message) - # if this is the last server raise exception, otherwise try next - if not self._active_servers: - raise ConnectionError( - ("No more Servers available, " - "exception from last server: %s") % ex_message) + with self._lock: + # drop server from active ones + self._drop_server(next_server, ex_message) except Exception as e: ex_message = hasattr(e, 'message') and e.message or str(e) raise ProgrammingError(ex_message) @@ -409,15 +409,19 @@ def _drop_server(self, server, message): """ Drop server from active list and adds it to the inactive ones. """ - with self._lock: - try: - self._active_servers.remove(server) - except ValueError: - pass - else: - heapq.heappush(self._inactive_servers, (time(), server, - message)) - logger.warning("Removed server %s from active pool", server) + try: + self._active_servers.remove(server) + except ValueError: + pass + else: + heapq.heappush(self._inactive_servers, (time(), server, message)) + logger.warning("Removed server %s from active pool", server) + + # if this is the last server raise exception, otherwise try next + if not self._active_servers: + raise ConnectionError( + ("No more Servers available, " + "exception from last server: %s") % message) def _roundrobin(self): """ diff --git a/src/crate/client/test_http.py b/src/crate/client/test_http.py index 34ef66563..312653a5e 100644 --- a/src/crate/client/test_http.py +++ b/src/crate/client/test_http.py @@ -84,6 +84,33 @@ class FakeServerServiceUnavailable(FakeServerErrorResponse): reason = "Service Unavailable" +class FakeServer50xResponse(FakeServerErrorResponse): + + counter = 0 + STATI = [200, 503] + REASONS = ["Success", "Service Unavailable"] + + _status = 200 + _reason = "Success" + + @property + def status(self): + return self._status + + @property + def reason(self): + return self._reason + + def request(self, method, path, data=None, stream=False, **kwargs): + self._reason = self.REASONS[self.counter%2] + self._status = self.STATI[self.counter%2] + self.counter += 1 + mock_response = MagicMock() + mock_response.status = self._status + mock_response.reason = self._reason + mock_response.headers = {"content-type": self.content_type} + return mock_response + class FakeServerUnauthorized(FakeServerErrorResponse): status = 401 @@ -166,6 +193,17 @@ def test_programming_error_contains_http_error_response_content(self): else: self.assertTrue(False) + @patch('crate.client.http.Server', FakeServer50xResponse) + def test_server_error_50x(self): + client = Client(servers="localhost:4200") + client.sql('select 1') + try: + client.sql('select 2') + except ProgrammingError as e: + self.assertEqual("No more Servers available, exception from last server: Service Unavailable", + e.message) + self.assertEqual([], list(client._active_servers)) + def test_connect(self): client = Client(servers="localhost:4200 localhost:4201") self.assertEqual(client._active_servers,