From b1ff2c5053ae81b4df8ef4e9a89c229cba1c64ae Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Tue, 18 Sep 2018 15:26:36 -0700 Subject: [PATCH 1/5] Enable building universal distribution --- setup.cfg | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 setup.cfg diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..31cda97 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[bdist_wheel] +universal=1 + +[metadata] +description-file=README.rst +license_file = LICENSE From 418ba77f62b67453477b59de82de219997a7efad Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Tue, 18 Sep 2018 15:30:08 -0700 Subject: [PATCH 2/5] Update git configuration files --- .gitattributes | 12 ++++++++++++ .gitignore | 22 +++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..198fa3d --- /dev/null +++ b/.gitattributes @@ -0,0 +1,12 @@ +# Set the default behavior to have all files normalized to Unix-style +# line endings upon check-in. +* text=auto + # Declare files that will always have CRLF line endings on checkout. +*.bat text eol=crlf + # Denote all files that are truly binary and should not be modified. +*.dll binary +*.exp binary +*.lib binary +*.pdb binary +*.exe binary + diff --git a/.gitignore b/.gitignore index edb7da1..b0d0115 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,27 @@ *~ \#*\# +# Mac +.DS_Store + +# Eclipse +.classpath +.project +.settings/ +target/ + +# Intellij +.idea/ +.idea_modules/ +*.iml +*.iws +*.class +*.log + +# Others +.checkstyle +.fbExcludeFilterFile + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -55,4 +76,3 @@ coverage.xml # Sphinx documentation docs/_build/ - From c04e2fce58f1dc8f2dcaf43ec488031cb1342ea9 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Wed, 19 Sep 2018 14:50:11 -0700 Subject: [PATCH 3/5] Support YARN endpoints protected by Kerberos/SPNEGO --- .travis.yml | 6 ++- setup.py | 10 ++-- tests/test_base.py | 64 +++++++++++++---------- tox.ini | 12 ++--- yarn_api_client/application_master.py | 5 +- yarn_api_client/base.py | 74 ++++++++++++++++----------- yarn_api_client/history_server.py | 5 +- yarn_api_client/main.py | 2 +- yarn_api_client/node_manager.py | 5 +- yarn_api_client/resource_manager.py | 43 +++++++++++++--- 10 files changed, 141 insertions(+), 85 deletions(-) diff --git a/.travis.yml b/.travis.yml index 536f126..e5acc2e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,11 @@ matrix: install: - - pip install tox coveralls + - pip install --upgrade setuptools pip + - pip install --upgrade requests requests_mock requests_kerberos tox coveralls + - python setup.py bdist_wheel + - pip install dist/*.whl + - pip freeze script: tox diff --git a/setup.py b/setup.py index 503c8df..532a21d 100644 --- a/setup.py +++ b/setup.py @@ -19,17 +19,17 @@ def find_version(*file_paths): return version_match.group(1) raise RuntimeError("Unable to find version string.") - -install_requires = [] - setup( name = 'yarn-api-client', version = find_version('yarn_api_client', '__init__.py'), description='Python client for Hadoop® YARN API', long_description=read('README.rst'), - packages = find_packages(exclude=['tests']), + packages = find_packages(exclude=['tests','itests']), - install_requires = install_requires, + install_requires = [ + 'requests>=2.7,<3.0', + 'requests-kerberos==0.12.0', + ], entry_points = { 'console_scripts': [ 'yarn_client = yarn_api_client.main:main', diff --git a/tests/test_base.py b/tests/test_base.py index dc9ef52..85126c9 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -4,56 +4,66 @@ except ImportError: from http.client import OK -from mock import patch -from tests import TestCase +import json +import requests +import requests_mock +from tests import TestCase from yarn_api_client import base from yarn_api_client.errors import APIError, ConfigurationError class BaseYarnAPITestCase(TestCase): - def test_request(self): - client = self.get_client() - with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock: - with patch('yarn_api_client.base.json'): - http_conn_mock().getresponse().status = OK + @staticmethod + def success_response(): + return { + 'status':'success' + } - client.request('/ololo', foo='bar') + def test_valid_request(self): + with requests_mock.mock() as requests_get_mock: + requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response())) - http_conn_mock().request.assert_called_with('GET', '/ololo?foo=bar') + client = self.get_client() + response = client.request('/ololo', foo='bar') - http_conn_mock.reset_mock() - client.request('/ololo') + assert requests_get_mock.called + self.assertIn(response.data['status'], 'success') + + + def test_valid_request_with_parameters(self): + with requests_mock.mock() as requests_get_mock: + requests_get_mock.get('/ololo?foo=bar', text=json.dumps(BaseYarnAPITestCase.success_response())) - http_conn_mock() + client = self.get_client() + response = client.request('/ololo', foo='bar') - http_conn_mock().request.assert_called_with('GET', '/ololo') + assert requests_get_mock.called + self.assertIn(response.data['status'], 'success') def test_bad_request(self): - client = self.get_client() - with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock: - http_conn_mock().getresponse().status = 404 + with requests_mock.mock() as requests_get_mock: + requests_get_mock.get('/ololo', status_code=404) + client = self.get_client() with self.assertRaises(APIError): client.request('/ololo') - - def test_http_configuration(self): - client = self.get_client() - client.address = None - client.port = 80 - with self.assertRaises(ConfigurationError): - conn = client.http_conn + def test_http_configuration(self): + with requests_mock.mock() as requests_get_mock: + requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response())) - client.address = 'localhost' - client.port = None + client = self.get_client() + client.address = None + client.port = 80 - with self.assertRaises(ConfigurationError): - conn = client.http_conn + with self.assertRaises(ConfigurationError): + client.request('/ololo') def get_client(self): client = base.BaseYarnAPI() client.address = 'example.com' client.port = 80 client.timeout = 0 + client.kerberos_enabled = False return client diff --git a/tox.ini b/tox.ini index aa4429b..6f98edd 100644 --- a/tox.ini +++ b/tox.ini @@ -1,15 +1,11 @@ [tox] -envlist = py26,py27,py34,py35 +envlist = py27,py35,py36 [testenv] deps = + requests + requests-kerberos + requests_mock coverage mock commands = coverage run --source=yarn_api_client setup.py test - -[testenv:py26] -deps = - argparse - coverage - mock - unittest2 diff --git a/yarn_api_client/application_master.py b/yarn_api_client/application_master.py index b65f9c1..50f529d 100644 --- a/yarn_api_client/application_master.py +++ b/yarn_api_client/application_master.py @@ -18,9 +18,10 @@ class ApplicationMaster(BaseYarnAPI): :param str address: Proxy HTTP address :param int port: Proxy HTTP port :param int timeout: API connection timeout in seconds + :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN """ - def __init__(self, address=None, port=8088, timeout=30): - self.address, self.port, self.timeout = address, port, timeout + def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False): + self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled if address is None: self.logger.debug('Get configuration from hadoop conf dir') address, port = get_webproxy_host_port() diff --git a/yarn_api_client/base.py b/yarn_api_client/base.py index af2a10f..145c70f 100644 --- a/yarn_api_client/base.py +++ b/yarn_api_client/base.py @@ -1,60 +1,72 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -try: - from httplib import HTTPConnection, OK -except ImportError: - from http.client import HTTPConnection, OK + import json import logging -try: - from urllib import urlencode -except ImportError: - from urllib.parse import urlencode +import requests +from requests_kerberos import HTTPKerberosAuth from .errors import APIError, ConfigurationError class Response(object): - def __init__(self, http_response): - self.data = json.load(http_response) + def __init__(self, response): + self.data = response.json() class BaseYarnAPI(object): + __logger = None response_class = Response + def _validate_configuration(self): + if self.address is None: + raise ConfigurationError('API address is not set') + elif self.port is None: + raise ConfigurationError('API port is not set') + def request(self, api_path, **query_args): - params = urlencode(query_args) - if params: - path = api_path + '?' + params + params = query_args + api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path) + + self.logger.info('API Endpoint {}'.format(api_endpoint)) + + self._validate_configuration() + + response = None + if self.kerberos_enabled: + response = requests.get(api_endpoint, params, auth=HTTPKerberosAuth()) else: - path = api_path + response = requests.get(api_endpoint, params) - self.logger.info('Request http://%s:%s%s', self.address, self.port, path) - - http_conn = self.http_conn - http_conn.request('GET', path) - response = http_conn.getresponse() + if response.status_code == requests.codes.ok: + return self.response_class(response) + else: + msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text) + raise APIError(msg) - if response.status == OK: + def update(self, api_path, data): + api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path) + + self.logger.info('API Endpoint {}'.format(api_endpoint)) + + self._validate_configuration() + + response = None + if self.kerberos_enabled: + response = requests.put(api_endpoint, data=data, auth=HTTPKerberosAuth()) + else: + response = requests.put(api_endpoint, data=data) + + if response.status_code == requests.codes.ok: return self.response_class(response) else: - explanation = response.read() - msg = 'Response finished with status: %s. Details: %s' % (response.status, explanation) + msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text) raise APIError(msg) def construct_parameters(self, arguments): params = dict((key, value) for key, value in arguments if value is not None) return params - @property - def http_conn(self): - if self.address is None: - raise ConfigurationError('API address is not set') - elif self.port is None: - raise ConfigurationError('API port is not set') - return HTTPConnection(self.address, self.port, timeout=self.timeout) - - __logger = None @property def logger(self): if self.__logger is None: diff --git a/yarn_api_client/history_server.py b/yarn_api_client/history_server.py index 22b219e..4a99d44 100644 --- a/yarn_api_client/history_server.py +++ b/yarn_api_client/history_server.py @@ -18,9 +18,10 @@ class HistoryServer(BaseYarnAPI): :param str address: HistoryServer HTTP address :param int port: HistoryServer HTTP port :param int timeout: API connection timeout in seconds + :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN """ - def __init__(self, address=None, port=19888, timeout=30): - self.address, self.port, self.timeout = address, port, timeout + def __init__(self, address=None, port=19888, timeout=30, kerberos_enabled=False): + self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled if address is None: self.logger.debug('Get information from hadoop conf dir') address, port = get_jobhistory_host_port() diff --git a/yarn_api_client/main.py b/yarn_api_client/main.py index 05421f6..51634d6 100644 --- a/yarn_api_client/main.py +++ b/yarn_api_client/main.py @@ -256,7 +256,7 @@ def main(): method_args = [getattr(opts, arg) for arg in opts.method_args] else: method_args = [] - # Construce key arguments for method + # Construct key arguments for method if 'method_kwargs' in opts: method_kwargs = dict((key, getattr(opts, key)) for key in opts.method_kwargs) else: diff --git a/yarn_api_client/node_manager.py b/yarn_api_client/node_manager.py index 5848d4d..37660f4 100644 --- a/yarn_api_client/node_manager.py +++ b/yarn_api_client/node_manager.py @@ -12,9 +12,10 @@ class NodeManager(BaseYarnAPI): :param str address: NodeManager HTTP address :param int port: NodeManager HTTP port :param int timeout: API connection timeout in seconds + :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN """ - def __init__(self, address=None, port=8042, timeout=30): - self.address, self.port, self.timeout = address, port, timeout + def __init__(self, address=None, port=8042, timeout=30, kerberos_enabled=False): + self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled def node_information(self): """ diff --git a/yarn_api_client/resource_manager.py b/yarn_api_client/resource_manager.py index 1161379..5f327d9 100644 --- a/yarn_api_client/resource_manager.py +++ b/yarn_api_client/resource_manager.py @@ -19,9 +19,10 @@ class ResourceManager(BaseYarnAPI): :param str address: ResourceManager HTTP address :param int port: ResourceManager HTTP port :param int timeout: API connection timeout in seconds + :param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN """ - def __init__(self, address=None, port=8088, timeout=30): - self.address, self.port, self.timeout = address, port, timeout + def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False): + self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled if address is None: self.logger.debug('Get configuration from hadoop conf dir') address, port = get_resource_manager_host_port() @@ -132,7 +133,7 @@ def cluster_application_statistics(self, state_list=None, comma-separated list. If states is not provided, the API will enumerate all application states and return the counts of them. :param list application_type_list: types of the applications, - specified as a comma-separated list. If applicationTypes is not + specified as a comma-separated list. If application_types is not provided, the API will count the applications of any application type. In this case, the response shows * to indicate any application type. Note that we only support at most one @@ -146,13 +147,13 @@ def cluster_application_statistics(self, state_list=None, # TODO: validate state argument states = ','.join(state_list) if state_list is not None else None if application_type_list is not None: - applicationTypes = ','.join(application_type_list) + application_types = ','.join(application_type_list) else: - applicationTypes = None + application_types = None loc_args = ( ('states', states), - ('applicationTypes', applicationTypes)) + ('applicationTypes', application_types)) params = self.construct_parameters(loc_args) return self.request(path, **params) @@ -184,6 +185,36 @@ def cluster_application_attempts(self, application_id): return self.request(path) + def cluster_application_state(self, application_id): + """ + With the application state API, you can obtain the current + state of an application. + + :param str application_id: The application id + :returns: API response object with JSON data + :rtype: :py:class:`yarn_api_client.base.Response` + """ + path = '/ws/v1/cluster/apps/{appid}/state'.format( + appid=application_id) + + return self.request(path) + + def cluster_application_kill(self, application_id): + """ + With the application kill API, you can kill an application + that is not in FINISHED or FAILED state. + + :param str application_id: The application id + :returns: API response object with JSON data + :rtype: :py:class:`yarn_api_client.base.Response` + """ + + data = '{"state": "KILLED"}' + path = '/ws/v1/cluster/apps/{appid}/state'.format( + appid=application_id) + + return self.put(path, data) + def cluster_nodes(self, state=None, healthy=None): """ With the Nodes API, you can obtain a collection of resources, each of From 8ef5cae388cfc3571f42ff6ef0aa51fe18554378 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Wed, 19 Sep 2018 15:23:26 -0700 Subject: [PATCH 4/5] Integration tests run against provided YARN ENDPOINT Integration test that, given a provided YARN ENDPOINT, execute some real scenario test against that server. Note that, if no YARN ENDPOINT is provided, the tests are ignored. --- .travis.yml | 5 +- itests/__init__.py | 5 ++ itests/integration_test_resource_manager.py | 83 +++++++++++++++++++++ setup.py | 2 +- tox.ini | 10 +-- 5 files changed, 95 insertions(+), 10 deletions(-) create mode 100644 itests/__init__.py create mode 100644 itests/integration_test_resource_manager.py diff --git a/.travis.yml b/.travis.yml index e5acc2e..54f5c6c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,10 +13,7 @@ matrix: install: - - pip install --upgrade setuptools pip - - pip install --upgrade requests requests_mock requests_kerberos tox coveralls - - python setup.py bdist_wheel - - pip install dist/*.whl + - pip install --upgrade pip tox coveralls - pip freeze script: diff --git a/itests/__init__.py b/itests/__init__.py new file mode 100644 index 0000000..9e4c88f --- /dev/null +++ b/itests/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +try: + from unittest2 import TestCase +except ImportError: + from unittest import TestCase diff --git a/itests/integration_test_resource_manager.py b/itests/integration_test_resource_manager.py new file mode 100644 index 0000000..7784bf9 --- /dev/null +++ b/itests/integration_test_resource_manager.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- + +import os + +from pprint import pprint +from unittest import TestCase +from yarn_api_client.resource_manager import ResourceManager + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + + +class ResourceManagerTestCase(TestCase): + """ + Integration test that, given a provided YARN ENDPOINT, + execute some real scenario test against that server. + + Note that, if no YARN ENDPOINT is provided, the tests + are ignored. + """ + @classmethod + def setUpClass(self): + self.configured = False + if os.getenv('YARN_ENDPOINT'): + yarn_endpoint = os.getenv('YARN_ENDPOINT') + yarn_endpoint_uri = urlparse(yarn_endpoint) + + if yarn_endpoint_uri.hostname and yarn_endpoint_uri.port: + self.configured = True + self.resourceManager = ResourceManager(yarn_endpoint_uri.hostname, yarn_endpoint_uri.port) + + def test_cluster_information(self): + if self.configured: + info = self.resourceManager.cluster_information() + pprint(info.data) + self.assertEqual(info.data['clusterInfo']['state'], 'STARTED') + + def test_cluster_metrics(self): + if self.configured: + metrics = self.resourceManager.cluster_metrics() + pprint(metrics.data) + self.assertGreater(metrics.data['clusterMetrics']['activeNodes'], 0) + self.assertIsNotNone(metrics.data['clusterMetrics']['totalNodes']) + + def test_cluster_scheduler(self): + if self.configured: + scheduler = self.resourceManager.cluster_scheduler() + pprint(scheduler.data) + self.assertIsNotNone(scheduler.data['scheduler']['schedulerInfo']) + + def test_cluster_applications(self): + if self.configured: + apps = self.resourceManager.cluster_applications() + pprint(apps.data) + self.assertIsNotNone(apps.data['apps']) + + def test_cluster_application_state(self): + if self.configured: + apps = self.resourceManager.cluster_applications() + appid = apps.data['apps']['app'][0]['id'] + print(appid) + response = self.resourceManager.cluster_application_state(appid) + pprint(response.data) + pprint(response.data['state']) + self.assertIsNotNone(apps.data['apps']) + + def test_cluster_application_statistics(self): + if self.configured: + appstats = self.resourceManager.cluster_application_statistics() + pprint(appstats.data) + self.assertIsNotNone(appstats.data['appStatInfo']) + + def test_cluster_nodes(self): + if self.configured: + nodes = self.resourceManager.cluster_nodes() + pprint(nodes.data) + self.assertIsNotNone(nodes.data['nodes']) + + running_nodes = self.resourceManager.cluster_nodes(state='RUNNING', healthy='true') + pprint(running_nodes.data) + self.assertIsNotNone(nodes.data['nodes']) diff --git a/setup.py b/setup.py index 532a21d..7ce7293 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def find_version(*file_paths): install_requires = [ 'requests>=2.7,<3.0', - 'requests-kerberos==0.12.0', + 'requests-kerberos', ], entry_points = { 'console_scripts': [ diff --git a/tox.ini b/tox.ini index 6f98edd..51138db 100644 --- a/tox.ini +++ b/tox.ini @@ -3,9 +3,9 @@ envlist = py27,py35,py36 [testenv] deps = - requests - requests-kerberos - requests_mock - coverage - mock + coverage + mock + requests + requests-kerberos + requests_mock commands = coverage run --source=yarn_api_client setup.py test From a21f612582c996da95cb471b0e47ffbe0cce7d72 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Thu, 20 Sep 2018 08:52:37 -0700 Subject: [PATCH 5/5] Bump version in preparation for 0.3.0 release --- README.rst | 30 +++++++++++++++++++++++------- yarn_api_client/__init__.py | 2 +- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/README.rst b/README.rst index 6d1028e..d545427 100644 --- a/README.rst +++ b/README.rst @@ -2,11 +2,7 @@ hadoop-yarn-api-python-client ============================= -Python client for Hadoop® YARN API - -.. image:: https://coveralls.io/repos/toidi/hadoop-yarn-api-python-client/badge.png - :target: https://coveralls.io/r/toidi/hadoop-yarn-api-python-client - :alt: Test coverage +Python client for Apache Hadoop® YARN API .. image:: https://img.shields.io/pypi/v/yarn-api-client.svg :target: https://pypi.python.org/pypi/yarn-api-client/ @@ -16,6 +12,14 @@ Python client for Hadoop® YARN API :target: https://travis-ci.org/toidi/hadoop-yarn-api-python-client :alt: Travis CI build status +.. image:: http://readthedocs.org/projects/python-client-for-hadoop-yarn-api/badge/?version=latest + :target: https://python-client-for-hadoop-yarn-api.readthedocs.org/en/latest/?badge=latest + :alt: Latest documentation status + +.. image:: https://coveralls.io/repos/toidi/hadoop-yarn-api-python-client/badge.png + :target: https://coveralls.io/r/toidi/hadoop-yarn-api-python-client + :alt: Test coverage + Package documentation: python-client-for-hadoop-yarn-api.readthedocs.org_ REST API documentation: hadoop.apache.org_ @@ -30,6 +34,11 @@ From PyPI pip install yarn-api-client +From Anaconda (conda forge) + +:: + + conda install -c conda-forge yarn-api-client From source code @@ -67,9 +76,16 @@ Programmatic interface Changelog ========= -0.2.5 - Fixed History REST API +0.3.0 Release + - Add support for YARN endpoints protected by Kerberos/SPNEGO + - Moved to `requests` package for REST API invocation + - Remove `http_con` property, as connections are now managed by `requests` package + +0.2.5 Release + - Fixed History REST API -0.2.4 - Added compatibility with HA enabled Resource Manager +0.2.4 Release + - Added compatibility with HA enabled Resource Manager .. _python-client-for-hadoop-yarn-api.readthedocs.org: http://python-client-for-hadoop-yarn-api.readthedocs.org/en/latest/ .. _hadoop.apache.org: http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html diff --git a/yarn_api_client/__init__.py b/yarn_api_client/__init__.py index 4c91f4b..ac4c252 100644 --- a/yarn_api_client/__init__.py +++ b/yarn_api_client/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -__version__ = '0.2.5' +__version__ = '0.3.0' __all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager', 'ResourceManager']