Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3020: [C++/Python] Allow empty arrow::Table objects to be written as empty Parquet row groups #3269

Closed
wants to merge 2 commits into the base branch from the contributor's branch (branch names not captured in this extract)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 22 additions & 8 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ class ArrowColumnWriter {
Status Write(const Array& data);

Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
if (data.length() == 0) {
return Status::OK();
}

int64_t absolute_position = 0;
int chunk_index = 0;
int64_t chunk_offset = 0;
Expand Down Expand Up @@ -1134,22 +1138,32 @@ Status WriteFileMetaData(const FileMetaData& file_metadata,
namespace {} // namespace

Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
  // A non-positive chunk size is only an error when there are rows to split
  // into row groups; an empty table is handled separately below (ARROW-3020),
  // where chunk_size is irrelevant.
  if (chunk_size <= 0 && table.num_rows() > 0) {
    return Status::Invalid("chunk size per row_group must be greater than 0");
  } else if (chunk_size > impl_->properties().max_row_group_length()) {
    // Clamp to the writer's configured maximum row-group length.
    chunk_size = impl_->properties().max_row_group_length();
  }

  // Writes one row group covering table rows [offset, offset + size):
  // starts a new row group, then writes the corresponding slice of every
  // column's chunked data into it.
  auto WriteRowGroup = [&](int64_t offset, int64_t size) {
    RETURN_NOT_OK(NewRowGroup(size));
    for (int i = 0; i < table.num_columns(); i++) {
      auto chunked_data = table.column(i)->data();
      RETURN_NOT_OK(WriteColumnChunk(chunked_data, offset, size));
    }
    return Status::OK();
  };

  if (table.num_rows() == 0) {
    // Append a row group with 0 rows. On failure, best-effort Close() so the
    // underlying sink is not left open (its own error is deliberately ignored).
    RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
    return Status::OK();
  }

  // Split the table into row groups of at most chunk_size rows; the final
  // group holds the remainder.
  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
    int64_t offset = chunk * chunk_size;
    RETURN_NOT_OK_ELSE(
        WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
        PARQUET_IGNORE_NOT_OK(Close()));
  }
  return Status::OK();
}
Expand Down
13 changes: 6 additions & 7 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -909,17 +909,16 @@ cdef class ParquetWriter:
check_status(self.sink.get().Close())

def write_table(self, Table table, row_group_size=None):
cdef CTable* ctable = table.table
cdef:
CTable* ctable = table.table
int64_t c_row_group_size

if row_group_size is None or row_group_size == -1:
if ctable.num_rows() > 0:
row_group_size = ctable.num_rows()
else:
row_group_size = 1
c_row_group_size = ctable.num_rows()
elif row_group_size == 0:
raise ValueError('Row group size cannot be 0')

cdef int64_t c_row_group_size = row_group_size
else:
c_row_group_size = row_group_size

with nogil:
check_status(self.writer.get()
Expand Down
18 changes: 18 additions & 0 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,24 @@ def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
writer.write_table(table2)


def test_empty_row_groups(tempdir):
    # ARROW-3020: writing a zero-row table must still append a real
    # (empty) row group, and each such group must round-trip unchanged.
    empty_table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])

    target = tempdir / 'empty_row_groups.parquet'
    group_count = 3

    with pq.ParquetWriter(target, empty_table.schema) as writer:
        for _ in range(group_count):
            writer.write_table(empty_table)

    parquet_file = pq.ParquetFile(target)
    assert parquet_file.metadata.num_row_groups == group_count

    for index in range(group_count):
        assert parquet_file.read_row_group(index).equals(empty_table)


def test_writing_empty_lists():
# ARROW-2591: [Python] Segmentation fault issue in pq.write_table
arr1 = pa.array([[], []], pa.list_(pa.int32()))
Expand Down