Skip to content

Commit

Permalink
fix get_connections deprecation warning in webhdfs hook (#18331)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aakcht committed Sep 18, 2021
1 parent 4cd190c commit 2b62a75
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 48 deletions.
35 changes: 16 additions & 19 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,23 @@ def get_conn(self) -> Any:
return connection

def _find_valid_server(self) -> Any:
-        connections = self.get_connections(self.webhdfs_conn_id)
-        for connection in connections:
-            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
-            try:
-                conn_check = host_socket.connect_ex((connection.host, connection.port))
-                if conn_check == 0:
-                    self.log.info('Trying namenode %s', connection.host)
-                    client = self._get_client(connection)
-                    client.status('/')
-                    self.log.info('Using namenode %s for hook', connection.host)
-                    host_socket.close()
-                    return client
-                else:
-                    self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+        connection = self.get_connection(self.webhdfs_conn_id)
+        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
+        try:
+            conn_check = host_socket.connect_ex((connection.host, connection.port))
+            if conn_check == 0:
+                self.log.info('Trying namenode %s', connection.host)
+                client = self._get_client(connection)
+                client.status('/')
+                self.log.info('Using namenode %s for hook', connection.host)
                 host_socket.close()
-            except HdfsError as hdfs_error:
-                self.log.error(
-                    'Read operation on namenode %s failed with error: %s', connection.host, hdfs_error
-                )
+                return client
+            else:
+                self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+            host_socket.close()
+        except HdfsError as hdfs_error:
+            self.log.error('Read operation on namenode %s failed with error: %s', connection.host, hdfs_error)
return None

def _get_client(self, connection: Connection) -> Any:
Expand Down
59 changes: 30 additions & 29 deletions tests/providers/apache/hdfs/hooks/test_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.

import unittest
-from unittest.mock import call, patch
+from unittest.mock import patch

import pytest
from hdfs import HdfsError
Expand All @@ -33,48 +33,49 @@ def setUp(self):
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
@patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123), Connection(host='host_2', port=321, login='user')],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_2', port=321, login='user'),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_get_conn(self, socket_mock, mock_get_connections, mock_insecure_client, mock_session):
-        mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
+    def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
socket_mock.socket.return_value.connect_ex.return_value = 0
conn = self.webhdfs_hook.get_conn()

-        mock_insecure_client.assert_has_calls(
-            [
-                call(
-                    f'http://{connection.host}:{connection.port}',
-                    user=connection.login,
-                    session=mock_session.return_value,
-                )
-                for connection in mock_get_connections.return_value
-            ]
+        connection = mock_get_connection.return_value
+        mock_insecure_client.assert_called_once_with(
+            f'http://{connection.host}:{connection.port}',
+            user=connection.login,
+            session=mock_session.return_value,
         )
         mock_insecure_client.return_value.status.assert_called_once_with('/')
         assert conn == mock_insecure_client.return_value

+    @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+    def test_get_conn_hdfs_error(self, socket_mock, mock_insecure_client):
+        socket_mock.socket.return_value.connect_ex.return_value = 0
+        with pytest.raises(AirflowWebHDFSHookException):
+            self.webhdfs_hook.get_conn()
+
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
@patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
@patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123)],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
     def test_get_conn_kerberos_security_mode(
         self,
         socket_mock,
         mock_kerberos_security_mode,
-        mock_get_connections,
+        mock_get_connection,
         mock_kerberos_client,
         mock_session,
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()

-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
mock_kerberos_client.assert_called_once_with(
f'http://{connection.host}:{connection.port}', session=mock_session.return_value
)
Expand Down Expand Up @@ -119,33 +120,33 @@ def test_init_proxy_user(self):

@patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
@patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[
-            Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"})
-        ],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(
+            host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"}
+        ),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
     def test_conn_kerberos_ssl(
-        self, socket_mock, mock_kerberos_security_mode, mock_get_connections, mock_kerberos_client
+        self, socket_mock, mock_kerberos_security_mode, mock_get_connection, mock_kerberos_client
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value

         assert f'https://{connection.host}:{connection.port}' == mock_kerberos_client.call_args[0][0]
         assert "/ssl/cert/path" == mock_kerberos_client.call_args[1]['session'].verify

     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False})],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False}),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_conn_insecure_ssl(self, socket_mock, mock_get_connections, mock_insecure_client):
+    def test_conn_insecure_ssl(self, socket_mock, mock_get_connection, mock_insecure_client):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value

assert f'https://{connection.host}:{connection.port}' == mock_insecure_client.call_args[0][0]
assert not mock_insecure_client.call_args[1]['session'].verify

0 comments on commit 2b62a75

Please sign in to comment.