Skip to content

Commit

Permalink
Honor http policy configured in yarn-site xml (#41)
Browse files Browse the repository at this point in the history
In order to allow clients to have a means of using https for their
resource manager, this change enables that behavior ONLY in cases
where no resource manager address is provided.  In this case, the
module will first check if the http policy is HTTPS_ONLY and, in
such case, construct the appropriate endpoint.

Note that the changes are minimally evasive so as to prevent clients
from having to change code.
  • Loading branch information
kevin-bates authored and lresende committed Sep 17, 2019
1 parent 6602471 commit 39ee476
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 18 deletions.
28 changes: 27 additions & 1 deletion tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
<name>yarn.resourcemanager.webapp.address</name>
<value>localhost:8022</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address</name>
<value>localhost:8024</value>
</property>
<property>
<name>yarn.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
</configuration>
""".encode('latin1')

Expand All @@ -42,6 +50,14 @@ def test_parse(self):
value = hadoop_conf.parse(f.name, key)
self.assertEqual('localhost:8022', value)

key = 'yarn.resourcemanager.webapp.https.address'
value = hadoop_conf.parse(f.name, key)
self.assertEqual('localhost:8024', value)

key = 'yarn.http.policy'
value = hadoop_conf.parse(f.name, key)
self.assertEqual('HTTPS_ONLY', value)

with NamedTemporaryFile() as f:
f.write(empty_config)
f.flush()
Expand All @@ -50,6 +66,14 @@ def test_parse(self):
value = hadoop_conf.parse(f.name, key)
self.assertEqual(None, value)

key = 'yarn.resourcemanager.webapp.https.address'
value = hadoop_conf.parse(f.name, key)
self.assertEqual(None, value)

key = 'yarn.http.policy'
value = hadoop_conf.parse(f.name, key)
self.assertEqual(None, value)

def test_get_resource_host_port(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock:
Expand Down Expand Up @@ -100,9 +124,10 @@ def test_get_rm_ids(self):
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
self.assertIsNone(rm_list)

@mock.patch('yarn_api_client.hadoop_conf._is_https_only')
@mock.patch(_http_request_method)
@mock.patch(_http_getresponse_method)
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock):
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock, is_https_only_mock):
class ResponseMock():
def __init__(self, status, header_dict):
self.status = status
Expand All @@ -114,6 +139,7 @@ def getheader(self, header_key, default_return):
else:
return default_return

is_https_only_mock.return_value = False
http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf.check_is_active_rm('example2', '8022'))
Expand Down
15 changes: 12 additions & 3 deletions tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@

from yarn_api_client.resource_manager import ResourceManager
from yarn_api_client.errors import IllegalArgumentError
from yarn_api_client.hadoop_conf import _is_https_only


@patch('yarn_api_client.resource_manager.ResourceManager.request')
class ResourceManagerTestCase(TestCase):
def setUp(self):
self.rm = ResourceManager('localhost')

@patch('yarn_api_client.resource_manager._is_https_only')
@patch('yarn_api_client.resource_manager.get_resource_manager_host_port')
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = (None, None)
ResourceManager()
def test__init__(self, get_config_mock, is_https_only_mock, request_mock):
get_config_mock.return_value = ('example', '8024')
is_https_only_mock.return_value = True

rm = ResourceManager()

get_config_mock.assert_called_with()
self.assertEqual(rm.address, 'example')
self.assertEqual(rm.port, '8024')
is_https_only_mock.assert_called_with()
self.assertEqual(rm.is_https, True)

def test_cluster_information(self, request_mock):
self.rm.cluster_information()
Expand Down
8 changes: 5 additions & 3 deletions yarn_api_client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class BaseYarnAPI(object):
__logger = None
response_class = Response

def __init__(self, address=None, port=None, timeout=None, kerberos_enabled=None):
self.address, self.port, self.timeout, self.kerberos_enabled = address, port, timeout, kerberos_enabled
def __init__(self, address=None, port=None, timeout=None, kerberos_enabled=None, is_https=False):
self.address, self.port, self.timeout, self.kerberos_enabled, self.is_https = \
address, port, timeout, kerberos_enabled, is_https

def _validate_configuration(self):
if self.address is None:
Expand All @@ -26,7 +27,8 @@ def _validate_configuration(self):
raise ConfigurationError('API port is not set')

def request(self, api_path, method='GET', **kwargs):
api_endpoint = 'http://{}:{}{}'.format(self.address, self.port, api_path)
scheme = 'https' if self.is_https else 'http'
api_endpoint = '{}://{}:{}{}'.format(scheme, self.address, self.port, api_path)

self.logger.info('API Endpoint {}'.format(api_endpoint))

Expand Down
36 changes: 28 additions & 8 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import os
import xml.etree.ElementTree as ET
try:
from httplib import HTTPConnection, OK
from httplib import HTTPConnection, HTTPSConnection, OK
except ImportError:
from http.client import HTTPConnection, OK
from http.client import HTTPConnection, HTTPSConnection, OK

CONF_DIR = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf')

Expand All @@ -15,25 +15,45 @@ def _get_rm_ids(hadoop_conf_path):
rm_ids = rm_ids.split(',')
return rm_ids


def _get_maximum_container_memory(hadoop_conf_path):
container_memory = int(parse(os.path.join(hadoop_conf_path,'yarn-site.xml'), 'yarn.nodemanager.resource.memory-mb'))
return container_memory


def _is_https_only():
# determine if HTTPS_ONLY is the configured policy, else use http
hadoop_conf_path = CONF_DIR
http_policy = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), 'yarn.http.policy')
if http_policy == 'HTTPS_ONLY':
return True
return False


def _get_resource_manager(hadoop_conf_path, rm_id=None):
prop_name = 'yarn.resourcemanager.webapp.address'
if rm_id is not None:
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id))
# compose property name based on policy (and rm_id)
if _is_https_only():
prop_name = 'yarn.resourcemanager.webapp.https.address'
else:
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
prop_name = 'yarn.resourcemanager.webapp.address'

# Adjust prop_name if rm_id is set
if rm_id:
prop_name = "{name}.{rm_id}".format(name=prop_name, rm_id=rm_id)

rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
if rm_webapp_address is not None:
[host, port] = rm_webapp_address.split(':')
return (host, port)
return host, port
else:
return None


def check_is_active_rm(rm_web_host, rm_web_port):
conn = HTTPConnection(rm_web_host, rm_web_port)
if _is_https_only():
conn = HTTPSConnection(rm_web_host, rm_web_port)
else:
conn = HTTPConnection(rm_web_host, rm_web_port)
try:
conn.request('GET', '/cluster')
except:
Expand Down
9 changes: 6 additions & 3 deletions yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from .constants import YarnApplicationState, FinalApplicationStatus
from .errors import IllegalArgumentError
from .hadoop_conf import get_resource_manager_host_port,\
check_is_active_rm, _get_maximum_container_memory, CONF_DIR
check_is_active_rm, _get_maximum_container_memory, CONF_DIR, \
_is_https_only
from collections import deque


class ResourceManager(BaseYarnAPI):
"""
The ResourceManager REST API's allow the user to get information about the
Expand All @@ -30,14 +32,15 @@ def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, tim
if address is None:
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
address, port = get_resource_manager_host_port()
is_https = _is_https_only()
else:
is_https = False
if alt_address: # Determine active RM
if not check_is_active_rm(address, port):
# Default is not active, check alternate
if check_is_active_rm(alt_address, alt_port):
address, port = alt_address, alt_port

super(ResourceManager, self).__init__(address, port, timeout, kerberos_enabled)
super(ResourceManager, self).__init__(address, port, timeout, kerberos_enabled, is_https)

def get_active_host_port(self):
"""
Expand Down

0 comments on commit 39ee476

Please sign in to comment.