2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -145,5 +145,5 @@ jobs:
shell: bash -l {0}
run: |
cd ${{ matrix.FRIEND }}
pytest -v
pytest -v -W ignore::pytest.PytestRemovedIn9Warning
cd ..
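
For reference, pytest's `-W` option takes the same `action::category` filter strings as Python's own warning filters. A minimal sketch of the equivalent standard-library call (assuming pytest >= 8, which defines `PytestRemovedIn9Warning`; in a real suite the CLI flag or the `filterwarnings` ini option is the usual mechanism):

import warnings
import pytest

# Same rule the CLI flag `-W ignore::pytest.PytestRemovedIn9Warning` expresses
warnings.filterwarnings("ignore", category=pytest.PytestRemovedIn9Warning)
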
86 changes: 50 additions & 36 deletions fsspec/caching.py
Expand Up @@ -6,7 +6,6 @@
import math
import os
import threading
import warnings
from collections import OrderedDict
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
@@ -629,7 +628,7 @@ def __init__(
fetcher: Fetcher,
size: int,
data: dict[tuple[int, int], bytes] | None = None,
strict: bool = True,
strict: bool = False,
**_: Any,
):
super().__init__(blocksize, fetcher, size)
@@ -653,50 +652,65 @@ def __init__(
else:
self.data = {}

@property
def size(self):
return sum(_[1] - _[0] for _ in self.data)

@size.setter
def size(self, value):
pass

@property
def nblocks(self):
return len(self.data)

@nblocks.setter
def nblocks(self, value):
pass

def _fetch(self, start: int | None, stop: int | None) -> bytes:
if start is None:
start = 0
if stop is None:
stop = self.size
self.total_requested_bytes += stop - start

out = b""
for (loc0, loc1), data in self.data.items():
# If self.strict=False, use zero-padded data
# for reads beyond the end of a "known" buffer
started = False
loc_old = 0
for loc0, loc1 in sorted(self.data):
if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
# entirely within the block
off = start - loc0
self.hit_count += 1
return self.data[(loc0, loc1)][off : off + stop - start]
if stop <= loc0:
break
if started and loc0 > loc_old:
# a gap where we need data
self.miss_count += 1
if self.strict:
raise ValueError
out += b"\x00" * (loc0 - loc_old)
if loc0 <= start < loc1:
# found the start
self.hit_count += 1
off = start - loc0
out = data[off : off + stop - start]
if not self.strict or loc0 <= stop <= loc1:
# The request is within a known range, or
# it begins within a known range, and we
# are allowed to pad reads beyond the
# buffer with zero
out += b"\x00" * (stop - start - len(out))
self.hit_count += 1
return out
else:
# The request ends outside a known range,
# and we are being "strict" about reads
# beyond the buffer
start = loc1
break

# We only get here if there is a request outside the
# known parts of the file. In an ideal world, this
# should never happen
if self.fetcher is None:
# We cannot fetch the data, so raise an error
raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
# We can fetch the data, but should warn the user
# that this may be slow
warnings.warn(
f"Read is outside the known file parts: {(start, stop)}. "
f"IO/caching performance may be poor!"
)
logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
self.total_requested_bytes += stop - start
out = self.data[(loc0, loc1)][off : off + stop - start]
started = True
elif start < loc0 and stop > loc1:
# the whole block
self.hit_count += 1
out += self.data[(loc0, loc1)]
elif loc0 <= stop <= loc1:
# end block
self.hit_count += 1
return out + self.data[(loc0, loc1)][: stop - loc0]
loc_old = loc1
self.miss_count += 1
return out + super()._fetch(start, stop)
if started and not self.strict:
return out + b"\x00" * (stop - loc_old)
raise ValueError
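
Illustrative sketch of how the patched cache behaves, assuming the constructor shown above; the private `_fetch` is called directly here purely for demonstration:

from fsspec.caching import KnownPartsOfAFile

# Two known byte ranges with a gap between offsets 4 and 8
data = {(0, 4): b"abcd", (8, 12): b"wxyz"}

# strict=False (the new default): bytes in a gap are zero-padded.
# The `size` argument is still required, but the new property derives
# the cached size from `data`.
cache = KnownPartsOfAFile(blocksize=1, fetcher=None, size=12, data=data, strict=False)
print(cache._fetch(2, 10))  # b'cd\x00\x00\x00\x00wx': the gap is filled with zeros

# strict=True: a read that crosses a gap raises
strict_cache = KnownPartsOfAFile(blocksize=1, fetcher=None, size=12, data=data, strict=True)
try:
    strict_cache._fetch(2, 10)
except ValueError:
    print("strict read across a gap raises ValueError")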


class UpdatableLRU(Generic[P, T]):
83 changes: 52 additions & 31 deletions fsspec/parquet.py
@@ -3,6 +3,7 @@
import warnings

from .core import url_to_fs
from .spec import AbstractBufferedFile
from .utils import merge_offset_ranges

# Parquet-Specific Utilities for fsspec
@@ -14,6 +15,11 @@
# on remote file systems.


class AlreadyBufferedFile(AbstractBufferedFile):
def _fetch_range(self, start, end):
raise NotImplementedError


def open_parquet_file(
path,
mode="rb",
@@ -22,11 +28,11 @@ def open_parquet_file(
columns=None,
row_groups=None,
storage_options=None,
strict=False,
engine="auto",
max_gap=64_000,
max_block=256_000_000,
footer_sample_size=1_000_000,
filters=None,
**kwargs,
):
"""
@@ -72,12 +78,6 @@ def open_parquet_file(
storage_options : dict, optional
Used to generate an `AbstractFileSystem` object if `fs` was
not specified.
strict : bool, optional
Whether the resulting `KnownPartsOfAFile` cache should
fetch reads that go beyond a known byte-range boundary.
If `False` (the default), any read that ends outside a
known part will be zero padded. Note that using
`strict=True` may be useful for debugging.
max_gap : int, optional
Neighboring byte ranges will only be merged when their
inter-range gap is <= `max_gap`. Default is 64KB.
@@ -89,6 +89,10 @@
for the footer metadata. If the sampled bytes do not contain
the footer, a second read request will be required, and
performance will suffer. Default is 1MB.
filters : list[list], optional
List of filters used to skip row groups, in the same format
accepted by the loading engines. Ignored if ``row_groups``
is specified.
**kwargs :
Optional key-word arguments to pass to `fs.open`
"""
@@ -98,10 +102,10 @@
if fs is None:
fs = url_to_fs(path, **(storage_options or {}))[0]

# For now, `columns == []` not supported. Just use
# default `open` command with `path` input
# For now, `columns == []` is not supported; it is treated
# the same as selecting all columns
if columns is not None and len(columns) == 0:
return fs.open(path, mode=mode)
columns = None

# Set the engine
engine = _set_engine(engine)
@@ -118,22 +122,24 @@
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
filters=filters,
)

# Extract file name from `data`
fn = next(iter(data)) if data else path

# Call self.open with "parts" caching
options = kwargs.pop("cache_options", {}).copy()
return fs.open(
fn,
return AlreadyBufferedFile(
fs=None,
path=fn,
mode=mode,
cache_type="parts",
cache_options={
**options,
"data": data.get(fn, {}),
"strict": strict,
},
size=max(_[1] for _ in data.get(fn, {})),
**kwargs,
)
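
For context, a hedged usage sketch of the new `filters` argument; the path, column names, and filter values below are hypothetical:

import fsspec.parquet

# `filters` takes the same list-of-lists (DNF) format the loading engines
# accept and is ignored when `row_groups` is passed explicitly.
with fsspec.parquet.open_parquet_file(
    "s3://my-bucket/dataset/part.0.parquet",
    columns=["x", "y"],
    filters=[[("x", ">", 0)]],
    engine="fastparquet",
) as f:
    # The selected byte ranges (plus the footer sample and header magic)
    # were fetched up front into the "parts" cache, so a reader applying
    # the same columns/filters is served from memory.
    assert f.read(4) == b"PAR1"  # header magic is part of the known data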

@@ -148,6 +154,7 @@ def _get_parquet_byte_ranges(
max_block=256_000_000,
footer_sample_size=1_000_000,
engine="auto",
filters=None,
):
"""Get a dictionary of the known byte ranges needed
to read a specific column/row-group selection from a
@@ -172,6 +179,7 @@
row_groups=row_groups,
max_gap=max_gap,
max_block=max_block,
filters=filters,
)

# Get file sizes asynchronously
@@ -183,17 +191,16 @@
data_starts = []
data_ends = []
add_header_magic = True
if columns is None and row_groups is None:
if columns is None and row_groups is None and filters is None:
# We are NOT selecting specific columns or row-groups.
#
# We can avoid sampling the footers, and just transfer
# all file data with cat_ranges
for i, path in enumerate(paths):
result[path] = {}
for b in range(0, file_sizes[i], max_block):
data_paths.append(path)
data_starts.append(b)
data_ends.append(min(b + max_block, file_sizes[i]))
data_paths.append(path)
data_starts.append(0)
data_ends.append(file_sizes[i])
add_header_magic = False # "Magic" should already be included
else:
# We ARE selecting specific columns or row-groups.
@@ -238,26 +245,30 @@
# Deal with small-file case.
# Just include all remaining bytes of the file
# in a single range.
if file_sizes[i] < max_block:
if footer_starts[i] > 0:
# Only need to transfer the data if the
# footer sample isn't already the whole file
data_paths.append(path)
data_starts.append(0)
data_ends.append(footer_starts[i])
continue
# if file_sizes[i] < max_block:
# if footer_starts[i] > 0:
# # Only need to transfer the data if the
# # footer sample isn't already the whole file
# data_paths.append(path)
# data_starts.append(0)
# data_ends.append(footer_starts[i])
# continue

# Use "engine" to collect data byte ranges
path_data_starts, path_data_ends = engine._parquet_byte_ranges(
columns,
row_groups=row_groups,
footer=footer_samples[i],
footer_start=footer_starts[i],
filters=filters,
)

data_paths += [path] * len(path_data_starts)
data_starts += path_data_starts
data_ends += path_data_ends
result.setdefault(path, {})[(footer_starts[i], file_sizes[i])] = (
footer_samples[i]
)

# Merge adjacent offset ranges
data_paths, data_starts, data_ends = merge_offset_ranges(
@@ -291,6 +302,7 @@ def _get_parquet_byte_ranges_from_metadata(
row_groups=None,
max_gap=64_000,
max_block=256_000_000,
filters=None,
):
"""Simplified version of `_get_parquet_byte_ranges` for
the case that an engine-specific `metadata` object is
@@ -300,9 +312,7 @@

# Use "engine" to collect data byte ranges
data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
columns,
row_groups=row_groups,
metadata=metadata,
columns, row_groups=row_groups, metadata=metadata, filters=filters
)

# Merge adjacent offset ranges
@@ -401,16 +411,19 @@ def _parquet_byte_ranges(
metadata=None,
footer=None,
footer_start=None,
filters=None,
):
# Initialize offset ranges and define ParquetFile metadata
pf = metadata
data_paths, data_starts, data_ends = [], [], []
if filters and row_groups:
raise ValueError("filters and row_groups cannot be used together")
if pf is None:
pf = self.fp.ParquetFile(io.BytesIO(footer))

# Convert columns to a set and add any index columns
# specified in the pandas metadata (just in case)
column_set = None if columns is None else set(columns)
column_set = None if columns is None else {c.split(".", 1)[0] for c in columns}
if column_set is not None and hasattr(pf, "pandas_metadata"):
md_index = [
ind
@@ -422,7 +435,12 @@

# Check if row_groups is a list of integers
# or a list of row-group metadata
if row_groups and not isinstance(row_groups[0], int):
if filters:
from fastparquet.api import filter_row_groups

row_group_indices = None
row_groups = filter_row_groups(pf, filters)
elif row_groups and not isinstance(row_groups[0], int):
# Input row_groups contains row-group metadata
row_group_indices = None
else:
@@ -486,9 +504,12 @@
metadata=None,
footer=None,
footer_start=None,
filters=None,
):
if metadata is not None:
raise ValueError("metadata input not supported for PyarrowEngine")
if filters:
raise NotImplementedError

data_starts, data_ends = [], []
md = self.pq.ParquetFile(io.BytesIO(footer)).metadata