Skip to content

Commit

Permalink
[AIRFLOW-6833] HA for webhdfs connection (#7454)
Browse files Browse the repository at this point in the history
  • Loading branch information
GuzikJakub committed Mar 25, 2020
1 parent 686d7d5 commit 6c67087
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
37 changes: 23 additions & 14 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
"""Hook for Web HDFS"""
import logging
import socket

from hdfs import HdfsError, InsecureClient

Expand Down Expand Up @@ -57,27 +58,35 @@ def __init__(self, webhdfs_conn_id='webhdfs_default', proxy_user=None):
def get_conn(self):
    """
    Establishes a connection depending on the security mode set via config or environment variable.

    The actual server selection is delegated to ``_find_valid_server``; this method only turns a
    missing result into an error.

    :return: a hdfscli InsecureClient or KerberosClient object.
    :rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
    :raises AirflowWebHDFSHookException: when ``_find_valid_server`` returns ``None``.
    """
    # No connection lookup here: _find_valid_server fetches the connections itself,
    # so repeating the call would only produce an unused local.
    client = self._find_valid_server()
    if client is None:
        raise AirflowWebHDFSHookException("Failed to locate the valid server.")
    return client

def _find_valid_server(self):
connections = self.get_connections(self.webhdfs_conn_id)
for connection in connections:
host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
try:
self.log.debug('Trying namenode %s', connection.host)
client = self._get_client(connection)
client.status('/')
self.log.debug('Using namenode %s for hook', connection.host)
return client
conn_check = host_socket.connect_ex((connection.host, connection.port))
if conn_check == 0:
self.log.info('Trying namenode %s', connection.host)
client = self._get_client(connection)
client.status('/')
self.log.info('Using namenode %s for hook', connection.host)
host_socket.close()
return client
else:
self.log.info("Could not connect to %s:%s", connection.host, connection.port)
host_socket.close()
except HdfsError as hdfs_error:
self.log.debug('Read operation on namenode %s failed with error: %s',
connection.host, hdfs_error)

hosts = [connection.host for connection in connections]
error_message = 'Read operations failed on the namenodes below:\n{hosts}'.format(
hosts='\n'.join(hosts))
raise AirflowWebHDFSHookException(error_message)
self.log.info('Read operation on namenode %s failed with error: %s',
connection.host, hdfs_error)
return None

def _get_client(self, connection):
connection_str = 'http://{host}:{port}'.format(host=connection.host, port=connection.port)
Expand Down
9 changes: 7 additions & 2 deletions tests/providers/apache/hdfs/hooks/test_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ def setUp(self):
Connection(host='host_1', port=123),
Connection(host='host_2', port=321, login='user')
])
def test_get_conn(self, mock_get_connections, mock_insecure_client):
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_get_conn(self, socket_mock, mock_get_connections, mock_insecure_client):
mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()

mock_insecure_client.assert_has_calls([
Expand All @@ -52,18 +54,21 @@ def test_get_conn(self, mock_get_connections, mock_insecure_client):
Connection(host='host_1', port=123)
])
@patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_get_conn_kerberos_security_mode(self,
                                         socket_mock,
                                         mock_kerberos_security_mode,
                                         mock_get_connections,
                                         mock_kerberos_client):
    """
    With kerberos security mode reported as enabled, ``get_conn`` must build the kerberos
    client exactly once from the first connection's URL and return that client.
    """
    # Make the TCP reachability probe report success (connect_ex == 0).
    probe_socket = socket_mock.socket.return_value
    probe_socket.connect_ex.return_value = 0

    returned_client = self.webhdfs_hook.get_conn()

    first_connection = mock_get_connections.return_value[0]
    expected_url = 'http://{host}:{port}'.format(host=first_connection.host, port=first_connection.port)
    mock_kerberos_client.assert_called_once_with(expected_url)
    self.assertEqual(returned_client, mock_kerberos_client.return_value)

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook._find_valid_server', return_value=None)
def test_get_conn_no_connection_found(self, mock_get_connection):
    """``get_conn`` must raise ``AirflowWebHDFSHookException`` when no server could be selected."""
    # Only one patch is applied: the test takes a single mock argument, and stubbing
    # _find_valid_server to return None already models "no usable namenode".
    with self.assertRaises(AirflowWebHDFSHookException):
        self.webhdfs_hook.get_conn()
Expand Down

0 comments on commit 6c67087

Please sign in to comment.