Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for handling multiple RMs sans Hadoop config #29

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_get_resource_host_port(self):

@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
@mock.patch('yarn_api_client.hadoop_conf.parse')
@mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm')
@mock.patch('yarn_api_client.hadoop_conf.check_is_active_rm')
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
get_rm_ids_mock.return_value = ['rm1', 'rm2']
parse_mock.return_value = 'example.com:8022'
Expand Down Expand Up @@ -116,17 +116,17 @@ def getheader(self, header_key, default_return):

http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertTrue(hadoop_conf.check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh': "testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh': "testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
http_conn_request_mock.side_effect = Exception('error')
http_conn_request_mock.reset_mock()
http_conn_request_mock.return_value = None
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
pass

def test_get_resource_manager(self):
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
return None


def _check_is_active_rm(rm_web_host, rm_web_port):
def check_is_active_rm(rm_web_host, rm_web_port):
conn = HTTPConnection(rm_web_host, rm_web_port)
try:
conn.request('GET', '/cluster')
Expand All @@ -52,7 +52,7 @@ def get_resource_manager_host_port():
ret = _get_resource_manager(hadoop_conf_path, rm_id)
if ret is not None:
(host, port) = ret
if _check_is_active_rm(host, port):
if check_is_active_rm(host, port):
return host, port
return None
else:
Expand Down
26 changes: 22 additions & 4 deletions yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .base import BaseYarnAPI
from .constants import YarnApplicationState, FinalApplicationStatus
from .errors import IllegalArgumentError
from .hadoop_conf import get_resource_manager_host_port
from .hadoop_conf import get_resource_manager_host_port, check_is_active_rm, CONF_DIR


class ResourceManager(BaseYarnAPI):
Expand All @@ -14,20 +14,38 @@ class ResourceManager(BaseYarnAPI):
and information about applications on the cluster.

If `address` argument is `None` client will try to extract `address` and
`port` from Hadoop configuration files.
`port` from Hadoop configuration files. If both `address` and `alt_address`
are provided, the address corresponding to the ACTIVE HA Resource Manager will
be used.

:param str address: ResourceManager HTTP address
:param int port: ResourceManager HTTP port
:param str alt_address: Alternate ResourceManager HTTP address for HA configurations
:param int alt_port: Alternate ResourceManager HTTP port for HA configurations
:param int timeout: API connection timeout in seconds
:param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN
"""
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
def __init__(self, address=None, port=8088, alt_address=None, alt_port=8088, timeout=30, kerberos_enabled=False):
if address is None:
self.logger.debug('Get configuration from hadoop conf dir')
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
address, port = get_resource_manager_host_port()
else:
if alt_address: # Determine active RM
if not check_is_active_rm(address, port):
# Default is not active, check alternate
if check_is_active_rm(alt_address, alt_port):
address, port = alt_address, alt_port

super(ResourceManager, self).__init__(address, port, timeout, kerberos_enabled)

def get_active_host_port(self):
"""
The active address, port tuple to which this instance is associated.

:return: Tuple (str, int) corresponding to the active address and port
"""
return self.address, self.port

def cluster_information(self):
"""
The cluster information resource provides overall information about
Expand Down