JollyJack is a high-performance Parquet reader designed to load data directly into NumPy arrays and PyTorch tensors with minimal overhead.
- Load Parquet straight into NumPy arrays or PyTorch tensors (fp16, fp32, fp64, int32, int64)
- Up to 6× faster than vanilla PyArrow, with lower memory use
- Compatibility with PalletJack
- Optional io_uring + O_DIRECT backend for I/O-bound workloads
Limitations:
- Data must not contain null values
- Destination NumPy arrays and PyTorch tensors must be column-major (Fortran-style)
By default, the reader uses the regular file API via
parquet::ParquetFileReader. In most cases, this is the recommended choice.
An alternative reader backend based on io_uring is also available. It may provide better performance for some workloads, particularly when used together with O_DIRECT.
To enable the alternative backend, set the JJ_READER_BACKEND environment
variable to one of the following values:
- `io_uring` - uses io_uring for asynchronous I/O through the page cache
- `io_uring_odirect` - uses io_uring with O_DIRECT (bypasses the page cache)
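For example, selecting the O_DIRECT backend from Python. A minimal sketch; it assumes the variable is read when JollyJack initializes, so it is set before the import:

```python
import os

# Assumption: JJ_READER_BACKEND is read at initialization,
# so set it before importing jollyjack.
os.environ["JJ_READER_BACKEND"] = "io_uring_odirect"

import jollyjack as jj
```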
JollyJack performance is primarily determined by I/O, threading, and memory allocation behavior. The optimal configuration depends on whether your workload is I/O-bound or memory-/CPU-bound.
- JollyJack can be safely called concurrently from multiple threads.
- Parallel reads usually improve throughput, but oversubscribing threads can cause contention and degrade performance.
- Reusing destination NumPy arrays or PyTorch tensors avoids repeated memory allocation. While allocation itself is fast, repeated large allocations can trigger kernel contention and degrade performance (see the sketch after this list).
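A minimal sketch of buffer reuse; `file_paths`, `n_rows`, `n_columns`, `n_row_groups`, and `process` are placeholders:

```python
import numpy as np
import jollyjack as jj

# Allocate the Fortran-order destination once, outside the loop.
np_array = np.zeros((n_rows, n_columns), dtype=np.float32, order="F")

for path in file_paths:
    # Each read overwrites the same buffer, so no per-file allocation occurs.
    jj.read_into_numpy(
        source=path,
        metadata=None,
        np_array=np_array,
        row_group_indices=range(n_row_groups),
        column_indices=range(n_columns),
    )
    process(np_array)  # hypothetical downstream consumer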
For datasets larger than the available page cache, performance is typically
I/O-bound. Enabling either pre_buffer=True or prefetch_page_cache=True
brings throughput close to the raw I/O ceiling, but prefetch_page_cache
avoids the increased LLC miss rate caused by pre_buffer
(see Page cache prefetching below).
Recommended configuration:
`use_threads=True`, `prefetch_page_cache=True`, `pre_buffer=False`, with the default reader backend.
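For illustration, those flags on a `read_into_numpy` call; `path`, `pr`, and `np_array` are as in the examples further down:

```python
# Sketch: recommended I/O-bound configuration.
jj.read_into_numpy(
    source=path,
    metadata=pr.metadata,
    np_array=np_array,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    use_threads=True,
    prefetch_page_cache=True,
    pre_buffer=False,
)
```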
For datasets that comfortably fit in RAM, performance is typically CPU- or
memory-bound. Using pre_buffer is not recommended because it leads to an
increased LLC miss rate and suboptimal performance
(see Page cache prefetching below).
Recommended configuration:
`use_threads=True`, `prefetch_page_cache=True`, `pre_buffer=False`, with the default reader backend.
The prefetch_page_cache option calls posix_fadvise(POSIX_FADV_WILLNEED) to tell
the kernel to start loading the relevant byte ranges into the page cache.
Each worker thread then reads directly via pread into its own
locally-allocated buffer, keeping data hot in its local CPU caches.
This avoids the LLC (Last Level Cache) miss problem with pre_buffer=True,
where Arrow's IO thread pool fills temporary buffers on one core and worker
threads on different cores later consume cold data.
This is only useful for local or network-mounted file systems that have a page cache. Remote object stores such as S3 will not benefit from this.
The Linux kernel's `force_page_cache_ra()` function caps the number of pages
read per `posix_fadvise` call to the block device's readahead window.
Any bytes beyond this cap are silently ignored. The readahead window is
typically 128 KB or higher. Check the value for your device:
```
cat /sys/block/<device>/queue/read_ahead_kb
```

If `range_size_limit` exceeds this value, most of each coalesced range will
not be prefetched. Set `range_size_limit` to match or stay below the device's
`read_ahead_kb`:

```python
cache_options = pa.CacheOptions(
    hole_size_limit=8192,
    range_size_limit=128*1024,  # must not exceed read_ahead_kb
    lazy=False,
)
```
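If you prefer not to hard-code the limit, you can derive it from sysfs at startup. A minimal sketch; the device name is an assumption, adjust it for your system:

```python
def read_ahead_bytes(device: str = "nvme0n1") -> int:
    # Read the block device's readahead window (reported in KB) from sysfs.
    with open(f"/sys/block/{device}/queue/read_ahead_kb") as f:
        return int(f.read().strip()) * 1024

cache_options = pa.CacheOptions(
    hole_size_limit=8192,
    range_size_limit=read_ahead_bytes(),  # stay within the readahead cap
    lazy=False,
)
```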
There are two ways to enable page cache prefetching:
As a parameter on `read_into_numpy`:

```python
jj.read_into_numpy(
    source=path,
    metadata=pr.metadata,
    np_array=np_array,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    prefetch_page_cache=True,
    cache_options=cache_options,
)
```

As a standalone call:

```python
jj.prefetch_page_cache(
    source=path,
    metadata=pr.metadata,
    row_group_indices=range(pr.metadata.num_row_groups),
    column_indices=range(pr.metadata.num_columns),
    cache_options=cache_options,
)
```

The standalone call is useful for sliding-window prefetching, where you prefetch the next files while processing the current one:
```python
# Prime the pump
for path in file_paths[:PREFETCH_DEPTH]:
    jj.prefetch_page_cache(source=path, ...)

# Main loop
for i, path in enumerate(file_paths):
    # Slide the window
    ahead_index = i + PREFETCH_DEPTH
    if ahead_index < len(file_paths):
        jj.prefetch_page_cache(source=file_paths[ahead_index], ...)

    # Page cache should already be warm
    jj.read_into_numpy(source=path, np_array=np_array, ...)
    process(np_array)
```

Column ordering and use_threads:
When using prefetch_page_cache, the order in which columns are read matters.
Sorting column_indices by source column index produces a sequential I/O
pattern, which can significantly improve throughput. The effect depends on the
storage device and kernel readahead settings. Use a dict to preserve the
original target mapping:
```python
# column_indices_to_read is an unsorted list, e.g. [5, 2, 8]
# Sort by source column index, preserving the original target mapping.
col_indices = {
    src: dst
    for src, dst in sorted(
        zip(column_indices_to_read, range(len(column_indices_to_read)))
    )
}
# Result: {2: 1, 5: 0, 8: 2} - reads columns in file order,
# writes each to the same target column as the unsorted list.
```

For similar reasons, avoid setting `use_threads=True` together with
`prefetch_page_cache`. Arrow's internal thread pool dispatches column reads
across cores in an unpredictable order, breaking the sequential I/O pattern
that makes prefetching effective. Use multiple worker threads at the
application level instead, each reading its own file or row group with
`use_threads=False` (see the sketch below).
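A minimal sketch of that pattern, reusing the `path`, `pr`, and `np_array` setup from the examples below and relying on JollyJack's thread safety:

```python
from concurrent.futures import ThreadPoolExecutor

def read_row_group(rg, row_begin, row_end):
    # Each worker reads one row group into its slice of the shared array.
    jj.read_into_numpy(
        source=path,
        metadata=pr.metadata,
        np_array=np_array[row_begin:row_end, :],  # view into the shared target
        row_group_indices=[rg],
        column_indices=range(pr.metadata.num_columns),
        use_threads=False,  # keep each worker's reads sequential
    )

# Compute each row group's destination rows.
tasks, row_end = [], 0
for rg in range(pr.metadata.num_row_groups):
    row_begin, row_end = row_end, row_end + pr.metadata.row_group(rg).num_rows
    tasks.append((rg, row_begin, row_end))

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(read_row_group, *t) for t in tasks]
    for f in futures:
        f.result()  # propagate any worker exceptions
```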
If you use pre_buffer=True instead of prefetch_page_cache, the following
tuning applies.
When pre_buffer=True, Arrow merges nearby column ranges and reads them into
temporary buffers. The default maximum merged range is 32 MB
(range_size_limit).
Arrow supports several memory allocators (mimalloc, jemalloc, system). With
mimalloc (the default on most platforms), allocations above
~16 MB
go straight to the OS (mmap/munmap) instead of the internal arena. This
means the memory cannot be reused between calls, and each call pays the cost
of mapping and zeroing fresh pages. Other allocators may behave similarly.
To avoid this, lower range_size_limit so that merged ranges fit inside the
allocator's arena:
```python
cache_options = pa.CacheOptions(
    hole_size_limit=8192,           # default
    range_size_limit=16*1024*1024,  # 16 MB, fits in mimalloc arena
    lazy=False,
)

jj.read_into_numpy(
    source=path,
    metadata=None,
    np_array=np_array,
    row_group_indices=[0],
    column_indices=range(n_columns),
    pre_buffer=True,
    cache_options=cache_options,
)
```

To debug allocator issues with mimalloc, run with MIMALLOC_SHOW_STATS=1 and
MIMALLOC_VERBOSE=1. This prints allocation statistics at process exit.
When pre_buffer=True, Arrow dispatches reads to its IO thread pool,
configured via the ARROW_IO_THREADS environment variable (default: 8).
Tuning this value may improve performance.
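The pool size can also be inspected and changed at runtime from Python, equivalent to setting the environment variable before startup:

```python
import pyarrow as pa

print(pa.io_thread_count())  # current size of Arrow's IO thread pool
pa.set_io_thread_count(16)   # for example, raise it for heavily parallel I/O
```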
Requirements:
- pyarrow ~= 24.0.0
JollyJack builds on top of PyArrow. While the source package may work with newer versions, the prebuilt binary wheels are built and tested against pyarrow 24.x.
```
pip install jollyjack
```

Generate a sample Parquet file:

```python
import jollyjack as jj
import pyarrow.parquet as pq
import pyarrow as pa
import numpy as np
from pyarrow import fs
chunk_size = 3
n_row_groups = 2
n_columns = 5
n_rows = n_row_groups * chunk_size
path = "my.parquet"
data = np.random.rand(n_rows, n_columns).astype(np.float32)
pa_arrays = [pa.array(data[:, i]) for i in range(n_columns)]
schema = pa.schema([(f"column_{i}", pa.float32()) for i in range(n_columns)])
table = pa.Table.from_arrays(pa_arrays, schema=schema)
pq.write_table(
table,
path,
row_group_size=chunk_size,
use_dictionary=False,
write_statistics=True,
store_schema=False,
write_page_index=True,
)
```

Create a Fortran-order destination array and read the file row group by row group:

```python
# Create an array of zeros
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")

pr = pq.ParquetReader()
pr.open(path)
row_begin = 0
row_end = 0
for rg in range(pr.metadata.num_row_groups):
row_begin = row_end
row_end = row_begin + pr.metadata.row_group(rg).num_rows
    # To define which subset of the NumPy array we want to read into,
    # we create a view that shares its underlying memory with the target array
subset_view = np_array[row_begin:row_end, :]
jj.read_into_numpy(
source=path,
metadata=pr.metadata,
np_array=subset_view,
row_group_indices=[rg],
column_indices=range(pr.metadata.num_columns),
)
# Alternatively
with fs.LocalFileSystem().open_input_file(path) as f:
jj.read_into_numpy(
source=f,
metadata=None,
np_array=np_array,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=range(pr.metadata.num_columns),
    )
```

Map source columns to different target columns with a dict (here, in reversed order):

```python
with fs.LocalFileSystem().open_input_file(path) as f:
jj.read_into_numpy(
source=f,
metadata=None,
np_array=np_array,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices={
i: pr.metadata.num_columns - i - 1 for i in range(pr.metadata.num_columns)
},
    )
```

Read a single source column into multiple target columns by passing (source, target) pairs:

```python
with fs.LocalFileSystem().open_input_file(path) as f:
jj.read_into_numpy(
source=f,
metadata=None,
np_array=np_array,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=((3, 0), (3, 1)),
    )
```

Read only selected row ranges of a row group:

```python
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
with fs.LocalFileSystem().open_input_file(path) as f:
jj.read_into_numpy(
source=f,
metadata=None,
np_array=np_array,
row_group_indices=[0],
row_ranges=[slice(0, 1), slice(4, 6)],
column_indices=range(pr.metadata.num_columns),
)
print(np_array)
```

Read with `pre_buffer=True` and tuned cache options:

```python
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
cache_options = pa.CacheOptions(
hole_size_limit=8192, # default
range_size_limit=16*1024*1024, # 16 MB, fits in mimalloc arena
lazy=False,
)
with fs.LocalFileSystem().open_input_file(path) as f:
jj.read_into_numpy(
source=f,
metadata=None,
np_array=np_array,
row_group_indices=[0],
row_ranges=[slice(0, 1), slice(4, 6)],
column_indices=range(pr.metadata.num_columns),
cache_options=cache_options,
pre_buffer=True,
)
print(np_array)
```

Prefetch the page cache, either combined with the read or as a separate step:

```python
np_array = np.zeros((n_rows, n_columns), dtype="f", order="F")
pr = pq.ParquetReader()
pr.open(path)
# cache_options controls which byte ranges are prefetched into the page cache.
# range_size_limit should not exceed the device's read_ahead_kb,
# because the kernel silently ignores readahead beyond that cap.
cache_options = pa.CacheOptions(
hole_size_limit=8192,
range_size_limit=128*1024, # must not exceed read_ahead_kb
lazy=False,
)
# Prefetch and read in one call
jj.read_into_numpy(
source=path,
metadata=pr.metadata,
np_array=np_array,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=range(pr.metadata.num_columns),
cache_options=cache_options,
prefetch_page_cache=True,
)
# Or prefetch separately, then read
jj.prefetch_page_cache(
source=path,
metadata=pr.metadata,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=range(pr.metadata.num_columns),
cache_options=cache_options,
)
jj.read_into_numpy(
source=path,
metadata=pr.metadata,
np_array=np_array,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=range(pr.metadata.num_columns),
pre_buffer=False,
)
```

Read into a PyTorch tensor:

```python
import torch
# Create a tensor and transpose it to get Fortran-style order
tensor = torch.zeros(n_columns, n_rows, dtype=torch.float32).transpose(0, 1)

pr = pq.ParquetReader()
pr.open(path)
jj.read_into_torch(
source=path,
metadata=pr.metadata,
tensor=tensor,
row_group_indices=range(pr.metadata.num_row_groups),
column_indices=range(pr.metadata.num_columns),
pre_buffer=True,
use_threads=True,
)
print(tensor)
```