Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Changes for crate
=================

- improved server failover / retry behaviour

- use bulk_args in executemany to increase performance:
With crate server >= 0.42.0 executemany uses bulk_args
and returns a list of results.
Expand Down
36 changes: 20 additions & 16 deletions src/crate/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,25 @@ def _request(self, method, path, server=None, **kwargs):
self._add_server(redirect_server)
return self._request(
method, path, server=redirect_server, **kwargs)
if not server and (500 <= response.status < 600):
with self._lock:
# drop server from active ones
self._drop_server(next_server, response.reason)
return response
except (urllib3.exceptions.MaxRetryError,
urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.SSLError,
urllib3.exceptions.HTTPError,
urllib3.exceptions.ProxyError,
) as ex:
# drop server from active ones
ex_message = hasattr(ex, 'message') and ex.message or str(ex)
if server:
raise ConnectionError(
"Server not available, exception: %s" % ex_message
)
self._drop_server(next_server, ex_message)
# if this is the last server raise exception, otherwise try next
if not self._active_servers:
raise ConnectionError(
("No more Servers available, "
"exception from last server: %s") % ex_message)
with self._lock:
# drop server from active ones
self._drop_server(next_server, ex_message)
except Exception as e:
ex_message = hasattr(e, 'message') and e.message or str(e)
raise ProgrammingError(ex_message)
Expand Down Expand Up @@ -409,15 +409,19 @@ def _drop_server(self, server, message):
"""
Drop server from active list and adds it to the inactive ones.
"""
with self._lock:
try:
self._active_servers.remove(server)
except ValueError:
pass
else:
heapq.heappush(self._inactive_servers, (time(), server,
message))
logger.warning("Removed server %s from active pool", server)
try:
self._active_servers.remove(server)
except ValueError:
pass
else:
heapq.heappush(self._inactive_servers, (time(), server, message))
logger.warning("Removed server %s from active pool", server)

# if this is the last server raise exception, otherwise try next
if not self._active_servers:
raise ConnectionError(
("No more Servers available, "
"exception from last server: %s") % message)

def _roundrobin(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion src/crate/client/http.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ is raised when timeout is reached::
>>> http_client.sql('select name from locations')
Traceback (most recent call last):
...
ConnectionError: No more Servers available, exception from last server: HTTPConnectionPool(host='...', port=...): Read timed out. (read timeout=0.001)
ConnectionError: No more Servers available, exception from last server: ...

When connecting to non Crate servers the HttpClient will raise a ConnectionError like this::

Expand Down
38 changes: 38 additions & 0 deletions src/crate/client/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,33 @@ class FakeServerServiceUnavailable(FakeServerErrorResponse):
reason = "Service Unavailable"


class FakeServer50xResponse(FakeServerErrorResponse):

counter = 0
STATI = [200, 503]
REASONS = ["Success", "Service Unavailable"]

_status = 200
_reason = "Success"

@property
def status(self):
return self._status

@property
def reason(self):
return self._reason

def request(self, method, path, data=None, stream=False, **kwargs):
self._reason = self.REASONS[self.counter%2]
self._status = self.STATI[self.counter%2]
self.counter += 1
mock_response = MagicMock()
mock_response.status = self._status
mock_response.reason = self._reason
mock_response.headers = {"content-type": self.content_type}
return mock_response

class FakeServerUnauthorized(FakeServerErrorResponse):

status = 401
Expand Down Expand Up @@ -166,6 +193,17 @@ def test_programming_error_contains_http_error_response_content(self):
else:
self.assertTrue(False)

@patch('crate.client.http.Server', FakeServer50xResponse)
def test_server_error_50x(self):
client = Client(servers="localhost:4200")
client.sql('select 1')
try:
client.sql('select 2')
except ProgrammingError as e:
self.assertEqual("No more Servers available, exception from last server: Service Unavailable",
e.message)
self.assertEqual([], list(client._active_servers))

def test_connect(self):
client = Client(servers="localhost:4200 localhost:4201")
self.assertEqual(client._active_servers,
Expand Down