diff --git a/.travis.yml b/.travis.yml index b73f928..42727e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ script: env: - TOXENV=py26 - TOXENV=py27 - - TOXENV=py32 - - TOXENV=py33 + - TOXENV=py34 + - TOXENV=py35 after_success: coveralls diff --git a/docs/conf.py b/docs/conf.py index f2e43e4..b6fb039 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -54,9 +54,9 @@ # built documents. # # The short X.Y version. -version = '0.2.3' +version = '0.2.4' # The full version, including alpha/beta/rc tags. -release = '0.2.3' +release = '0.2.4' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.py b/setup.py index 708072c..4346b25 100644 --- a/setup.py +++ b/setup.py @@ -50,8 +50,8 @@ def find_version(*file_paths): 'Operating System :: OS Independent', 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', 'Topic :: System :: Distributed Computing', ], ) diff --git a/tests/test_base.py b/tests/test_base.py index 75b72d9..dc9ef52 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -12,13 +12,6 @@ class BaseYarnAPITestCase(TestCase): - def test_http_property_cache(self): - client = self.get_client() - http_conn1 = client.http_conn - http_conn2 = client.http_conn - - self.assertIs(http_conn1, http_conn2) - def test_request(self): client = self.get_client() with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock: diff --git a/tests/test_hadoop_conf.py b/tests/test_hadoop_conf.py index 09d9987..389e992 100644 --- a/tests/test_hadoop_conf.py +++ b/tests/test_hadoop_conf.py @@ -1,11 +1,25 @@ # -*- coding: utf-8 -*- from tempfile import NamedTemporaryFile +import mock from mock import patch from tests import TestCase from yarn_api_client import hadoop_conf + +_http_request_method = '' +_http_getresponse_method = '' + +try: + from httplib import HTTPConnection, OK, NOT_FOUND + _http_request_method = 'httplib.HTTPConnection.request' + _http_getresponse_method = 'httplib.HTTPConnection.getresponse' +except ImportError: + from http.client import HTTPConnection, OK, NOT_FOUND + _http_request_method = 'http.client.HTTPConnection.request' + _http_getresponse_method = 'http.client.HTTPConnection.getresponse' + empty_config = ''.encode('latin1') yarn_site_xml = """\ @@ -37,19 +51,104 @@ def test_parse(self): self.assertEqual(None, value) def test_get_resource_host_port(self): + with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: + with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock: + parse_mock.return_value = 'example.com:8022' + get_rm_ids_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address') + + parse_mock.reset_mock() + parse_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + self.assertIsNone(host_port) + + @mock.patch('yarn_api_client.hadoop_conf._get_rm_ids') + @mock.patch('yarn_api_client.hadoop_conf.parse') + @mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm') + def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock): + get_rm_ids_mock.return_value = ['rm1', 'rm2'] + parse_mock.return_value = 'example.com:8022' + check_is_active_rm_mock.return_value = True + host_port = hadoop_conf.get_resource_manager_host_port() + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address.rm1') + + parse_mock.reset_mock() + parse_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + self.assertIsNone(host_port) + + def test_get_rm_ids(self): + with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: + parse_mock.return_value = 'rm1,rm2' + rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR) + self.assertEqual(['rm1', 'rm2'], rm_list) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.ha.rm-ids') + + parse_mock.reset_mock() + parse_mock.return_value = None + + rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR) + self.assertIsNone(rm_list) + + @mock.patch(_http_request_method) + @mock.patch(_http_getresponse_method) + def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock): + class ResponseMock(): + def __init__(self, status, header_dict): + self.status = status + self.header_dict = header_dict + + def getheader(self, header_key, default_return): + if header_key in self.header_dict: + return self.header_dict[header_key] + else: + return default_return + + http_conn_request_mock.return_value = None + http_getresponse_mock.return_value = ResponseMock(OK, {}) + self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022')) + http_getresponse_mock.reset_mock() + http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh':"testing"}) + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + http_getresponse_mock.reset_mock() + http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh':"testing"}) + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + http_conn_request_mock.side_effect = Exception('error') + http_conn_request_mock.reset_mock() + http_conn_request_mock.return_value = None + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + pass + + def test_get_resource_manager(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf.get_resource_manager_host_port() + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None) + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address') + + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') self.assertEqual(('example.com', '8022'), host_port) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', - 'yarn.resourcemanager.webapp.address') + 'yarn.resourcemanager.webapp.address.rm1') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_resource_manager_host_port() + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') self.assertIsNone(host_port) def test_get_jobhistory_host_port(self): diff --git a/tox.ini b/tox.ini index 2c7da0c..aa4429b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26,py27,py32,py33 +envlist = py26,py27,py34,py35 [testenv] deps = diff --git a/yarn_api_client/__init__.py b/yarn_api_client/__init__.py index 1ae88d7..6f4ff0b 100644 --- a/yarn_api_client/__init__.py +++ b/yarn_api_client/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -__version__ = '0.2.3' +__version__ = '0.2.4' __all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager', 'ResourceManager'] diff --git a/yarn_api_client/base.py b/yarn_api_client/base.py index 9a61de3..c86a617 100644 --- a/yarn_api_client/base.py +++ b/yarn_api_client/base.py @@ -30,9 +30,10 @@ def request(self, api_path, **query_args): path = api_path self.logger.info('Request http://%s:%s%s', self.address, self.port, path) - self.http_conn.request('GET', path) - - response = self.http_conn.getresponse() + + http_conn = self.http_conn + http_conn.request('GET', path) + response = http_conn.getresponse() if response.status == OK: return self.response_class(response) @@ -44,19 +45,13 @@ def construct_parameters(self, arguments): params = dict((key, value) for key, value in arguments if value is not None) return params - - __http_conn = None @property def http_conn(self): - if self.__http_conn is None: - if self.address is None: - raise ConfigurationError('API address is not set') - elif self.port is None: - raise ConfigurationError('API port is not set') - self.__http_conn = HTTPConnection(self.address, self.port, - timeout=self.timeout) - - return self.__http_conn + if self.address is None: + raise ConfigurationError('API address is not set') + elif self.port is None: + raise ConfigurationError('API port is not set') + return HTTPConnection(self.address, self.port, timeout=self.timeout) __logger = None @property diff --git a/yarn_api_client/hadoop_conf.py b/yarn_api_client/hadoop_conf.py index 458a7e2..663b48d 100644 --- a/yarn_api_client/hadoop_conf.py +++ b/yarn_api_client/hadoop_conf.py @@ -1,21 +1,64 @@ # -*- coding: utf-8 -*- import os import xml.etree.ElementTree as ET +try: + from httplib import HTTPConnection, OK +except ImportError: + from http.client import HTTPConnection, OK CONF_DIR = '/etc/hadoop/conf' -def get_resource_manager_host_port(): - config_path = os.path.join(CONF_DIR, 'yarn-site.xml') +def _get_rm_ids(hadoop_conf_path): + rm_ids = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), 'yarn.resourcemanager.ha.rm-ids') + if rm_ids is not None: + rm_ids = rm_ids.split(',') + return rm_ids + + +def _get_resource_manager(hadoop_conf_path, rm_id = None): prop_name = 'yarn.resourcemanager.webapp.address' - value = parse(config_path, prop_name) - if value is not None: - host, _, port = value.partition(':') - return host, port + if rm_id is not None: + rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id)) + else: + rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name) + if rm_webapp_address is not None: + [host, port] = rm_webapp_address.split(':') + return (host, port) else: return None +def _check_is_active_rm(rm_web_host, rm_web_port): + conn = HTTPConnection(rm_web_host, rm_web_port) + try: + conn.request('GET', '/cluster') + except: + return False + response = conn.getresponse() + if response.status != OK: + return False + else: + if response.getheader('Refresh', None) is not None: + return False + return True + + +def get_resource_manager_host_port(): + hadoop_conf_path = CONF_DIR + rm_ids = _get_rm_ids(hadoop_conf_path) + if rm_ids is not None: + for rm_id in rm_ids: + ret = _get_resource_manager(hadoop_conf_path, rm_id) + if ret is not None: + (host, port) = ret + if _check_is_active_rm(host, port): + return host, port + return None + else: + return _get_resource_manager(hadoop_conf_path, None) + + def get_jobhistory_host_port(): config_path = os.path.join(CONF_DIR, 'mapred-site.xml') prop_name = 'mapreduce.jobhistory.webapp.address'