apacheGH-36441: [Python] Make CacheOptions configurable from Python (apache#36627)

### Rationale for this change
Resolves: apache#36441

### What changes are included in this PR?
- Add Python bindings for `CacheOptions` from the C++ side.
- Allow setting `cache_options` on `ParquetFragmentScanOptions` from the Python side.
- Adjust some of the comments on `CacheOptions`.

### Are these changes tested?
Yes. I added Python-side tests for these newly available configs, similar to the other configs. I have not added an integration test that verifies that setting the configs on the Python side leads to them being used correctly on the C++ side.

### Are there any user-facing changes?
Yes. There are new configs available on the Python side, but the defaults are unchanged. I've added/updated docstrings where relevant.
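
Below is a minimal, untested usage sketch of the new configs; the dataset path and the limit values are placeholders chosen for illustration, not recommended defaults:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Tune the read-coalescing cache that is used when pre_buffer is enabled.
cache_opts = pa.CacheOptions(
    hole_size_limit=4 * 2**20,    # merge ranges separated by less than 4 MiB
    range_size_limit=32 * 2**20,  # cap merged ranges at 32 MiB
    lazy=True,
    prefetch_limit=2,             # read up to 2 merged ranges ahead
)

parquet_format = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(
        pre_buffer=True, cache_options=cache_opts
    )
)

# "s3://bucket/path" stands in for any existing Parquet dataset location.
table = ds.dataset("s3://bucket/path", format=parquet_format).to_table()
```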

* Closes: apache#36441

Lead-authored-by: Thomas Newton <thomas.w.newton@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
2 people authored and clayburn committed Jan 23, 2024
1 parent f0c8a4b commit 36c66b3
Showing 9 changed files with 271 additions and 17 deletions.
10 changes: 8 additions & 2 deletions cpp/src/arrow/io/caching.h
@@ -42,6 +42,11 @@ struct ARROW_EXPORT CacheOptions {
/// size greater than this, they are not combined
int64_t range_size_limit;
/// \brief A lazy cache does not perform any I/O until requested.
/// lazy = false: request all byte ranges when PreBuffer or WillNeed is called.
/// lazy = true, prefetch_limit = 0: request merged byte ranges only after the reader
/// needs them.
/// lazy = true, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
/// range that is currently being read.
bool lazy;
/// \brief The maximum number of ranges to be prefetched. This is only used
/// for lazy cache to asynchronously read some ranges after reading the target range.
@@ -56,9 +61,10 @@ struct ARROW_EXPORT CacheOptions {
/// \brief Construct CacheOptions from network storage metrics (e.g. S3).
///
/// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in
/// milliseconds, also called call setup latency of a new S3 request.
/// milliseconds, also called call setup latency of a new read request.
/// The value is a positive integer.
/// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec.
/// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec
/// (per connection).
/// The value is a positive integer.
/// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth utilization fraction
/// (per connection) to maximize the net data load.
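
To make the three modes described by the new comment concrete, here is a small sketch using the Python binding added in this PR; `prefetch_limit=3` is just an example value:

```python
import pyarrow as pa

# Eager: all byte ranges are requested as soon as PreBuffer/WillNeed runs.
eager = pa.CacheOptions(lazy=False)

# Lazy, no prefetch: a merged range is only fetched once the reader needs it.
lazy_on_demand = pa.CacheOptions(lazy=True, prefetch_limit=0)

# Lazy with prefetch: up to 3 merged ranges are fetched ahead of the range
# currently being read.
lazy_prefetch = pa.CacheOptions(lazy=True, prefetch_limit=3)
```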
2 changes: 1 addition & 1 deletion python/pyarrow/__init__.py
@@ -243,7 +243,7 @@ def print_entry(label, value):

# I/O
from pyarrow.lib import (NativeFile, PythonFile,
BufferedInputStream, BufferedOutputStream,
BufferedInputStream, BufferedOutputStream, CacheOptions,
CompressedInputStream, CompressedOutputStream,
TransformInputStream, transcoding_input_stream,
FixedSizeBufferWriter,
21 changes: 19 additions & 2 deletions python/pyarrow/_dataset_parquet.pyx
@@ -42,6 +42,7 @@ from pyarrow._dataset cimport (
FileWriteOptions,
Fragment,
FragmentScanOptions,
CacheOptions,
Partitioning,
PartitioningFactory,
WrittenFile
@@ -693,6 +694,10 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
parallel using a background I/O thread pool.
Set to False if you want to prioritize minimal memory usage
over maximum speed.
cache_options : pyarrow.CacheOptions, default None
Cache options used when pre_buffer is enabled. The default values should
be good for most use cases. You may want to adjust these, for example,
if you have exceptionally high latency to the file system.
thrift_string_size_limit : int, default None
If not None, override the maximum total string size allocated
when decoding Thrift structures. The default limit should be
@@ -714,6 +719,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def __init__(self, *, bint use_buffered_stream=False,
buffer_size=8192,
bint pre_buffer=True,
cache_options=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
decryption_config=None,
@@ -723,6 +729,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.use_buffered_stream = use_buffered_stream
self.buffer_size = buffer_size
self.pre_buffer = pre_buffer
if cache_options is not None:
self.cache_options = cache_options
if thrift_string_size_limit is not None:
self.thrift_string_size_limit = thrift_string_size_limit
if thrift_container_size_limit is not None:
@@ -770,6 +778,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def pre_buffer(self, bint pre_buffer):
self.arrow_reader_properties().set_pre_buffer(pre_buffer)

@property
def cache_options(self):
return CacheOptions.wrap(self.arrow_reader_properties().cache_options())

@cache_options.setter
def cache_options(self, CacheOptions options):
self.arrow_reader_properties().set_cache_options(options.unwrap())

@property
def thrift_string_size_limit(self):
return self.reader_properties().thrift_string_size_limit()
@@ -828,11 +844,11 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
bool
"""
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.use_buffered_stream, self.buffer_size, self.pre_buffer, self.cache_options,
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer,
other.use_buffered_stream, other.buffer_size, other.pre_buffer, other.cache_options,
other.thrift_string_size_limit,
other.thrift_container_size_limit, other.page_checksum_verification)
return attrs == other_attrs
@@ -849,6 +865,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
use_buffered_stream=self.use_buffered_stream,
buffer_size=self.buffer_size,
pre_buffer=self.pre_buffer,
cache_options=self.cache_options,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
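
A short sketch of how the new `cache_options` property behaves on `ParquetFragmentScanOptions`, based on the getter/setter and `equals` changes above; the limit values are arbitrary:

```python
import pyarrow as pa
import pyarrow.dataset as ds

opts = ds.ParquetFragmentScanOptions(pre_buffer=True)

# The property can be assigned after construction as well as via the constructor.
opts.cache_options = pa.CacheOptions(hole_size_limit=2**10, prefetch_limit=1)
assert opts.cache_options.hole_size_limit == 2**10
assert opts.cache_options.prefetch_limit == 1

# cache_options now participates in equality, so scan options that differ
# only in their cache settings compare unequal.
assert opts != ds.ParquetFragmentScanOptions(pre_buffer=True)
```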
6 changes: 4 additions & 2 deletions python/pyarrow/_parquet.pxd
@@ -21,8 +21,8 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CChunkedArray, CScalar, CSchema, CStatus,
CTable, CMemoryPool, CBuffer,
CKeyValueMetadata,
CRandomAccessFile, COutputStream,
CKeyValueMetadata, CRandomAccessFile,
COutputStream, CCacheOptions,
TimeUnit, CRecordBatchReader)
from pyarrow.lib cimport _Weakrefable

@@ -393,6 +393,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
int64_t batch_size()
void set_pre_buffer(c_bool pre_buffer)
c_bool pre_buffer() const
void set_cache_options(CCacheOptions options)
CCacheOptions cache_options() const
void set_coerce_int96_timestamp_unit(TimeUnit unit)
TimeUnit coerce_int96_timestamp_unit() const

16 changes: 16 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -1347,6 +1347,22 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
CStatus Write(const uint8_t* data, int64_t nbytes)
CStatus Flush()

cdef cppclass CCacheOptions "arrow::io::CacheOptions":
int64_t hole_size_limit
int64_t range_size_limit
c_bool lazy
int64_t prefetch_limit
c_bool Equals "operator==" (CCacheOptions other)

@staticmethod
CCacheOptions MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
int64_t transfer_bandwidth_mib_per_sec,
double ideal_bandwidth_utilization_frac,
int64_t max_ideal_request_size_mib)

@staticmethod
CCacheOptions LazyDefaults()

cdef cppclass COutputStream" arrow::io::OutputStream"(FileInterface,
Writable):
pass
134 changes: 134 additions & 0 deletions python/pyarrow/io.pxi
@@ -2122,6 +2122,140 @@ cdef CCompressionType _ensure_compression(str name) except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))


cdef class CacheOptions(_Weakrefable):
"""
Cache options for a pre-buffered fragment scan.

Parameters
----------
hole_size_limit : int, default 8KiB
The maximum distance in bytes between two consecutive ranges; beyond
this value, ranges are not combined.
range_size_limit : int, default 32MiB
The maximum size in bytes of a combined range; if combining two
consecutive ranges would produce a range of a size greater than this,
they are not combined.
lazy : bool, default True
lazy = False: request all byte ranges when PreBuffer or WillNeed is called.
lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
needs them.
lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
range that is currently being read.
prefetch_limit : int, default 0
The maximum number of ranges to be prefetched. This is only used for
lazy cache to asynchronously read some ranges after reading the target
range.
"""

def __init__(self, *, hole_size_limit=None, range_size_limit=None, lazy=None, prefetch_limit=None):
self.wrapped = CCacheOptions.LazyDefaults()
if hole_size_limit is not None:
self.hole_size_limit = hole_size_limit
if range_size_limit is not None:
self.range_size_limit = range_size_limit
if lazy is not None:
self.lazy = lazy
if prefetch_limit is not None:
self.prefetch_limit = prefetch_limit

cdef void init(self, CCacheOptions options):
self.wrapped = options

cdef inline CCacheOptions unwrap(self):
return self.wrapped

@staticmethod
cdef wrap(CCacheOptions options):
self = CacheOptions()
self.init(options)
return self

@property
def hole_size_limit(self):
return self.wrapped.hole_size_limit

@hole_size_limit.setter
def hole_size_limit(self, hole_size_limit):
self.wrapped.hole_size_limit = hole_size_limit

@property
def range_size_limit(self):
return self.wrapped.range_size_limit

@range_size_limit.setter
def range_size_limit(self, range_size_limit):
self.wrapped.range_size_limit = range_size_limit

@property
def lazy(self):
return self.wrapped.lazy

@lazy.setter
def lazy(self, lazy):
self.wrapped.lazy = lazy

@property
def prefetch_limit(self):
return self.wrapped.prefetch_limit

@prefetch_limit.setter
def prefetch_limit(self, prefetch_limit):
self.wrapped.prefetch_limit = prefetch_limit

def __eq__(self, CacheOptions other):
try:
return self.unwrap().Equals(other.unwrap())
except TypeError:
return False

@staticmethod
def from_network_metrics(time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
ideal_bandwidth_utilization_frac=0.9, max_ideal_request_size_mib=64):
"""
Create suitable CacheOptions based on provided network metrics.

Typically this will be used with object storage solutions like Amazon S3,
Google Cloud Storage and Azure Blob Storage.

Parameters
----------
time_to_first_byte_millis : int
Seek-time or Time-To-First-Byte (TTFB) in milliseconds, also called call
setup latency of a new read request. The value is a positive integer.
transfer_bandwidth_mib_per_sec : int
Data transfer Bandwidth (BW) in MiB/sec (per connection). The value is a positive
integer.
ideal_bandwidth_utilization_frac : float, default 0.9
Transfer bandwidth utilization fraction (per connection) to maximize the net
data load. The value is a positive float less than 1.
max_ideal_request_size_mib : int, default 64
The maximum single data request size (in MiB) to maximize the net data load.

Returns
-------
CacheOptions
"""
return CacheOptions.wrap(CCacheOptions.MakeFromNetworkMetrics(
time_to_first_byte_millis, transfer_bandwidth_mib_per_sec,
ideal_bandwidth_utilization_frac, max_ideal_request_size_mib))

@staticmethod
@binding(True) # Required for Cython < 3
def _reconstruct(kwargs):
# __reduce__ doesn't allow passing named arguments directly to the
# reconstructor, hence this wrapper.
return CacheOptions(**kwargs)

def __reduce__(self):
kwargs = dict(
hole_size_limit=self.hole_size_limit,
range_size_limit=self.range_size_limit,
lazy=self.lazy,
prefetch_limit=self.prefetch_limit,
)
return CacheOptions._reconstruct, (kwargs,)


cdef class Codec(_Weakrefable):
"""
Compression codec.
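
A sketch exercising the `from_network_metrics` constructor and the pickle support provided via `__reduce__`; the network metric values are made up, and the derived limits depend on the C++ heuristic behind `MakeFromNetworkMetrics`:

```python
import pickle

import pyarrow as pa

# Derive cache settings from rough storage characteristics (example values).
opts = pa.CacheOptions.from_network_metrics(
    time_to_first_byte_millis=100,
    transfer_bandwidth_mib_per_sec=200,
    ideal_bandwidth_utilization_frac=0.9,
    max_ideal_request_size_mib=64,
)
print(opts.hole_size_limit, opts.range_size_limit, opts.lazy, opts.prefetch_limit)

# __reduce__ makes CacheOptions picklable, so it can travel inside scan
# options that are shipped to worker processes.
restored = pickle.loads(pickle.dumps(opts))
assert restored == opts
```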
12 changes: 12 additions & 0 deletions python/pyarrow/lib.pxd
@@ -561,6 +561,18 @@ cdef class RecordBatchReader(_Weakrefable):
SharedPtrNoGIL[CRecordBatchReader] reader


cdef class CacheOptions(_Weakrefable):
cdef:
CCacheOptions wrapped

cdef void init(self, CCacheOptions options)

cdef inline CCacheOptions unwrap(self)

@staticmethod
cdef wrap(const CCacheOptions options)


cdef class Codec(_Weakrefable):
cdef:
shared_ptr[CCodec] wrapped
28 changes: 18 additions & 10 deletions python/pyarrow/tests/test_dataset.py
@@ -16,17 +16,16 @@
# under the License.

import contextlib
import os
import posixpath
import datetime
import os
import pathlib
import posixpath
import sys
import textwrap
import tempfile
import textwrap
import threading
import time
from shutil import copytree

from urllib.parse import quote

import numpy as np
@@ -35,12 +34,12 @@
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv
import pyarrow.json
import pyarrow.feather
import pyarrow.fs as fs
from pyarrow.tests.util import (change_cwd, _filesystem_uri,
FSProtocolClass, ProxyHandler,
_configure_s3_limited_user)
import pyarrow.json
from pyarrow.tests.util import (FSProtocolClass, ProxyHandler,
_configure_s3_limited_user, _filesystem_uri,
change_cwd)

try:
import pandas as pd
@@ -138,7 +137,8 @@ def mockfs():

@pytest.fixture
def open_logging_fs(monkeypatch):
from pyarrow.fs import PyFileSystem, LocalFileSystem
from pyarrow.fs import LocalFileSystem, PyFileSystem

from .test_fs import ProxyHandler

localfs = LocalFileSystem()
@@ -791,6 +791,9 @@ def test_parquet_scan_options():
thrift_container_size_limit=987654,)
opts6 = ds.ParquetFragmentScanOptions(
page_checksum_verification=True)
cache_opts = pa.CacheOptions(
hole_size_limit=2**10, range_size_limit=8*2**10, lazy=True)
opts7 = ds.ParquetFragmentScanOptions(pre_buffer=True, cache_options=cache_opts)

assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
@@ -816,12 +819,17 @@

assert opts6.page_checksum_verification is True

assert opts7.pre_buffer is True
assert opts7.cache_options == cache_opts
assert opts7.cache_options != opts1.cache_options

assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
assert opts5 != opts1
assert opts6 != opts1
assert opts7 != opts1


def test_file_format_pickling(pickle_module):
@@ -2711,7 +2719,7 @@ def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
table, path, _, _, host, port, access_key, secret_key = s3_example_simple
s3fs = pytest.importorskip("s3fs")

from pyarrow.fs import PyFileSystem, FSSpecHandler
from pyarrow.fs import FSSpecHandler, PyFileSystem

fs = s3fs.S3FileSystem(
key=access_key,
(1 more changed file not shown; diff not loaded)
