From 9b7b47e12c439f775e23d2060036856db7e7b7f2 Mon Sep 17 00:00:00 2001 From: Dmitry Romanenko Date: Wed, 29 May 2019 21:07:46 -0400 Subject: [PATCH] Code cleanup --- itests/integration_test_resource_manager.py | 20 +++--- setup.py | 4 -- tests/test_application_master.py | 6 +- tests/test_base.py | 11 ++-- tests/test_hadoop_conf.py | 73 ++++++++++----------- tests/test_history_server.py | 4 +- tests/test_resource_manager.py | 25 +++---- yarn_api_client/application_master.py | 18 ++--- yarn_api_client/base.py | 52 ++++++++++----- yarn_api_client/hadoop_conf.py | 53 +++++---------- yarn_api_client/history_server.py | 22 ++++--- yarn_api_client/node_manager.py | 21 +++--- yarn_api_client/resource_manager.py | 52 ++++++++------- 13 files changed, 186 insertions(+), 175 deletions(-) diff --git a/itests/integration_test_resource_manager.py b/itests/integration_test_resource_manager.py index 7784bf9..670fd11 100644 --- a/itests/integration_test_resource_manager.py +++ b/itests/integration_test_resource_manager.py @@ -29,55 +29,55 @@ def setUpClass(self): if yarn_endpoint_uri.hostname and yarn_endpoint_uri.port: self.configured = True - self.resourceManager = ResourceManager(yarn_endpoint_uri.hostname, yarn_endpoint_uri.port) + self.resource_manager = ResourceManager([yarn_endpoint_uri.hostname + ":" + str(yarn_endpoint_uri.port)]) def test_cluster_information(self): if self.configured: - info = self.resourceManager.cluster_information() + info = self.resource_manager.cluster_information() pprint(info.data) self.assertEqual(info.data['clusterInfo']['state'], 'STARTED') def test_cluster_metrics(self): if self.configured: - metrics = self.resourceManager.cluster_metrics() + metrics = self.resource_manager.cluster_metrics() pprint(metrics.data) self.assertGreater(metrics.data['clusterMetrics']['activeNodes'], 0) self.assertIsNotNone(metrics.data['clusterMetrics']['totalNodes']) def test_cluster_scheduler(self): if self.configured: - scheduler = self.resourceManager.cluster_scheduler() + scheduler = self.resource_manager.cluster_scheduler() pprint(scheduler.data) self.assertIsNotNone(scheduler.data['scheduler']['schedulerInfo']) def test_cluster_applications(self): if self.configured: - apps = self.resourceManager.cluster_applications() + apps = self.resource_manager.cluster_applications() pprint(apps.data) self.assertIsNotNone(apps.data['apps']) def test_cluster_application_state(self): if self.configured: - apps = self.resourceManager.cluster_applications() + apps = self.resource_manager.cluster_applications() appid = apps.data['apps']['app'][0]['id'] print(appid) - response = self.resourceManager.cluster_application_state(appid) + response = self.resource_manager.cluster_application_state(appid) pprint(response.data) pprint(response.data['state']) self.assertIsNotNone(apps.data['apps']) def test_cluster_application_statistics(self): if self.configured: - appstats = self.resourceManager.cluster_application_statistics() + appstats = self.resource_manager.cluster_application_statistics() pprint(appstats.data) self.assertIsNotNone(appstats.data['appStatInfo']) def test_cluster_nodes(self): if self.configured: - nodes = self.resourceManager.cluster_nodes() + nodes = self.resource_manager.cluster_nodes() pprint(nodes.data) self.assertIsNotNone(nodes.data['nodes']) - running_nodes = self.resourceManager.cluster_nodes(state='RUNNING', healthy='true') + running_nodes = self.resource_manager.cluster_nodes(state='RUNNING', healthy='true') pprint(running_nodes.data) self.assertIsNotNone(nodes.data['nodes']) diff --git a/setup.py b/setup.py index 7512127..79aeeaf 100644 --- a/setup.py +++ b/setup.py @@ -30,10 +30,6 @@ def find_version(*file_paths): 'requests>=2.7,<3.0', ], - extras_require = { - 'kerberos': ['requests-kerberos'], - }, - entry_points = { 'console_scripts': [ 'yarn_client = yarn_api_client.main:main', diff --git a/tests/test_application_master.py b/tests/test_application_master.py index 7224daf..b808c47 100644 --- a/tests/test_application_master.py +++ b/tests/test_application_master.py @@ -10,11 +10,11 @@ class AppMasterTestCase(TestCase): def setUp(self): self.app = ApplicationMaster('localhost') - @patch('yarn_api_client.application_master.get_webproxy_host_port') + @patch('yarn_api_client.application_master.get_webproxy_endpoint') def test__init__(self, get_config_mock, request_mock): - get_config_mock.return_value = (None, None) + get_config_mock.return_value = None ApplicationMaster() - get_config_mock.assert_called_with() + get_config_mock.assert_called_with(30) def test_application_information(self, request_mock): self.app.application_information('app_100500') diff --git a/tests/test_base.py b/tests/test_base.py index 9c640ab..740111c 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,8 +1,10 @@ # -*- coding: utf-8 -*- try: from httplib import OK # NOQA + from urlparse import urlparse except ImportError: from http.client import OK # NOQA + from urllib.parse import urlparse import json import requests_mock @@ -52,16 +54,15 @@ def test_http_configuration(self): requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response())) client = self.get_client() - client.address = None - client.port = 80 + client.service_uri = None with self.assertRaises(ConfigurationError): client.request('/ololo') def get_client(self): client = base.BaseYarnAPI() - client.address = 'example.com' - client.port = 80 + client.service_uri = base.Uri('example.com:80') client.timeout = 0 - client.kerberos_enabled = False + client.auth = None + client.verify = True return client diff --git a/tests/test_hadoop_conf.py b/tests/test_hadoop_conf.py index a859e98..8e1a65b 100644 --- a/tests/test_hadoop_conf.py +++ b/tests/test_hadoop_conf.py @@ -50,42 +50,42 @@ def test_parse(self): value = hadoop_conf.parse(f.name, key) self.assertEqual(None, value) - def test_get_resource_host_port(self): + def test_get_resource_endpoint(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock: parse_mock.return_value = 'example.com:8022' get_rm_ids_mock.return_value = None - host_port = hadoop_conf.get_resource_manager_host_port() + endpoint = hadoop_conf.get_resource_manager_endpoint() - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_resource_manager_host_port() - self.assertIsNone(host_port) + endpoint = hadoop_conf.get_resource_manager_endpoint() + self.assertIsNone(endpoint) @mock.patch('yarn_api_client.hadoop_conf._get_rm_ids') @mock.patch('yarn_api_client.hadoop_conf.parse') @mock.patch('yarn_api_client.hadoop_conf.check_is_active_rm') - def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock): + def test_get_resource_endpoint_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock): get_rm_ids_mock.return_value = ['rm1', 'rm2'] parse_mock.return_value = 'example.com:8022' check_is_active_rm_mock.return_value = True - host_port = hadoop_conf.get_resource_manager_host_port() + endpoint = hadoop_conf.get_resource_manager_endpoint() - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address.rm1') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_resource_manager_host_port() - self.assertIsNone(host_port) + endpoint = hadoop_conf.get_resource_manager_endpoint() + self.assertIsNone(endpoint) def test_get_rm_ids(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: @@ -116,85 +116,82 @@ def getheader(self, header_key, default_return): http_conn_request_mock.return_value = None http_getresponse_mock.return_value = ResponseMock(OK, {}) - self.assertTrue(hadoop_conf.check_is_active_rm('example2', '8022')) + self.assertTrue(hadoop_conf.check_is_active_rm('example2:8022')) http_getresponse_mock.reset_mock() http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh': "testing"}) - self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022')) + self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022')) http_getresponse_mock.reset_mock() http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh': "testing"}) - self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022')) + self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022')) http_conn_request_mock.side_effect = Exception('error') http_conn_request_mock.reset_mock() http_conn_request_mock.return_value = None - self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022')) - pass + self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022')) def test_get_resource_manager(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None) + endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None) - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address') - host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') + endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual(('example.com:8022'), endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address.rm1') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') - self.assertIsNone(host_port) + endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1') + self.assertIsNone(endpoint) - def test_get_jobhistory_host_port(self): + def test_get_jobhistory_endpoint(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf.get_jobhistory_host_port() + endpoint = hadoop_conf.get_jobhistory_endpoint() - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/mapred-site.xml', 'mapreduce.jobhistory.webapp.address') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_jobhistory_host_port() - self.assertIsNone(host_port) + endpoint = hadoop_conf.get_jobhistory_endpoint() + self.assertIsNone(endpoint) - - def test_get_nodemanager_host_port(self): + def test_get_nodemanager_endpoint(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf.get_nodemanager_host_port() + endpoint = hadoop_conf.get_nodemanager_endpoint() - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.nodemanager.webapp.address') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_nodemanager_host_port() - self.assertIsNone(host_port) - + endpoint = hadoop_conf.get_nodemanager_endpoint() + self.assertIsNone(endpoint) - def test_get_webproxy_host_port(self): + def test_get_webproxy_endpoint(self): with patch('yarn_api_client.hadoop_conf.parse') as parse_mock: parse_mock.return_value = 'example.com:8022' - host_port = hadoop_conf.get_webproxy_host_port() + endpoint = hadoop_conf.get_webproxy_endpoint() - self.assertEqual(('example.com', '8022'), host_port) + self.assertEqual('example.com:8022', endpoint) parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.web-proxy.address') parse_mock.reset_mock() parse_mock.return_value = None - host_port = hadoop_conf.get_webproxy_host_port() - self.assertIsNone(host_port) + endpoint = hadoop_conf.get_webproxy_endpoint() + self.assertIsNone(endpoint) diff --git a/tests/test_history_server.py b/tests/test_history_server.py index fd025ba..b821387 100644 --- a/tests/test_history_server.py +++ b/tests/test_history_server.py @@ -11,9 +11,9 @@ class HistoryServerTestCase(TestCase): def setUp(self): self.hs = HistoryServer('localhost') - @patch('yarn_api_client.history_server.get_jobhistory_host_port') + @patch('yarn_api_client.history_server.get_jobhistory_endpoint') def test__init__(self, get_config_mock, request_mock): - get_config_mock.return_value = (None, None) + get_config_mock.return_value = None HistoryServer() get_config_mock.assert_called_with() diff --git a/tests/test_resource_manager.py b/tests/test_resource_manager.py index 97656c9..1250c20 100644 --- a/tests/test_resource_manager.py +++ b/tests/test_resource_manager.py @@ -8,14 +8,16 @@ @patch('yarn_api_client.resource_manager.ResourceManager.request') class ResourceManagerTestCase(TestCase): - def setUp(self): - self.rm = ResourceManager('localhost') + @patch('yarn_api_client.resource_manager.check_is_active_rm') + def setUp(self, check_is_active_rm_mock): + check_is_active_rm_mock.return_value = True + self.rm = ResourceManager(['localhost']) - @patch('yarn_api_client.resource_manager.get_resource_manager_host_port') + @patch('yarn_api_client.resource_manager.get_resource_manager_endpoint') def test__init__(self, get_config_mock, request_mock): - get_config_mock.return_value = (None, None) + get_config_mock.return_value = "localhost" ResourceManager() - get_config_mock.assert_called_with() + get_config_mock.assert_called_with(30) def test_cluster_information(self, request_mock): self.rm.cluster_information() @@ -31,17 +33,16 @@ def test_cluster_scheduler(self, request_mock): def test_cluster_applications(self, request_mock): self.rm.cluster_applications() - request_mock.assert_called_with('/ws/v1/cluster/apps') + request_mock.assert_called_with('/ws/v1/cluster/apps', params={}) self.rm.cluster_applications(state='KILLED', final_status='FAILED', user='root', queue='low', limit=10, started_time_begin=1, started_time_end=2, finished_time_begin=3, finished_time_end=4) - request_mock.assert_called_with('/ws/v1/cluster/apps', state='KILLED', - finalStatus='FAILED', user='root', - queue='low', limit=10, - startedTimeBegin=1, startedTimeEnd=2, - finishedTimeBegin=3, finishedTimeEnd=4) + request_mock.assert_called_with('/ws/v1/cluster/apps', params={'state': 'KILLED', + 'finalStatus': 'FAILED', 'user': 'root', 'queue': 'low', + 'limit': 10, 'startedTimeBegin': 1, 'startedTimeEnd': 2, + 'finishedTimeBegin': 3, 'finishedTimeEnd': 4}) with self.assertRaises(IllegalArgumentError): self.rm.cluster_applications(state='ololo') @@ -51,7 +52,7 @@ def test_cluster_applications(self, request_mock): def test_cluster_application_statistics(self, request_mock): self.rm.cluster_application_statistics() - request_mock.assert_called_with('/ws/v1/cluster/appstatistics') + request_mock.assert_called_with('/ws/v1/cluster/appstatistics', params={}) # TODO: test arguments def test_cluster_application(self, request_mock): diff --git a/yarn_api_client/application_master.py b/yarn_api_client/application_master.py index e02c85d..001fd77 100644 --- a/yarn_api_client/application_master.py +++ b/yarn_api_client/application_master.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals from .base import BaseYarnAPI -from .hadoop_conf import get_webproxy_host_port +from .hadoop_conf import get_webproxy_endpoint class ApplicationMaster(BaseYarnAPI): @@ -15,17 +15,19 @@ class ApplicationMaster(BaseYarnAPI): If `address` argument is `None` client will try to extract `address` and `port` from Hadoop configuration files. - :param str address: Proxy HTTP address - :param int port: Proxy HTTP port + :param str service_endpoint: ApplicationMaster HTTP(S) address :param int timeout: API connection timeout in seconds - :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN + :param AuthBase auth: Auth to use for requests + :param boolean verify: Either a boolean, in which case it controls whether + we verify the server's TLS certificate, or a string, in which case it must + be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False): - if address is None: + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + if not service_endpoint: self.logger.debug('Get configuration from hadoop conf dir') - address, port = get_webproxy_host_port() + service_endpoint = get_webproxy_endpoint(timeout) - super(ApplicationMaster, self).__init__(address, port, timeout, kerberos_enabled) + super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify) def application_information(self, application_id): """ diff --git a/yarn_api_client/base.py b/yarn_api_client/base.py index 9e248b9..6753658 100644 --- a/yarn_api_client/base.py +++ b/yarn_api_client/base.py @@ -6,43 +6,65 @@ from .errors import APIError, ConfigurationError +try: + from urlparse import urlparse, urlunparse +except ImportError: + from urllib.parse import urlparse, urlunparse + class Response(object): def __init__(self, response): self.data = response.json() +class Uri(object): + def __init__(self, service_endpoint): + service_uri = urlparse(service_endpoint) + self.scheme = service_uri.scheme or 'http' + self.hostname = service_uri.hostname or service_uri.path + self.port = service_uri.port + + def to_url(self, api_path=None): + if self.port: + result_url = urlunparse((self.scheme, self.hostname + ":" + self.port, api_path, None, None, None)) + else: + result_url = urlunparse((self.scheme, self.hostname, api_path, None, None, None)) + + return result_url + + class BaseYarnAPI(object): __logger = None response_class = Response - def __init__(self, address=None, port=None, timeout=None, kerberos_enabled=None): - self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled + def __init__(self, service_endpoint=None, timeout=None, auth=None, verify=True): + self.timeout = timeout + + if service_endpoint: + self.service_uri = Uri(service_endpoint) + else: + self.service_uri = None + + self.session = requests.Session() + self.session.auth = auth + self.session.verify = verify def _validate_configuration(self): - if self.address is None: - raise ConfigurationError('API address is not set') - elif self.port is None: - raise ConfigurationError('API port is not set') + if not self.service_uri: + raise ConfigurationError('API endpoint is not set') def request(self, api_path, method='GET', **kwargs): - api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path) + self._validate_configuration() + api_endpoint = self.service_uri.to_url(api_path) self.logger.info('API Endpoint {}'.format(api_endpoint)) - self._validate_configuration() - if method == 'GET': headers = None else: headers = {"Content-Type": "application/json"} - response = None - if self.kerberos_enabled: - from requests_kerberos import HTTPKerberosAuth - response = requests.request(method=method, url=api_endpoint, auth=HTTPKerberosAuth(), headers=headers, **kwargs) - else: - response = requests.request(method=method, url=api_endpoint, headers=headers, **kwargs) + response = self.session.request(method=method, url=api_endpoint, headers=headers, timeout=self.timeout, **kwargs) if response.status_code in (200, 202): return self.response_class(response) diff --git a/yarn_api_client/hadoop_conf.py b/yarn_api_client/hadoop_conf.py index c189bf5..2e6b04a 100644 --- a/yarn_api_client/hadoop_conf.py +++ b/yarn_api_client/hadoop_conf.py @@ -5,7 +5,6 @@ from httplib import HTTPConnection, OK except ImportError: from http.client import HTTPConnection, OK - CONF_DIR = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf') @@ -18,19 +17,16 @@ def _get_rm_ids(hadoop_conf_path): def _get_resource_manager(hadoop_conf_path, rm_id=None): prop_name = 'yarn.resourcemanager.webapp.address' - if rm_id is not None: + if rm_id: rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id)) else: rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name) - if rm_webapp_address is not None: - [host, port] = rm_webapp_address.split(':') - return (host, port) - else: - return None + return rm_webapp_address or None -def check_is_active_rm(rm_web_host, rm_web_port): - conn = HTTPConnection(rm_web_host, rm_web_port) + +def check_is_active_rm(url, timeout=30): + conn = HTTPConnection(host=url, timeout=timeout) try: conn.request('GET', '/cluster') except: @@ -41,55 +37,40 @@ def check_is_active_rm(rm_web_host, rm_web_port): else: if response.getheader('Refresh', None) is not None: return False - return True + return True -def get_resource_manager_host_port(): +def get_resource_manager_endpoint(timeout=30): hadoop_conf_path = CONF_DIR rm_ids = _get_rm_ids(hadoop_conf_path) - if rm_ids is not None: + if rm_ids: for rm_id in rm_ids: ret = _get_resource_manager(hadoop_conf_path, rm_id) - if ret is not None: - (host, port) = ret - if check_is_active_rm(host, port): - return host, port + if ret: + if check_is_active_rm(ret, timeout): + return ret return None else: return _get_resource_manager(hadoop_conf_path, None) -def get_jobhistory_host_port(): +def get_jobhistory_endpoint(): config_path = os.path.join(CONF_DIR, 'mapred-site.xml') prop_name = 'mapreduce.jobhistory.webapp.address' - value = parse(config_path, prop_name) - if value is not None: - host, _, port = value.partition(':') - return host, port - else: - return None + return parse(config_path, prop_name) -def get_nodemanager_host_port(): +def get_nodemanager_endpoint(): config_path = os.path.join(CONF_DIR, 'yarn-site.xml') prop_name = 'yarn.nodemanager.webapp.address' - value = parse(config_path, prop_name) - if value is not None: - host, _, port = value.partition(':') - return host, port - else: - return None + return parse(config_path, prop_name) -def get_webproxy_host_port(): +def get_webproxy_endpoint(timeout=30): config_path = os.path.join(CONF_DIR, 'yarn-site.xml') prop_name = 'yarn.web-proxy.address' value = parse(config_path, prop_name) - if value is not None: - host, _, port = value.partition(':') - return host, port - else: - return get_resource_manager_host_port() + return value or get_resource_manager_endpoint(timeout) def parse(config_path, key): diff --git a/yarn_api_client/history_server.py b/yarn_api_client/history_server.py index 2564ed7..cf10e1c 100644 --- a/yarn_api_client/history_server.py +++ b/yarn_api_client/history_server.py @@ -3,7 +3,7 @@ from .base import BaseYarnAPI from .constants import JobStateInternal from .errors import IllegalArgumentError -from .hadoop_conf import get_jobhistory_host_port +from .hadoop_conf import get_jobhistory_endpoint class HistoryServer(BaseYarnAPI): @@ -12,20 +12,22 @@ class HistoryServer(BaseYarnAPI): applications. Currently it only supports MapReduce and provides information on finished jobs. - If `address` argument is `None` client will try to extract `address` and - `port` from Hadoop configuration files. + If `service_endpoint` argument is `None` client will try to extract it from + Hadoop configuration files. - :param str address: HistoryServer HTTP address - :param int port: HistoryServer HTTP port + :param str service_endpoint: HistoryServer HTTP(S) address :param int timeout: API connection timeout in seconds - :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN + :param AuthBase auth: Auth to use for requests + :param boolean verify: Either a boolean, in which case it controls whether + we verify the server's TLS certificate, or a string, in which case it must + be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, address=None, port=19888, timeout=30, kerberos_enabled=False): - if address is None: + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + if not service_endpoint: self.logger.debug('Get information from hadoop conf dir') - address, port = get_jobhistory_host_port() + service_endpoint = get_jobhistory_endpoint() - super(HistoryServer, self).__init__(address, port, timeout, kerberos_enabled) + super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify) def application_information(self): """ diff --git a/yarn_api_client/node_manager.py b/yarn_api_client/node_manager.py index efbf8ed..3b96d6a 100644 --- a/yarn_api_client/node_manager.py +++ b/yarn_api_client/node_manager.py @@ -2,7 +2,7 @@ from .base import BaseYarnAPI from .constants import ApplicationState from .errors import IllegalArgumentError -from .hadoop_conf import get_nodemanager_host_port +from .hadoop_conf import get_nodemanager_endpoint class NodeManager(BaseYarnAPI): @@ -10,17 +10,22 @@ class NodeManager(BaseYarnAPI): The NodeManager REST API's allow the user to get status on the node and information about applications and containers running on that node. - :param str address: NodeManager HTTP address - :param int port: NodeManager HTTP port + If `service_endpoint` argument is `None` client will try to extract it from + Hadoop configuration files. + + :param str service_endpoint: NodeManager HTTP(S) address :param int timeout: API connection timeout in seconds - :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN + :param AuthBase auth: Auth to use for requests + :param boolean verify: Either a boolean, in which case it controls whether + we verify the server's TLS certificate, or a string, in which case it must + be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, address=None, port=8042, timeout=30, kerberos_enabled=False): - if address is None: + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + if not service_endpoint: self.logger.debug('Get configuration from hadoop conf dir') - address, port = get_nodemanager_host_port() + service_endpoint = get_nodemanager_endpoint() - super(NodeManager, self).__init__(address, port, timeout, kerberos_enabled) + super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify) def node_information(self): """ diff --git a/yarn_api_client/resource_manager.py b/yarn_api_client/resource_manager.py index cc87907..89a7d54 100644 --- a/yarn_api_client/resource_manager.py +++ b/yarn_api_client/resource_manager.py @@ -3,7 +3,7 @@ from .base import BaseYarnAPI from .constants import YarnApplicationState, FinalApplicationStatus from .errors import IllegalArgumentError -from .hadoop_conf import get_resource_manager_host_port, check_is_active_rm, CONF_DIR +from .hadoop_conf import get_resource_manager_endpoint, check_is_active_rm, CONF_DIR class ResourceManager(BaseYarnAPI): @@ -13,38 +13,42 @@ class ResourceManager(BaseYarnAPI): scheduler information, information about nodes in the cluster, and information about applications on the cluster. - If `address` argument is `None` client will try to extract `address` and - `port` from Hadoop configuration files. If both `address` and `alt_address` - are provided, the address corresponding to the ACTIVE HA Resource Manager will + If `service_endpoint` argument is `None` client will try to extract it from + Hadoop configuration files. If both `address` and `alt_address` are + provided, the address corresponding to the ACTIVE HA Resource Manager will be used. - :param str address: ResourceManager HTTP address - :param int port: ResourceManager HTTP port - :param str alt_address: Alternate ResourceManager HTTP address for HA configurations - :param int alt_port: Alternate ResourceManager HTTP port for HA configurations + :param List[str] service_endpoints: List of ResourceManager HTTP(S) + addresses :param int timeout: API connection timeout in seconds - :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN + :param AuthBase auth: Auth to use for requests configurations + :param boolean verify: Either a boolean, in which case it controls whether + we verify the server's TLS certificate, or a string, in which case it must + be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, timeout=30, kerberos_enabled=False): - if address is None: + def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True): + active_service_endpoint = None + if not service_endpoints: self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR)) - address, port = get_resource_manager_host_port() + active_service_endpoint = get_resource_manager_endpoint(timeout) else: - if alt_address: # Determine active RM - if not check_is_active_rm(address, port): - # Default is not active, check alternate - if check_is_active_rm(alt_address, alt_port): - address, port = alt_address, alt_port + for endpoint in service_endpoints: + if check_is_active_rm(endpoint, timeout): + active_service_endpoint = endpoint + break - super(ResourceManager, self).__init__(address, port, timeout, kerberos_enabled) + if active_service_endpoint: + super(ResourceManager, self).__init__(active_service_endpoint, timeout, auth, verify) + else: + raise Exception("No active RMs found") - def get_active_host_port(self): + def get_active_endpoint(self): """ The active address, port tuple to which this instance is associated. - - :return: Tuple (str, int) corresponding to the active address and port + :return: str service_endpoint: Service endpoint URL corresponding to + the active address of RM """ - return self.address, self.port + return self.service_uri.to_url() def cluster_information(self): """ @@ -135,7 +139,7 @@ def cluster_applications(self, state=None, final_status=None, params = self.construct_parameters(loc_args) - return self.request(path, **params) + return self.request(path, params=params) def cluster_application_statistics(self, state_list=None, application_type_list=None): @@ -174,7 +178,7 @@ def cluster_application_statistics(self, state_list=None, ('applicationTypes', application_types)) params = self.construct_parameters(loc_args) - return self.request(path, **params) + return self.request(path, params=params) def cluster_application(self, application_id): """