From 648c19eec49a3bb55e5f94657fbf0a87c06c2d0b Mon Sep 17 00:00:00 2001 From: "tianjin.gutj" Date: Fri, 24 Jun 2016 17:43:23 +0800 Subject: [PATCH] 1. Reusing HttpConnection cause ResponseNotReady(see also http://stackoverflow.com/questions/3231543/python-httplib-responsenotready), fix this bug 2. Make compatible to resource manager which ha enabled 3. Add cases for new feature 4. Remove run tests in py32 py33 and add py34 py35 5. Bump version to 0.2.4 --- .travis.yml | 4 +- docs/conf.py | 4 +- setup.py | 4 +- tests/test_base.py | 7 --- tests/test_hadoop_conf.py | 105 ++++++++++++++++++++++++++++++++- tox.ini | 2 +- yarn_api_client/__init__.py | 2 +- yarn_api_client/base.py | 23 +++----- yarn_api_client/hadoop_conf.py | 55 +++++++++++++++-- 9 files changed, 168 insertions(+), 38 deletions(-) diff --git a/.travis.yml b/.travis.yml index b73f928..42727e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ script: env: - TOXENV=py26 - TOXENV=py27 - - TOXENV=py32 - - TOXENV=py33 + - TOXENV=py34 + - TOXENV=py35 after_success: coveralls diff --git a/docs/conf.py b/docs/conf.py index f2e43e4..b6fb039 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -54,9 +54,9 @@ # built documents. # # The short X.Y version. -version = '0.2.3' +version = '0.2.4' # The full version, including alpha/beta/rc tags. -release = '0.2.3' +release = '0.2.4' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.py b/setup.py index 708072c..4346b25 100644 --- a/setup.py +++ b/setup.py @@ -50,8 +50,8 @@ def find_version(*file_paths): 'Operating System :: OS Independent', 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', 'Topic :: System :: Distributed Computing', ], ) diff --git a/tests/test_base.py b/tests/test_base.py index 75b72d9..dc9ef52 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -12,13 +12,6 @@ class BaseYarnAPITestCase(TestCase): - def test_http_property_cache(self): - client = self.get_client() - http_conn1 = client.http_conn - http_conn2 = client.http_conn - - self.assertIs(http_conn1, http_conn2) - def test_request(self): client = self.get_client() with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock: diff --git a/tests/test_hadoop_conf.py b/tests/test_hadoop_conf.py index 09d9987..389e992 100644 --- a/tests/test_hadoop_conf.py +++ b/tests/test_hadoop_conf.py @@ -1,11 +1,25 @@ # -*- coding: utf-8 -*- from tempfile import NamedTemporaryFile +import mock from mock import patch from tests import TestCase from yarn_api_client import hadoop_conf + +_http_request_method = '' +_http_getresponse_method = '' + +try: + from httplib import HTTPConnection, OK, NOT_FOUND + _http_request_method = 'httplib.HTTPConnection.request' + _http_getresponse_method = 'httplib.HTTPConnection.getresponse' +except ImportError: + from http.client import HTTPConnection, OK, NOT_FOUND + _http_request_method = 'http.client.HTTPConnection.request' + _http_getresponse_method = 'http.client.HTTPConnection.getresponse' + empty_config = ''.encode('latin1') yarn_site_xml = """\ @@ -37,19 +51,104 @@ def test_parse(self): self.assertEqual(None, value) def test_get_resource_host_port(self): + with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: + with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock: + parse_mock.return_value = 'example.com:8022' + get_rm_ids_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address') + + parse_mock.reset_mock() + parse_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + self.assertIsNone(host_port) + + @mock.patch('yarn_api_client.hadoop_conf._get_rm_ids') + @mock.patch('yarn_api_client.hadoop_conf.parse') + @mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm') + def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock): + get_rm_ids_mock.return_value = ['rm1', 'rm2'] + parse_mock.return_value = 'example.com:8022' + check_is_active_rm_mock.return_value = True + host_port = hadoop_conf.get_resource_manager_host_port() + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address.rm1') + + parse_mock.reset_mock() + parse_mock.return_value = None + + host_port = hadoop_conf.get_resource_manager_host_port() + self.assertIsNone(host_port) + + def test_get_rm_ids(self): + with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: + parse_mock.return_value = 'rm1,rm2' + rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR) + self.assertEqual(['rm1', 'rm2'], rm_list) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.ha.rm-ids') + + parse_mock.reset_mock() + parse_mock.return_value = None + + rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR) + self.assertIsNone(rm_list) + + @mock.patch(_http_request_method) + @mock.patch(_http_getresponse_method) + def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock): + class ResponseMock(): + def __init__(self, status, header_dict): + self.status = status + self.header_dict = header_dict + + def getheader(self, header_key, default_return): + if header_key in self.header_dict: + return self.header_dict[header_key] + else: + return default_return + + http_conn_request_mock.return_value = None + http_getresponse_mock.return_value = ResponseMock(OK, {}) + self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022')) + http_getresponse_mock.reset_mock() + http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh':"testing"}) + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + http_getresponse_mock.reset_mock() + http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh':"testing"}) + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + http_conn_request_mock.side_effect = Exception('error') + http_conn_request_mock.reset_mock() + http_conn_request_mock.return_value = None + self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022')) + pass + + def test_get_resource_manager(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf.get_resource_manager_host_port() + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None) + + self.assertEqual(('example.com', '8022'), host_port) + parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', + 'yarn.resourcemanager.webapp.address') + + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') self.assertEqual(('example.com', '8022'), host_port) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', - 'yarn.resourcemanager.webapp.address') + 'yarn.resourcemanager.webapp.address.rm1') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_resource_manager_host_port() + host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') self.assertIsNone(host_port) def test_get_jobhistory_host_port(self): diff --git a/tox.ini b/tox.ini index 2c7da0c..aa4429b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26,py27,py32,py33 +envlist = py26,py27,py34,py35 [testenv] deps = diff --git a/yarn_api_client/__init__.py b/yarn_api_client/__init__.py index 1ae88d7..6f4ff0b 100644 --- a/yarn_api_client/__init__.py +++ b/yarn_api_client/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -__version__ = '0.2.3' +__version__ = '0.2.4' __all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager', 'ResourceManager'] diff --git a/yarn_api_client/base.py b/yarn_api_client/base.py index 9a61de3..c86a617 100644 --- a/yarn_api_client/base.py +++ b/yarn_api_client/base.py @@ -30,9 +30,10 @@ def request(self, api_path, **query_args): path = api_path self.logger.info('Request http://%s:%s%s', self.address, self.port, path) - self.http_conn.request('GET', path) - - response = self.http_conn.getresponse() + + http_conn = self.http_conn + http_conn.request('GET', path) + response = http_conn.getresponse() if response.status == OK: return self.response_class(response) @@ -44,19 +45,13 @@ def construct_parameters(self, arguments): params = dict((key, value) for key, value in arguments if value is not None) return params - - __http_conn = None @property def http_conn(self): - if self.__http_conn is None: - if self.address is None: - raise ConfigurationError('API address is not set') - elif self.port is None: - raise ConfigurationError('API port is not set') - self.__http_conn = HTTPConnection(self.address, self.port, - timeout=self.timeout) - - return self.__http_conn + if self.address is None: + raise ConfigurationError('API address is not set') + elif self.port is None: + raise ConfigurationError('API port is not set') + return HTTPConnection(self.address, self.port, timeout=self.timeout) __logger = None @property diff --git a/yarn_api_client/hadoop_conf.py b/yarn_api_client/hadoop_conf.py index 458a7e2..663b48d 100644 --- a/yarn_api_client/hadoop_conf.py +++ b/yarn_api_client/hadoop_conf.py @@ -1,21 +1,64 @@ # -*- coding: utf-8 -*- import os import xml.etree.ElementTree as ET +try: + from httplib import HTTPConnection, OK +except ImportError: + from http.client import HTTPConnection, OK CONF_DIR = '/etc/hadoop/conf' -def get_resource_manager_host_port(): - config_path = os.path.join(CONF_DIR, 'yarn-site.xml') +def _get_rm_ids(hadoop_conf_path): + rm_ids = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), 'yarn.resourcemanager.ha.rm-ids') + if rm_ids is not None: + rm_ids = rm_ids.split(',') + return rm_ids + + +def _get_resource_manager(hadoop_conf_path, rm_id = None): prop_name = 'yarn.resourcemanager.webapp.address' - value = parse(config_path, prop_name) - if value is not None: - host, _, port = value.partition(':') - return host, port + if rm_id is not None: + rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id)) + else: + rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name) + if rm_webapp_address is not None: + [host, port] = rm_webapp_address.split(':') + return (host, port) else: return None +def _check_is_active_rm(rm_web_host, rm_web_port): + conn = HTTPConnection(rm_web_host, rm_web_port) + try: + conn.request('GET', '/cluster') + except: + return False + response = conn.getresponse() + if response.status != OK: + return False + else: + if response.getheader('Refresh', None) is not None: + return False + return True + + +def get_resource_manager_host_port(): + hadoop_conf_path = CONF_DIR + rm_ids = _get_rm_ids(hadoop_conf_path) + if rm_ids is not None: + for rm_id in rm_ids: + ret = _get_resource_manager(hadoop_conf_path, rm_id) + if ret is not None: + (host, port) = ret + if _check_is_active_rm(host, port): + return host, port + return None + else: + return _get_resource_manager(hadoop_conf_path, None) + + def get_jobhistory_host_port(): config_path = os.path.join(CONF_DIR, 'mapred-site.xml') prop_name = 'mapreduce.jobhistory.webapp.address'