Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus ReadRowGroup(int i, const vector[int]& column_indices,
shared_ptr[CTable]* out)

CStatus ReadRowGroups(const vector[int]& row_groups,
shared_ptr[CTable]* out)
CStatus ReadRowGroups(const vector[int]& row_groups,
const vector[int]& column_indices,
shared_ptr[CTable]* out)

CStatus ReadTable(shared_ptr[CTable]* out)
CStatus ReadTable(const vector[int]& column_indices,
shared_ptr[CTable]* out)
Expand Down
13 changes: 11 additions & 2 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1059,25 +1059,34 @@ cdef class ParquetReader:

def read_row_group(self, int i, column_indices=None,
                   bint use_threads=True):
    """
    Read a single row group into a Table.

    Thin convenience wrapper: delegates to ``read_row_groups`` with a
    one-element list of row-group indices.

    Parameters
    ----------
    i : int
        Index of the row group to read.
    column_indices : iterable of int, optional
        If not None, only these leaf columns are read.
    use_threads : bool, default True
        Enable multi-threaded column reads.
    """
    return self.read_row_groups([i], column_indices, use_threads)

def read_row_groups(self, row_groups not None, column_indices=None,
                    bint use_threads=True):
    """
    Read the given row groups into a single Table.

    Parameters
    ----------
    row_groups : iterable of int
        Indices of the row groups to read, concatenated in the given order.
    column_indices : iterable of int, optional
        If not None, only these leaf columns are read; otherwise all
        columns are read.
    use_threads : bool, default True
        Enable multi-threaded column reads.

    Returns
    -------
    pyarrow.Table
        Content of the selected row groups as a table of columns.
    """
    cdef:
        shared_ptr[CTable] ctable
        vector[int] c_row_groups
        vector[int] c_column_indices

    if use_threads:
        self.set_use_threads(use_threads)

    # Copy the Python sequences into C++ vectors for the nogil call.
    for row_group in row_groups:
        c_row_groups.push_back(row_group)

    if column_indices is not None:
        for index in column_indices:
            c_column_indices.push_back(index)

        with nogil:
            check_status(self.reader.get()
                         .ReadRowGroups(c_row_groups, c_column_indices,
                                        &ctable))
    else:
        # No column subset requested: read all columns.
        with nogil:
            check_status(self.reader.get()
                         .ReadRowGroups(c_row_groups, &ctable))
    return pyarrow_wrap_table(ctable)

def read_all(self, column_indices=None, bint use_threads=True):
Expand Down
30 changes: 30 additions & 0 deletions python/pyarrow/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,36 @@ def read_row_group(self, i, columns=None, use_threads=True,
return self.reader.read_row_group(i, column_indices=column_indices,
use_threads=use_threads)

def read_row_groups(self, row_groups, columns=None, use_threads=True,
                    use_pandas_metadata=False):
    """
    Read multiple row groups from a Parquet file as a single Table.

    Parameters
    ----------
    row_groups : list
        Only these row groups will be read from the file.
    columns : list
        If not None, only these columns will be read from the row groups.
        A column name may be a prefix of a nested field, e.g. 'a' will
        select 'a.b', 'a.c', and 'a.d.e'
    use_threads : boolean, default True
        Perform multi-threaded column reads
    use_pandas_metadata : boolean, default False
        If True and file has custom pandas schema metadata, ensure that
        index columns are also loaded

    Returns
    -------
    pyarrow.table.Table
        Content of the row groups as a table (of columns)
    """
    # Resolve column names (optionally augmented with pandas index
    # columns) to the reader's integer column indices.
    indices = self._get_column_indices(
        columns, use_pandas_metadata=use_pandas_metadata)
    return self.reader.read_row_groups(
        row_groups, column_indices=indices, use_threads=use_threads)

def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
"""
Read a Table from Parquet format
Expand Down
44 changes: 44 additions & 0 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,50 @@ def test_read_single_row_group_with_column_subset():
tm.assert_frame_equal(df[cols], result.to_pandas())


@pytest.mark.pandas
def test_read_multiple_row_groups():
    # Write a table split into K equal row groups, then read all of them
    # back via read_row_groups and verify the round trip.
    N, K = 10000, 4
    df = alltypes_sample(size=N)

    a_table = pa.Table.from_pandas(df)

    buf = io.BytesIO()
    # N // K: row_group_size is a row count and must be an integer;
    # N / K is a float under Python 3's true division.
    _write_table(a_table, buf, row_group_size=N // K,
                 compression='snappy', version='2.0')

    buf.seek(0)

    pf = pq.ParquetFile(buf)

    assert pf.num_row_groups == K

    result = pf.read_row_groups(range(K))
    tm.assert_frame_equal(df, result.to_pandas())


@pytest.mark.pandas
def test_read_multiple_row_groups_with_column_subset():
    # Write K row groups and read them back with a subset of columns,
    # including a duplicated selection (ARROW-4267).
    N, K = 10000, 4
    df = alltypes_sample(size=N)
    a_table = pa.Table.from_pandas(df)

    buf = io.BytesIO()
    # N // K: row_group_size is a row count and must be an integer;
    # N / K is a float under Python 3's true division.
    _write_table(a_table, buf, row_group_size=N // K,
                 compression='snappy', version='2.0')

    buf.seek(0)
    pf = pq.ParquetFile(buf)

    cols = list(df.columns[:2])
    result = pf.read_row_groups(range(K), columns=cols)
    tm.assert_frame_equal(df[cols], result.to_pandas())

    # ARROW-4267: Selection of duplicate columns still leads to these columns
    # being read uniquely.
    result = pf.read_row_groups(range(K), columns=cols + cols)
    tm.assert_frame_equal(df[cols], result.to_pandas())


@pytest.mark.pandas
def test_scan_contents():
N, K = 10000, 4
Expand Down