diff --git a/README.md b/README.md index bcc1f86..9cae469 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ Simple python api for the chronos job scheduler ``` import chronos c = chronos.connect("chronos.mesos.server.com:8080") +# or specify multilple servers that will be tried in order +c = chronos.connect(["chronos1.mesos.server.com:8080", "chronos2.mesos.server.com:8080"]) # get list of scheduled jobs and their status as # [{ 'name': 'job1', ..}, { 'name': 'job2', ..}] diff --git a/chronos/__init__.py b/chronos/__init__.py index d5e2aa9..b9b21a4 100755 --- a/chronos/__init__.py +++ b/chronos/__init__.py @@ -28,12 +28,21 @@ from urllib import quote +class ChronosAPIError(Exception): + pass + + +class MissingFieldError(Exception): + pass + + class ChronosClient(object): _user = None _password = None - def __init__(self, hostname, proto="http", username=None, password=None, level='WARN'): - self.baseurl = "%s://%s" % (proto, hostname) + def __init__(self, servers, proto="http", username=None, password=None, level='WARN'): + server_list = servers if isinstance(servers, list) else [servers] + self.servers = ["%s://%s" % (proto, server) for server in server_list] if username and password: self._user = username self._password = password @@ -86,9 +95,21 @@ def _call(self, url, method="GET", body=None, headers={}): conn = httplib2.Http(disable_ssl_certificate_validation=True) if self._user and self._password: conn.add_credentials(self._user, self._password) - endpoint = "%s%s" % (self.baseurl, quote(url)) - self.logger.debug(endpoint) - return self._check(*conn.request(endpoint, method, body=body, headers=hdrs)) + + response = None + servers = list(self.servers) + while servers: + server = servers.pop(0) + endpoint = "%s%s" % (server, quote(url)) + self.logger.debug(endpoint) + try: + response = self._check(*conn.request(endpoint, method, body=body, headers=hdrs)) + self.logger.info('Got response from %s', endpoint) + return response + except Exception as e: + self.logger.error('Error while calling %s: %s', endpoint, e.message) + + raise ChronosAPIError('No remaining Chronos servers to try') def _check(self, resp, content): status = resp.status @@ -102,18 +123,18 @@ def _check(self, resp, content): payload = content if payload is None and status != 204: - raise Exception("HTTP Error %d occurred." % status) + raise ChronosAPIError("Request to Chronos API failed: status: %d, response: %s" % (status, content)) return payload def _check_fields(self, job): for k in ChronosJob.fields: if k not in job: - raise Exception("missing required field %s" % k) + raise MissingFieldError("missing required field %s" % k) for k in ChronosJob.one_of: if k in job: return True - raise Exception("Job must include one of %s" % ChronosJob.one_of) + raise MissingFieldError("Job must include one of %s" % ChronosJob.one_of) class ChronosJob(object): @@ -128,5 +149,5 @@ class ChronosJob(object): one_of = ["schedule", "parents"] -def connect(hostname, proto="http", username=None, password=None): - return ChronosClient(hostname, proto="http", username=username, password=password) +def connect(servers, proto="http", username=None, password=None): + return ChronosClient(servers, proto=proto, username=username, password=password) diff --git a/itests/itest_utils.py b/itests/itest_utils.py index 5406c01..1ee54fa 100644 --- a/itests/itest_utils.py +++ b/itests/itest_utils.py @@ -2,8 +2,6 @@ from functools import wraps import os import signal -import sys -import threading import time import requests diff --git a/tests/test_chronos_init.py b/tests/test_chronos_init.py index 85d5113..726bc63 100644 --- a/tests/test_chronos_init.py +++ b/tests/test_chronos_init.py @@ -1,12 +1,28 @@ import json - import mock +import pytest +import httplib2 import chronos +def test_connect_accepts_single_host(): + client = chronos.ChronosClient("localhost", proto="http") + assert client.servers == ['http://localhost'] + + +def test_connect_accepts_list_of_hosts(): + client = chronos.ChronosClient(["host1", "host2"], proto="http") + assert client.servers == ['http://host1', 'http://host2'] + + +def test_connect_accepts_proto(): + client = chronos.ChronosClient("localhost", proto="fake_proto") + assert client.servers == ['fake_proto://localhost'] + + def test_check_accepts_json(): - client = chronos.ChronosClient(hostname="localhost") + client = chronos.ChronosClient("localhost") fake_response = mock.Mock() fake_response.status = 200 fake_content = '{ "foo": "bar" }' @@ -15,9 +31,45 @@ def test_check_accepts_json(): def test_check_returns_raw_response_when_not_json(): - client = chronos.ChronosClient(hostname="localhost") + client = chronos.ChronosClient("localhost") fake_response = mock.Mock() fake_response.status = 401 fake_content = 'UNAUTHORIZED' actual = client._check(fake_response, fake_content) assert actual == fake_content + + +def test_uses_server_list(): + client = chronos.ChronosClient(["host1", "host2", "host3"], proto="http") + good_request = (mock.Mock(status=204), '') + bad_request = (mock.Mock(status=500), '') + + conn_mock = mock.Mock(request=mock.Mock(side_effect=[bad_request, good_request, bad_request])) + with mock.patch('httplib2.Http', return_value=conn_mock): + client._call('/fake_url') + assert conn_mock.request.call_count == 2 + + +def test_api_error_throws_exception(): + client = chronos.ChronosClient(servers="localhost") + mock_response = mock.Mock() + mock_response.status = 500 + mock_request = mock.Mock(return_value=(mock_response, None)) + with mock.patch.object(httplib2.Http, 'request', mock_request): + with pytest.raises(chronos.ChronosAPIError): + client.list() + + +def test_check_missing_fields(): + client = chronos.ChronosClient(servers="localhost") + for field in chronos.ChronosJob.fields: + without_field = {x: 'foo' for x in filter(lambda y: y != field, chronos.ChronosJob.fields)} + with pytest.raises(chronos.MissingFieldError): + client._check_fields(without_field) + + +def test_check_one_of(): + client = chronos.ChronosClient(servers="localhost") + job = {field: 'foo' for field in chronos.ChronosJob.fields} + with pytest.raises(chronos.MissingFieldError): + client._check_fields(job)