diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 0f7967487..1722b2388 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -145,5 +145,5 @@ jobs: shell: bash -l {0} run: | cd ${{ matrix.FRIEND }} - pytest -v + pytest -v -W ignore::pytest.PytestRemovedIn9Warning cd .. diff --git a/fsspec/caching.py b/fsspec/caching.py index de6a4e340..ba37cbd62 100644 --- a/fsspec/caching.py +++ b/fsspec/caching.py @@ -6,7 +6,6 @@ import math import os import threading -import warnings from collections import OrderedDict from concurrent.futures import Future, ThreadPoolExecutor from itertools import groupby @@ -629,7 +628,7 @@ def __init__( fetcher: Fetcher, size: int, data: dict[tuple[int, int], bytes] | None = None, - strict: bool = True, + strict: bool = False, **_: Any, ): super().__init__(blocksize, fetcher, size) @@ -653,50 +652,65 @@ def __init__( else: self.data = {} + @property + def size(self): + return sum(_[1] - _[0] for _ in self.data) + + @size.setter + def size(self, value): + pass + + @property + def nblocks(self): + return len(self.data) + + @nblocks.setter + def nblocks(self, value): + pass + def _fetch(self, start: int | None, stop: int | None) -> bytes: if start is None: start = 0 if stop is None: stop = self.size + self.total_requested_bytes += stop - start out = b"" - for (loc0, loc1), data in self.data.items(): - # If self.strict=False, use zero-padded data - # for reads beyond the end of a "known" buffer + started = False + loc_old = 0 + for loc0, loc1 in sorted(self.data): + if (loc0 <= start < loc1) and (loc0 <= stop <= loc1): + # entirely within the block + off = start - loc0 + self.hit_count += 1 + return self.data[(loc0, loc1)][off : off + stop - start] + if stop <= loc0: + break + if started and loc0 > loc_old: + # a gap where we need data + self.miss_count += 1 + if self.strict: + raise ValueError + out += b"\x00" * (loc0 - loc_old) if loc0 <= start < loc1: + # found the start + self.hit_count += 1 off = start - loc0 - out = data[off : off + stop - start] - if not self.strict or loc0 <= stop <= loc1: - # The request is within a known range, or - # it begins within a known range, and we - # are allowed to pad reads beyond the - # buffer with zero - out += b"\x00" * (stop - start - len(out)) - self.hit_count += 1 - return out - else: - # The request ends outside a known range, - # and we are being "strict" about reads - # beyond the buffer - start = loc1 - break - - # We only get here if there is a request outside the - # known parts of the file. In an ideal world, this - # should never happen - if self.fetcher is None: - # We cannot fetch the data, so raise an error - raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ") - # We can fetch the data, but should warn the user - # that this may be slow - warnings.warn( - f"Read is outside the known file parts: {(start, stop)}. " - f"IO/caching performance may be poor!" - ) - logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}") - self.total_requested_bytes += stop - start + out = self.data[(loc0, loc1)][off : off + stop - start] + started = True + elif start < loc0 and stop > loc1: + # the whole block + self.hit_count += 1 + out += self.data[(loc0, loc1)] + elif loc0 <= stop <= loc1: + # end block + self.hit_count += 1 + return out + self.data[(loc0, loc1)][: stop - loc0] + loc_old = loc1 self.miss_count += 1 - return out + super()._fetch(start, stop) + if started and not self.strict: + return out + b"\x00" * (stop - loc_old) + raise ValueError class UpdatableLRU(Generic[P, T]): diff --git a/fsspec/parquet.py b/fsspec/parquet.py index faedb7b9e..596236cb9 100644 --- a/fsspec/parquet.py +++ b/fsspec/parquet.py @@ -3,6 +3,7 @@ import warnings from .core import url_to_fs +from .spec import AbstractBufferedFile from .utils import merge_offset_ranges # Parquet-Specific Utilities for fsspec @@ -14,6 +15,11 @@ # on remote file systems. +class AlreadyBufferedFile(AbstractBufferedFile): + def _fetch_range(self, start, end): + raise NotImplementedError + + def open_parquet_file( path, mode="rb", @@ -22,11 +28,11 @@ def open_parquet_file( columns=None, row_groups=None, storage_options=None, - strict=False, engine="auto", max_gap=64_000, max_block=256_000_000, footer_sample_size=1_000_000, + filters=None, **kwargs, ): """ @@ -72,12 +78,6 @@ def open_parquet_file( storage_options : dict, optional Used to generate an `AbstractFileSystem` object if `fs` was not specified. - strict : bool, optional - Whether the resulting `KnownPartsOfAFile` cache should - fetch reads that go beyond a known byte-range boundary. - If `False` (the default), any read that ends outside a - known part will be zero padded. Note that using - `strict=True` may be useful for debugging. max_gap : int, optional Neighboring byte ranges will only be merged when their inter-range gap is <= `max_gap`. Default is 64KB. @@ -89,6 +89,10 @@ def open_parquet_file( for the footer metadata. If the sampled bytes do not contain the footer, a second read request will be required, and performance will suffer. Default is 1MB. + filters : list[list], optional + List of filters to apply to prevent reading row groups, of the + same format as accepted by the loading engines. Ignored if + ``row_groups`` is specified. **kwargs : Optional key-word arguments to pass to `fs.open` """ @@ -98,10 +102,10 @@ def open_parquet_file( if fs is None: fs = url_to_fs(path, **(storage_options or {}))[0] - # For now, `columns == []` not supported. Just use - # default `open` command with `path` input + # For now, `columns == []` not supported, is the same + # as all columns if columns is not None and len(columns) == 0: - return fs.open(path, mode=mode) + columns = None # Set the engine engine = _set_engine(engine) @@ -118,6 +122,7 @@ def open_parquet_file( max_gap=max_gap, max_block=max_block, footer_sample_size=footer_sample_size, + filters=filters, ) # Extract file name from `data` @@ -125,15 +130,16 @@ def open_parquet_file( # Call self.open with "parts" caching options = kwargs.pop("cache_options", {}).copy() - return fs.open( - fn, + return AlreadyBufferedFile( + fs=None, + path=fn, mode=mode, cache_type="parts", cache_options={ **options, "data": data.get(fn, {}), - "strict": strict, }, + size=max(_[1] for _ in data.get(fn, {})), **kwargs, ) @@ -148,6 +154,7 @@ def _get_parquet_byte_ranges( max_block=256_000_000, footer_sample_size=1_000_000, engine="auto", + filters=None, ): """Get a dictionary of the known byte ranges needed to read a specific column/row-group selection from a @@ -172,6 +179,7 @@ def _get_parquet_byte_ranges( row_groups=row_groups, max_gap=max_gap, max_block=max_block, + filters=filters, ) # Get file sizes asynchronously @@ -183,17 +191,16 @@ def _get_parquet_byte_ranges( data_starts = [] data_ends = [] add_header_magic = True - if columns is None and row_groups is None: + if columns is None and row_groups is None and filters is None: # We are NOT selecting specific columns or row-groups. # # We can avoid sampling the footers, and just transfer # all file data with cat_ranges for i, path in enumerate(paths): result[path] = {} - for b in range(0, file_sizes[i], max_block): - data_paths.append(path) - data_starts.append(b) - data_ends.append(min(b + max_block, file_sizes[i])) + data_paths.append(path) + data_starts.append(0) + data_ends.append(file_sizes[i]) add_header_magic = False # "Magic" should already be included else: # We ARE selecting specific columns or row-groups. @@ -238,14 +245,14 @@ def _get_parquet_byte_ranges( # Deal with small-file case. # Just include all remaining bytes of the file # in a single range. - if file_sizes[i] < max_block: - if footer_starts[i] > 0: - # Only need to transfer the data if the - # footer sample isn't already the whole file - data_paths.append(path) - data_starts.append(0) - data_ends.append(footer_starts[i]) - continue + # if file_sizes[i] < max_block: + # if footer_starts[i] > 0: + # # Only need to transfer the data if the + # # footer sample isn't already the whole file + # data_paths.append(path) + # data_starts.append(0) + # data_ends.append(footer_starts[i]) + # continue # Use "engine" to collect data byte ranges path_data_starts, path_data_ends = engine._parquet_byte_ranges( @@ -253,11 +260,15 @@ def _get_parquet_byte_ranges( row_groups=row_groups, footer=footer_samples[i], footer_start=footer_starts[i], + filters=filters, ) data_paths += [path] * len(path_data_starts) data_starts += path_data_starts data_ends += path_data_ends + result.setdefault(path, {})[(footer_starts[i], file_sizes[i])] = ( + footer_samples[i] + ) # Merge adjacent offset ranges data_paths, data_starts, data_ends = merge_offset_ranges( @@ -291,6 +302,7 @@ def _get_parquet_byte_ranges_from_metadata( row_groups=None, max_gap=64_000, max_block=256_000_000, + filters=None, ): """Simplified version of `_get_parquet_byte_ranges` for the case that an engine-specific `metadata` object is @@ -300,9 +312,7 @@ def _get_parquet_byte_ranges_from_metadata( # Use "engine" to collect data byte ranges data_paths, data_starts, data_ends = engine._parquet_byte_ranges( - columns, - row_groups=row_groups, - metadata=metadata, + columns, row_groups=row_groups, metadata=metadata, filters=filters ) # Merge adjacent offset ranges @@ -401,16 +411,19 @@ def _parquet_byte_ranges( metadata=None, footer=None, footer_start=None, + filters=None, ): # Initialize offset ranges and define ParqetFile metadata pf = metadata data_paths, data_starts, data_ends = [], [], [] + if filters and row_groups: + raise ValueError("filters and row_groups cannot be used together") if pf is None: pf = self.fp.ParquetFile(io.BytesIO(footer)) # Convert columns to a set and add any index columns # specified in the pandas metadata (just in case) - column_set = None if columns is None else set(columns) + column_set = None if columns is None else {c.split(".", 1)[0] for c in columns} if column_set is not None and hasattr(pf, "pandas_metadata"): md_index = [ ind @@ -422,7 +435,12 @@ def _parquet_byte_ranges( # Check if row_groups is a list of integers # or a list of row-group metadata - if row_groups and not isinstance(row_groups[0], int): + if filters: + from fastparquet.api import filter_row_groups + + row_group_indices = None + row_groups = filter_row_groups(pf, filters) + elif row_groups and not isinstance(row_groups[0], int): # Input row_groups contains row-group metadata row_group_indices = None else: @@ -486,9 +504,12 @@ def _parquet_byte_ranges( metadata=None, footer=None, footer_start=None, + filters=None, ): if metadata is not None: raise ValueError("metadata input not supported for PyarrowEngine") + if filters: + raise NotImplementedError data_starts, data_ends = [], [] md = self.pq.ParquetFile(io.BytesIO(footer)).metadata diff --git a/fsspec/tests/test_caches.py b/fsspec/tests/test_caches.py index ff9d2f97b..d9e153f73 100644 --- a/fsspec/tests/test_caches.py +++ b/fsspec/tests/test_caches.py @@ -227,22 +227,35 @@ def test_cache_basic(Cache_imp, blocksize, size_requests): @pytest.mark.parametrize("strict", [True, False]) @pytest.mark.parametrize("sort", [True, False]) -def test_known(sort, strict): - parts = {(10, 20): b"1" * 10, (20, 30): b"2" * 10, (0, 10): b"0" * 10} +def test_known(strict, sort): + parts = { + (10, 20): b"1" * 10, + (20, 30): b"2" * 10, + (0, 10): b"0" * 10, + (40, 50): b"3" * 10, + } if sort: parts = dict(sorted(parts.items())) c = caches["parts"](None, None, 100, parts, strict=strict) + assert c.size == 40 assert (0, 30) in c.data # got consolidated + assert c.nblocks == 2 + assert c._fetch(5, 15) == b"0" * 5 + b"1" * 5 assert c._fetch(15, 25) == b"1" * 5 + b"2" * 5 + assert c.hit_count + assert not c.miss_count + if strict: # Over-read will raise error with pytest.raises(ValueError): - # tries to call None fetcher c._fetch(25, 35) + with pytest.raises(ValueError): + c._fetch(25, 45) else: - # Over-read will be zero-padded assert c._fetch(25, 35) == b"2" * 5 + b"\x00" * 5 + assert c._fetch(25, 45) == b"2" * 5 + b"\x00" * 10 + b"3" * 5 + assert c.miss_count def test_background(server, monkeypatch): diff --git a/fsspec/tests/test_parquet.py b/fsspec/tests/test_parquet.py index f31dc4034..42d68e15a 100644 --- a/fsspec/tests/test_parquet.py +++ b/fsspec/tests/test_parquet.py @@ -17,23 +17,19 @@ # Define `engine` fixture FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found") PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found") -ANY_ENGINE_MARK = pytest.mark.skipif( - not (fastparquet or pq), - reason="No parquet engine (fastparquet or pyarrow) found", -) @pytest.fixture( params=[ pytest.param("fastparquet", marks=FASTPARQUET_MARK), pytest.param("pyarrow", marks=PYARROW_MARK), - pytest.param("auto", marks=ANY_ENGINE_MARK), ] ) def engine(request): return request.param +@pytest.mark.filterwarnings("ignore:.*Not enough data.*") @pytest.mark.parametrize("columns", [None, ["x"], ["x", "y"], ["z"]]) @pytest.mark.parametrize("max_gap", [0, 64]) @pytest.mark.parametrize("max_block", [64, 256_000_000]) @@ -44,6 +40,8 @@ def test_open_parquet_file( ): # Pandas required for this test pd = pytest.importorskip("pandas") + if columns == ["z"] and engine == "fastparquet": + columns = ["z.a"] # fastparquet is more specific # Write out a simple DataFrame path = os.path.join(str(tmpdir), "test.parquet") @@ -62,7 +60,7 @@ def test_open_parquet_file( df.to_parquet(path) # "Traditional read" (without `open_parquet_file`) - expect = pd.read_parquet(path, columns=columns) + expect = pd.read_parquet(path, columns=columns, engine=engine) # Use `_get_parquet_byte_ranges` to re-write a # place-holder file with all bytes NOT required @@ -106,7 +104,7 @@ def test_open_parquet_file( max_block=max_block, footer_sample_size=footer_sample_size, ) as f: - result = pd.read_parquet(f, columns=columns) + result = pd.read_parquet(f, columns=columns, engine=engine) # Check that `result` matches `expect` pd.testing.assert_frame_equal(expect, result) @@ -124,11 +122,20 @@ def test_open_parquet_file( max_block=max_block, footer_sample_size=footer_sample_size, ) as f: - result = pd.read_parquet(f, columns=columns) + # TODO: construct directory test + import struct + + footer = bytes(pf.fmd.to_bytes()) + footer2 = footer + struct.pack(b"