Skip to content

Commit

Permalink
WebHDFSHook Bugfix/optional port (#24550)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurbajaj9 committed Jun 28, 2022
1 parent ed37c3a commit d8ec1ec
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 8 deletions.
22 changes: 16 additions & 6 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ def _find_valid_server(self) -> Any:
if conn_check == 0:
self.log.info('Trying namenode %s', namenode)
client = self._get_client(
namenode, connection.port, connection.login, connection.extra_dejson
namenode,
connection.port,
connection.login,
connection.schema,
connection.extra_dejson,
)
client.status('/')
self.log.info('Using namenode %s for hook', namenode)
Expand All @@ -89,13 +93,19 @@ def _find_valid_server(self) -> Any:
self.log.info('Read operation on namenode %s failed with error: %s', namenode, hdfs_error)
return None

def _get_client(self, namenode: str, port: int, login: str, extra_dejson: dict) -> Any:
connection_str = f'http://{namenode}:{port}'
def _get_client(self, namenode: str, port: int, login: str, schema: str, extra_dejson: dict) -> Any:
connection_str = f'http://{namenode}'
session = requests.Session()

if extra_dejson.get('use_ssl', False):
connection_str = f'https://{namenode}:{port}'
session.verify = extra_dejson.get('verify', True)
if extra_dejson.get('use_ssl', 'False') == 'True' or extra_dejson.get('use_ssl', False):
connection_str = f'https://{namenode}'
session.verify = extra_dejson.get('verify', False)

if port is not None:
connection_str += f':{port}'

if schema is not None:
connection_str += f'/{schema}'

if _kerberos_security_mode:
return KerberosClient(connection_str, session=session)
Expand Down
93 changes: 91 additions & 2 deletions tests/providers/apache/hdfs/hooks/test_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def setUp(self):
return_value=Connection(host='host_1.com,host_2.com', port=321, login='user'),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
def test_get_conn_without_schema(
self, socket_mock, mock_get_connection, mock_insecure_client, mock_session
):
mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()
Expand All @@ -56,6 +58,60 @@ def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client,
mock_insecure_client.return_value.status.assert_called_once_with('/')
assert conn == mock_insecure_client.return_value

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
return_value=Connection(host='host_1.com,host_2.com', port=321, schema="schema", login='user'),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_get_conn_with_schema(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()
connection = mock_get_connection.return_value
hosts = connection.host.split(',')
mock_insecure_client.assert_has_calls(
[
call(
f'http://{host}:{connection.port}/{connection.schema}',
user=connection.login,
session=mock_session.return_value,
)
for host in hosts
]
)
mock_insecure_client.return_value.status.assert_called_once_with('/')
assert conn == mock_insecure_client.return_value

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
return_value=Connection(host='host_1.com,host_2.com', login='user'),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_get_conn_without_port_schema(
self, socket_mock, mock_get_connection, mock_insecure_client, mock_session
):
mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()
connection = mock_get_connection.return_value
hosts = connection.host.split(',')
mock_insecure_client.assert_has_calls(
[
call(
f'http://{host}',
user=connection.login,
session=mock_session.return_value,
)
for host in hosts
]
)
mock_insecure_client.return_value.status.assert_called_once_with('/')
assert conn == mock_insecure_client.return_value

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
Expand Down Expand Up @@ -148,13 +204,46 @@ def test_conn_kerberos_ssl(
assert f'https://{connection.host}:{connection.port}' == mock_kerberos_client.call_args[0][0]
assert "/ssl/cert/path" == mock_kerberos_client.call_args[1]['session'].verify

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
return_value=Connection(
host='host_1', port=123, schema="schema", extra={"use_ssl": "True", "verify": False}
),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_insecure_ssl_with_port_schema(self, socket_mock, mock_get_connection, mock_insecure_client):
socket_mock.socket.return_value.connect_ex.return_value = 0
self.webhdfs_hook.get_conn()
connection = mock_get_connection.return_value

assert (
f'https://{connection.host}:{connection.port}/{connection.schema}'
== mock_insecure_client.call_args[0][0]
)
assert not mock_insecure_client.call_args[1]['session'].verify

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
return_value=Connection(host='host_1', schema="schema", extra={"use_ssl": "True", "verify": False}),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_insecure_ssl_without_port(self, socket_mock, mock_get_connection, mock_insecure_client):
socket_mock.socket.return_value.connect_ex.return_value = 0
self.webhdfs_hook.get_conn()
connection = mock_get_connection.return_value

assert f'https://{connection.host}/{connection.schema}' == mock_insecure_client.call_args[0][0]
assert not mock_insecure_client.call_args[1]['session'].verify

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
return_value=Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False}),
)
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_insecure_ssl(self, socket_mock, mock_get_connection, mock_insecure_client):
def test_conn_insecure_ssl_without_schema(self, socket_mock, mock_get_connection, mock_insecure_client):
socket_mock.socket.return_value.connect_ex.return_value = 0
self.webhdfs_hook.get_conn()
connection = mock_get_connection.return_value
Expand Down

0 comments on commit d8ec1ec

Please sign in to comment.