From 5941e552b29c35dec7fb32612c3429c1dd73dc85 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 29 Apr 2026 18:10:42 +0800 Subject: [PATCH] [python] Fix HDFS HA and ViewFS URI handling in PyArrowFileIO ViewFS and HDFS HA URIs carry no port, so the previous splitport(netloc) path fell back to port 0 but still passed the raw netloc (an HA nameservice, ViewFS mount name, or empty string) as the host, which pafs.HadoopFileSystem cannot resolve as a namenode address. Resolve (host, port) up-front: * viewfs:// (with or without netloc) -> host='default', port=0 so libhdfs reads fs.defaultFS and the ViewFS mount table. * hdfs:// without an explicit port (HA nameservice or no netloc) -> host='default', port=0 to delegate to fs.defaultFS. * hdfs://host:port -> connect directly with the given host/port. Add HdfsFileIOTest covering all three branches plus the existing HADOOP_HOME / HADOOP_CONF_DIR guard checks. --- .../pypaimon/filesystem/pyarrow_file_io.py | 16 +++- paimon-python/pypaimon/tests/file_io_test.py | 73 +++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index d6af3ca543b3..9488809628eb 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -267,8 +267,20 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: "security.kerberos.login.principal and security.kerberos.login.keytab " "must be both set or both unset") - host, port_str = splitport(netloc) - port = int(port_str) if port_str else 0 + # Resolve (host, port) for pafs.HadoopFileSystem. + # - ViewFS URIs delegate to fs.defaultFS (host='default') so libhdfs + # resolves the mount table from core-site.xml. + # - HDFS HA URIs carry a nameservice without a port; also delegate to + # fs.defaultFS to avoid int(None) on the missing port. + # - Explicit "host:port" URIs connect directly. 
+ if scheme == 'viewfs' or not netloc: + host, port = 'default', 0 + else: + parsed_host, port_str = splitport(netloc) + if port_str is None: + host, port = 'default', 0 + else: + host, port = parsed_host, int(port_str) kerb_ticket = None if principal and keytab: diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index 86c4c28f82aa..57e0bd3d4dd4 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -489,5 +489,78 @@ def test_path_on_windows(self): self.assertNotIn("\\", call[0][0], f"backslash in path: {call[0][0]}") +class HdfsFileIOTest(unittest.TestCase): + """Cases for HDFS / ViewFS URI handling in PyArrowFileIO._initialize_hdfs_fs.""" + + def _make_hdfs_env(self, env_patch): + env_patch.setdefault('HADOOP_HOME', '/opt/hadoop') + env_patch.setdefault('HADOOP_CONF_DIR', '/opt/hadoop/etc/hadoop') + env_patch.setdefault('CLASSPATH', '') + env_patch.setdefault('LD_LIBRARY_PATH', '') + return env_patch + + def _make_file_io(self): + file_io = PyArrowFileIO.__new__(PyArrowFileIO) + file_io.properties = Options({}) + return file_io + + @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run') + @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem') + def test_viewfs_uses_default_host(self, mock_hadoop_fs, mock_run): + mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*') + mock_hadoop_fs.return_value = MagicMock() + with patch.dict(os.environ, self._make_hdfs_env({}), clear=True): + self._make_file_io()._initialize_hdfs_fs('viewfs', 'clusterName') + mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop') + + @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run') + @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem') + def test_viewfs_without_netloc_uses_default_host(self, mock_hadoop_fs, mock_run): + mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*') 
+ mock_hadoop_fs.return_value = MagicMock() + with patch.dict(os.environ, self._make_hdfs_env({}), clear=True): + self._make_file_io()._initialize_hdfs_fs('viewfs', '') + mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop') + + @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run') + @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem') + def test_hdfs_with_port_uses_explicit_host(self, mock_hadoop_fs, mock_run): + mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*') + mock_hadoop_fs.return_value = MagicMock() + with patch.dict(os.environ, self._make_hdfs_env({}), clear=True): + self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020') + mock_hadoop_fs.assert_called_once_with(host='namenode', port=8020, user='hadoop') + + @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run') + @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem') + def test_hdfs_ha_nameservice_without_port_uses_default_host(self, mock_hadoop_fs, mock_run): + mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*') + mock_hadoop_fs.return_value = MagicMock() + with patch.dict(os.environ, self._make_hdfs_env({}), clear=True): + self._make_file_io()._initialize_hdfs_fs('hdfs', 'nameservice1') + mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop') + + @patch('pypaimon.filesystem.pyarrow_file_io.subprocess.run') + @patch('pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem') + def test_hdfs_without_netloc_uses_default_host(self, mock_hadoop_fs, mock_run): + mock_run.return_value = MagicMock(stdout='/opt/hadoop/share/hadoop/common/*') + mock_hadoop_fs.return_value = MagicMock() + with patch.dict(os.environ, self._make_hdfs_env({}), clear=True): + self._make_file_io()._initialize_hdfs_fs('hdfs', '') + mock_hadoop_fs.assert_called_once_with(host='default', port=0, user='hadoop') + + def test_hdfs_missing_hadoop_home_raises(self): + with 
patch.dict(os.environ, {'HADOOP_CONF_DIR': '/opt/hadoop/etc/hadoop'}, clear=True): + with self.assertRaises(RuntimeError) as ctx: + self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020') + self.assertIn('HADOOP_HOME', str(ctx.exception)) + + def test_hdfs_missing_hadoop_conf_dir_raises(self): + with patch.dict(os.environ, {'HADOOP_HOME': '/opt/hadoop'}, clear=True): + with self.assertRaises(RuntimeError) as ctx: + self._make_file_io()._initialize_hdfs_fs('hdfs', 'namenode:8020') + self.assertIn('HADOOP_CONF_DIR', str(ctx.exception)) + + if __name__ == '__main__': unittest.main()