From d175395bd5f61d7677d4755879981165b5c887f8 Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Tue, 4 Oct 2022 08:43:30 +0200
Subject: [PATCH 1/5] Add FileFragment.open/metadata methods

---
 python/pyarrow/_dataset.pyx          | 33 ++++++++++++++++++++++++++++
 python/pyarrow/tests/test_dataset.py | 10 +++++++++
 2 files changed, 43 insertions(+)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 57029b8da5cae..3c45398fb951f 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1053,6 +1053,39 @@ cdef class FileFragment(Fragment):
             self.partition_expression
         )
 
+    def open(self):
+        """
+        Open a NativeFile of the buffer or file viewed by this fragment.
+        """
+        cdef:
+            shared_ptr[CFileSystem] c_filesystem
+            shared_ptr[CRandomAccessFile] opened
+            c_string c_path
+            NativeFile out = NativeFile()
+
+        if self.buffer is not None:
+            return pa.io.BufferReader(self.buffer)
+
+        c_path = tobytes(self.file_fragment.source().path())
+        with nogil:
+            c_filesystem = self.file_fragment.source().filesystem()
+            opened = GetResultValue(c_filesystem.get().OpenInputFile(c_path))
+
+        out.set_random_access_file(opened)
+        out.is_readable = True
+        return out
+
+    @property
+    def metadata(self):
+        """
+        Get the FileMetaData of this fragment.
+        """
+        from pyarrow._parquet import ParquetReader
+        reader = ParquetReader()
+        with self.open() as nf:
+            reader.open(nf)
+        return reader.metadata
+
     @property
     def path(self):
         """
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index ff1ac7e106547..b59e5b2f46e0d 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -422,6 +422,16 @@ def test_dataset(dataset, dataset_reader):
     assert result['new'] == [False, False, True, True, False, False,
                              False, False, True, True]
 
+    # FileFragment convenience methods
+    for fragment in dataset.get_fragments():
+        assert isinstance(fragment.metadata, pq.FileMetaData)
+        with fragment.open() as nf:
+            assert isinstance(nf, pa.NativeFile)
+            assert not nf.closed
+            assert nf.seekable()
+            assert nf.readable()
+            assert not nf.writable()
+
 
 @pytest.mark.parquet
 def test_scanner_options(dataset):

From 76cf7cff0a2d4be524b6a35b449efb8b5e0becec Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Tue, 4 Oct 2022 11:20:35 +0200
Subject: [PATCH 2/5] Remove FileFragment.metadata, ParquetFileFragment specific

---
 python/pyarrow/_dataset.pyx | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3c45398fb951f..6d568127bb5f3 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1075,17 +1075,6 @@ cdef class FileFragment(Fragment):
         out.is_readable = True
         return out
 
-    @property
-    def metadata(self):
-        """
-        Get the FileMetaData of this fragment.
-        """
-        from pyarrow._parquet import ParquetReader
-        reader = ParquetReader()
-        with self.open() as nf:
-            reader.open(nf)
-        return reader.metadata
-
     @property
     def path(self):
         """

From a46062b81a347798390cfb9fd0e2a55045aad338 Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Tue, 4 Oct 2022 12:01:13 +0200
Subject: [PATCH 3/5] Fragment method testing in other dataset formats

---
 python/pyarrow/tests/test_dataset.py | 32 +++++++++++++++++++---------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b59e5b2f46e0d..e1df70b6e3362 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -86,6 +86,19 @@ def _table_from_pandas(df):
     return table.replace_schema_metadata()
 
 
+def assert_dataset_fragment_convenience_methods(dataset):
+    # FileFragment convenience methods
+    for fragment in dataset.get_fragments():
+        if pq is not None and isinstance(dataset.format, ds.ParquetFileFormat):
+            assert isinstance(fragment.metadata, pq.FileMetaData)
+        with fragment.open() as nf:
+            assert isinstance(nf, pa.NativeFile)
+            assert not nf.closed
+            assert nf.seekable()
+            assert nf.readable()
+            assert not nf.writable()
+
+
 @pytest.fixture
 @pytest.mark.parquet
 def mockfs():
@@ -421,16 +434,7 @@ def test_dataset(dataset, dataset_reader):
                              2.0, 2.0, 3.0, 3.0, 4.0, 4.0]
     assert result['new'] == [False, False, True, True, False, False,
                              False, False, True, True]
-
-    # FileFragment convenience methods
-    for fragment in dataset.get_fragments():
-        assert isinstance(fragment.metadata, pq.FileMetaData)
-        with fragment.open() as nf:
-            assert isinstance(nf, pa.NativeFile)
-            assert not nf.closed
-            assert nf.seekable()
-            assert nf.readable()
-            assert not nf.writable()
+    assert_dataset_fragment_convenience_methods(dataset)
 
 
 @pytest.mark.parquet
@@ -2965,6 +2969,8 @@ def test_ipc_format(tempdir, dataset_reader):
     result = dataset_reader.to_table(dataset)
     assert result.equals(table)
 
+    assert_dataset_fragment_convenience_methods(dataset)
+
     for format_str in ["ipc", "arrow"]:
         dataset = ds.dataset(path, format=format_str)
         result = dataset_reader.to_table(dataset)
@@ -2987,6 +2993,8 @@ def test_orc_format(tempdir, dataset_reader):
     result.validate(full=True)
     assert result.equals(table)
 
+    assert_dataset_fragment_convenience_methods(dataset)
+
     dataset = ds.dataset(path, format="orc")
     result = dataset_reader.to_table(dataset)
     result.validate(full=True)
@@ -3054,6 +3062,8 @@ def test_csv_format(tempdir, dataset_reader):
     result = dataset_reader.to_table(dataset)
     assert result.equals(table)
 
+    assert_dataset_fragment_convenience_methods(dataset)
+
     dataset = ds.dataset(path, format='csv')
     result = dataset_reader.to_table(dataset)
     assert result.equals(table)
@@ -3210,6 +3220,8 @@ def test_feather_format(tempdir, dataset_reader):
     result = dataset_reader.to_table(dataset)
     assert result.equals(table)
 
+    assert_dataset_fragment_convenience_methods(dataset)
+
     dataset = ds.dataset(basedir, format="feather")
     result = dataset_reader.to_table(dataset)
     assert result.equals(table)

From 2cbcfffeac6b17b97ddfe749b9904b4afa60ae4e Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Tue, 4 Oct 2022 12:09:20 +0200
Subject: [PATCH 4/5] Fix and ensure open()->BufferReader is tested

---
 python/pyarrow/_dataset.pyx          | 2 +-
 python/pyarrow/tests/test_dataset.py | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 6d568127bb5f3..b07d5ec3eb905 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1064,7 +1064,7 @@ cdef class FileFragment(Fragment):
             NativeFile out = NativeFile()
 
         if self.buffer is not None:
-            return pa.io.BufferReader(self.buffer)
+            return pa.BufferReader(self.buffer)
 
         c_path = tobytes(self.file_fragment.source().path())
         with nogil:
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e1df70b6e3362..15e14de304bee 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -965,6 +965,9 @@ def test_make_csv_fragment_from_buffer(dataset_reader):
     csv_format = ds.CsvFileFormat()
     fragment = csv_format.make_fragment(buffer)
 
+    # When buffer, fragment open returns a BufferReader, not NativeFile
+    assert isinstance(fragment.open(), pa.BufferReader)
+
     expected = pa.table([['a', 'b', 'c'],
                          [12, 11, 10],
                          ['dog', 'cat', 'rabbit']],

From 7ad4109b93a87f7181e69220b24ec518cac3882b Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Wed, 5 Oct 2022 13:23:18 +0200
Subject: [PATCH 5/5] Move parquet metadata specific check

---
 python/pyarrow/tests/test_dataset.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 15e14de304bee..9393a87ccb611 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -89,8 +89,6 @@ def _table_from_pandas(df):
 def assert_dataset_fragment_convenience_methods(dataset):
     # FileFragment convenience methods
     for fragment in dataset.get_fragments():
-        if pq is not None and isinstance(dataset.format, ds.ParquetFileFormat):
-            assert isinstance(fragment.metadata, pq.FileMetaData)
         with fragment.open() as nf:
             assert isinstance(nf, pa.NativeFile)
             assert not nf.closed
@@ -1237,6 +1235,8 @@ def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs):
     with assert_opens([]):
         fragment.ensure_complete_metadata()
 
+    assert isinstance(fragment.metadata, pq.FileMetaData)
+
     # recreate fragment with row group ids
     new_fragment = fragment.format.make_fragment(
        fragment.path, fragment.filesystem, row_groups=[0, 1]
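
A minimal usage sketch of the FileFragment.open() convenience method added in PATCH 1, not part of the patches themselves. The "data/" directory and the Parquet format are assumptions for illustration only; open() returns a readable NativeFile over the fragment's source file.

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")    # hypothetical local directory
for fragment in dataset.get_fragments():
    with fragment.open() as f:                     # NativeFile over the fragment's source
        assert f.readable() and f.seekable() and not f.writable()
        print(fragment.path, f.size(), f.read(4))  # Parquet files begin with b"PAR1"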
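A sketch of the buffer-backed path that PATCH 4 exercises: when the fragment wraps an in-memory buffer rather than a file, open() returns a pyarrow.BufferReader and never touches a filesystem. The inline CSV bytes below are made up for the example.

import pyarrow as pa
import pyarrow.dataset as ds

buf = pa.py_buffer(b"col\n1\n2\n3\n")              # made-up CSV content
fragment = ds.CsvFileFormat().make_fragment(buf)
reader = fragment.open()
assert isinstance(reader, pa.BufferReader)         # in-memory source, no filesystem access
print(fragment.to_table())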
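After PATCH 2 and PATCH 5, the metadata property is only exercised on Parquet fragments (the pre-existing ParquetFileFragment.metadata), not on the generic FileFragment. A sketch of that usage, again with a hypothetical path:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset("data/", format="parquet")    # hypothetical local directory
fragment = next(dataset.get_fragments())
fragment.ensure_complete_metadata()                # read the Parquet footer if not already cached
assert isinstance(fragment.metadata, pq.FileMetaData)
print(fragment.metadata.num_rows, fragment.metadata.num_row_groups)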