Skip to content

Commit

Permalink
drop server from active list if it responds
Browse files Browse the repository at this point in the history
a 50x error and retry with next server
  • Loading branch information
chaudum committed Aug 26, 2014
1 parent f425763 commit 153be58
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@
Changes for crate
=================

- drop server from active list if it responds
a valid 50x error and retry with next server

- use bulk_args in executemany to increase performance:
With crate server >= 0.42.0 executemany uses bulk_args
and returns a list of results.
Expand Down
36 changes: 20 additions & 16 deletions src/crate/client/http.py
Expand Up @@ -288,25 +288,25 @@ def _request(self, method, path, server=None, **kwargs):
self._add_server(redirect_server)
return self._request(
method, path, server=redirect_server, **kwargs)
if not server and (500 <= response.status < 600):
with self._lock:
# drop server from active ones
self._drop_server(next_server, response.reason)
return response
except (urllib3.exceptions.MaxRetryError,
urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.SSLError,
urllib3.exceptions.HTTPError,
urllib3.exceptions.ProxyError,
) as ex:
# drop server from active ones
ex_message = hasattr(ex, 'message') and ex.message or str(ex)
if server:
raise ConnectionError(
"Server not available, exception: %s" % ex_message
)
self._drop_server(next_server, ex_message)
# if this is the last server raise exception, otherwise try next
if not self._active_servers:
raise ConnectionError(
("No more Servers available, "
"exception from last server: %s") % ex_message)
with self._lock:
# drop server from active ones
self._drop_server(next_server, ex_message)
except Exception as e:
ex_message = hasattr(e, 'message') and e.message or str(e)
raise ProgrammingError(ex_message)
Expand Down Expand Up @@ -409,15 +409,19 @@ def _drop_server(self, server, message):
"""
Drop server from active list and adds it to the inactive ones.
"""
with self._lock:
try:
self._active_servers.remove(server)
except ValueError:
pass
else:
heapq.heappush(self._inactive_servers, (time(), server,
message))
logger.warning("Removed server %s from active pool", server)
try:
self._active_servers.remove(server)
except ValueError:
pass
else:
heapq.heappush(self._inactive_servers, (time(), server, message))
logger.warning("Removed server %s from active pool", server)

# if this is the last server raise exception, otherwise try next
if not self._active_servers:
raise ConnectionError(
("No more Servers available, "
"exception from last server: %s") % message)

def _roundrobin(self):
"""
Expand Down
38 changes: 38 additions & 0 deletions src/crate/client/test_http.py
Expand Up @@ -84,6 +84,33 @@ class FakeServerServiceUnavailable(FakeServerErrorResponse):
reason = "Service Unavailable"


class FakeServer50xResponse(FakeServerErrorResponse):

counter = 0
STATI = [200, 503]
REASONS = ["Success", "Service Unavailable"]

_status = 200
_reason = "Success"

@property
def status(self):
return self._status

@property
def reason(self):
return self._reason

def request(self, method, path, data=None, stream=False, **kwargs):
self._reason = self.REASONS[self.counter%2]
self._status = self.STATI[self.counter%2]
self.counter += 1
mock_response = MagicMock()
mock_response.status = self._status
mock_response.reason = self._reason
mock_response.headers = {"content-type": self.content_type}
return mock_response

class FakeServerUnauthorized(FakeServerErrorResponse):

status = 401
Expand Down Expand Up @@ -166,6 +193,17 @@ def test_programming_error_contains_http_error_response_content(self):
else:
self.assertTrue(False)

@patch('crate.client.http.Server', FakeServer50xResponse)
def test_server_error_50x(self):
client = Client(servers="localhost:4200")
client.sql('select 1')
try:
client.sql('select 2')
except ProgrammingError as e:
self.assertEqual("No more Servers available, exception from last server: Service Unavailable",
e.message)
self.assertEqual([], list(client._active_servers))

def test_connect(self):
client = Client(servers="localhost:4200 localhost:4201")
self.assertEqual(client._active_servers,
Expand Down

0 comments on commit 153be58

Please sign in to comment.