From 2b62a75a34d44ac7d9ed83c02421ff4867875577 Mon Sep 17 00:00:00 2001
From: Aakcht
Date: Sat, 18 Sep 2021 21:47:00 +0300
Subject: [PATCH] fix get_connections deprecation warning in webhdfs hook (#18331)

---
 .../providers/apache/hdfs/hooks/webhdfs.py    | 35 +++++------
 .../apache/hdfs/hooks/test_webhdfs.py         | 59 ++++++++++---------
 2 files changed, 46 insertions(+), 48 deletions(-)

diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 002e4f024c8b1..6659ad61b4c92 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -70,26 +70,23 @@ def get_conn(self) -> Any:
         return connection
 
     def _find_valid_server(self) -> Any:
-        connections = self.get_connections(self.webhdfs_conn_id)
-        for connection in connections:
-            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
-            try:
-                conn_check = host_socket.connect_ex((connection.host, connection.port))
-                if conn_check == 0:
-                    self.log.info('Trying namenode %s', connection.host)
-                    client = self._get_client(connection)
-                    client.status('/')
-                    self.log.info('Using namenode %s for hook', connection.host)
-                    host_socket.close()
-                    return client
-                else:
-                    self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+        connection = self.get_connection(self.webhdfs_conn_id)
+        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.log.info("Trying to connect to %s:%s", connection.host, connection.port)
+        try:
+            conn_check = host_socket.connect_ex((connection.host, connection.port))
+            if conn_check == 0:
+                self.log.info('Trying namenode %s', connection.host)
+                client = self._get_client(connection)
+                client.status('/')
+                self.log.info('Using namenode %s for hook', connection.host)
                 host_socket.close()
-            except HdfsError as hdfs_error:
-                self.log.error(
-                    'Read operation on namenode %s failed with error: %s', connection.host, hdfs_error
-                )
+                return client
+            else:
+                self.log.error("Could not connect to %s:%s", connection.host, connection.port)
+                host_socket.close()
+        except HdfsError as hdfs_error:
+            self.log.error('Read operation on namenode %s failed with error: %s', connection.host, hdfs_error)
         return None
 
     def _get_client(self, connection: Connection) -> Any:
diff --git a/tests/providers/apache/hdfs/hooks/test_webhdfs.py b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
index 1ded92e5003a7..9288f5a47f9b1 100644
--- a/tests/providers/apache/hdfs/hooks/test_webhdfs.py
+++ b/tests/providers/apache/hdfs/hooks/test_webhdfs.py
@@ -17,7 +17,7 @@
 # under the License.
 
 import unittest
-from unittest.mock import call, patch
+from unittest.mock import patch
 
 import pytest
 from hdfs import HdfsError
@@ -33,33 +33,34 @@ def setUp(self):
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123), Connection(host='host_2', port=321, login='user')],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_2', port=321, login='user'),
     )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_get_conn(self, socket_mock, mock_get_connections, mock_insecure_client, mock_session):
-        mock_insecure_client.side_effect = [HdfsError('Error'), mock_insecure_client.return_value]
+    def test_get_conn(self, socket_mock, mock_get_connection, mock_insecure_client, mock_session):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()
-
-        mock_insecure_client.assert_has_calls(
-            [
-                call(
-                    f'http://{connection.host}:{connection.port}',
-                    user=connection.login,
-                    session=mock_session.return_value,
-                )
-                for connection in mock_get_connections.return_value
-            ]
+        connection = mock_get_connection.return_value
+        mock_insecure_client.assert_called_once_with(
+            f'http://{connection.host}:{connection.port}',
+            user=connection.login,
+            session=mock_session.return_value,
         )
         mock_insecure_client.return_value.status.assert_called_once_with('/')
         assert conn == mock_insecure_client.return_value
 
+    @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient', side_effect=HdfsError('Error'))
+    @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
+    def test_get_conn_hdfs_error(self, socket_mock, mock_insecure_client):
+        socket_mock.socket.return_value.connect_ex.return_value = 0
+        with pytest.raises(AirflowWebHDFSHookException):
+            self.webhdfs_hook.get_conn()
+
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.requests.Session', return_value="session")
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123)],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
@@ -67,14 +68,14 @@ def test_get_conn_kerberos_security_mode(
         self,
         socket_mock,
         mock_kerberos_security_mode,
-        mock_get_connections,
+        mock_get_connection,
         mock_kerberos_client,
         mock_session,
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         conn = self.webhdfs_hook.get_conn()
 
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
         mock_kerberos_client.assert_called_once_with(
             f'http://{connection.host}:{connection.port}', session=mock_session.return_value
         )
@@ -119,33 +120,33 @@ def test_init_proxy_user(self):
 
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.KerberosClient', create=True)
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[
-            Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"})
-        ],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(
+            host='host_1', port=123, extra={"use_ssl": "True", "verify": "/ssl/cert/path"}
+        ),
     )
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs._kerberos_security_mode', return_value=True)
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
     def test_conn_kerberos_ssl(
-        self, socket_mock, mock_kerberos_security_mode, mock_get_connections, mock_kerberos_client
+        self, socket_mock, mock_kerberos_security_mode, mock_get_connection, mock_kerberos_client
     ):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
 
         assert f'https://{connection.host}:{connection.port}' == mock_kerberos_client.call_args[0][0]
         assert "/ssl/cert/path" == mock_kerberos_client.call_args[1]['session'].verify
 
     @patch('airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient')
     @patch(
-        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connections',
-        return_value=[Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False})],
+        'airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection',
+        return_value=Connection(host='host_1', port=123, extra={"use_ssl": "True", "verify": False}),
    )
     @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
-    def test_conn_insecure_ssl(self, socket_mock, mock_get_connections, mock_insecure_client):
+    def test_conn_insecure_ssl(self, socket_mock, mock_get_connection, mock_insecure_client):
         socket_mock.socket.return_value.connect_ex.return_value = 0
         self.webhdfs_hook.get_conn()
-        connection = mock_get_connections.return_value[0]
+        connection = mock_get_connection.return_value
 
         assert f'https://{connection.host}:{connection.port}' == mock_insecure_client.call_args[0][0]
         assert not mock_insecure_client.call_args[1]['session'].verify
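
Note (not part of the patch): a minimal usage sketch of the hook after this change, assuming an Airflow environment with the apache-hdfs provider installed and a reachable WebHDFS connection; the connection id below is the provider's default name, and the call sequence mirrors what get_conn() now does internally with a single Connection.

    # Illustrative sketch only -- assumes a configured "webhdfs_default" connection
    # pointing at one reachable namenode.
    from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

    hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")
    # With this patch the hook resolves exactly one Connection via get_connection()
    # instead of iterating over the deprecated get_connections() list.
    client = hook.get_conn()      # hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
    print(client.status("/"))     # basic WebHDFS call to confirm the client works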