This repository has been archived by the owner on May 10, 2024. It is now read-only.

PARQUET-1372: Add an API to allow writing RowGroups based on size #484

Closed
wants to merge 17 commits into from

Conversation

majetideepak

@majetideepak majetideepak commented Aug 13, 2018

I split the changes into multiple commits to ease the review.
Used the example program to test the new API.
I will add unit tests once we converge on the API after review.
Thanks to @AnatoliShein for collaborating with the API design.

#include <cassert>
#include <fstream>
#include <iostream>
#include <list>


nit: Doesn't look like you're using anything from list in here. Maybe I am missing something implicit.

Member

Could be result of copy-paste, IWYU could confirm

Author

Will check and fix!

// All the values are compressed and stored in memory
// Values are written to the file on Close

class PARQUET_EXPORT RowGroupWriter2 {


A name like "FixedSizeRowGroupWriter", really anything other than 2, would be a little more intuitive in the public API.


Agreed about the name. Something like ContinuousRowGroupWriter might work a bit better though since we now allow writing to each column multiple times, and this way suffix "2" can be replaced in all other locations with something like "_cont". "Repeated" might work too.

Member

Agree that RowGroupWriter2 is not a good name.

Why do we need a new class? It's not clear to me that it's needed

Author

The current RowGroupWriter class has `virtual ColumnWriter* NextColumn() = 0;`, which does not fit this use case: the goal is to be able to write to any column chunk any number of times (cycling around all the column chunks) and in any order. The current RowGroupWriter requires us to write a column chunk in one go and then move to the immediately next column.
To support this use case we need an external API along the lines of:
`virtual ColumnWriter* get_column(int i) = 0;`
The new RowGroupWriter2 has different semantics in this respect.

Author

I am not tied to this name. Will change to FixedSizeRowGroupWriter

virtual int num_columns() const = 0;
virtual int64_t num_rows() const = 0;
virtual int64_t current_compressed_bytes() const = 0;
virtual ColumnWriter* get_column(int i) const = 0;


If the ColumnWriter can indirectly touch the RowGroupWriter's internal state it'd be best to return a const ColumnWriter* from the const accessor.

Author

will do!

@@ -102,6 +144,8 @@ class PARQUET_EXPORT ParquetFileWriter {

virtual RowGroupWriter* AppendRowGroup() = 0;

virtual RowGroupWriter2* AppendRowGroup2() = 0;


Consider a subclass for writing fixed size rowgroups that overrides AppendRowGroup or take a param in the ParquetFileWriter ctor that determines how it wants to pack rowgroups. That way whatever client code is pushing tuples in doesn't have to care about the on-disk layout after the writer is created.

Member

Agreed. I think these details should be handled in the WriterProperties and the API of RowGroupWriter should be able to indicate to the caller whether it is "safe" to continue writing, or whether the current row group needs to be terminated and flushed

Author

@majetideepak majetideepak Aug 15, 2018

This will restrict clients from being able to loosely or tightly bound a RowGroup size.
Example: clients might want to write a row at a time, a batch at a time, or both adaptively to achieve their target RowGroup size.
Client requirements may also vary between a strict upper bound and a loose upper bound.

Edit: I might have misunderstood your comments. If the proposal is to use the current RowGroupWriter and extend it, then it all boils down to how we can extend the current RowGroupWriter API to write multiple column chunks.

Member

@wesm wesm left a comment

It's great to add this functionality, but I think we should spend some energy to determine what is the best API for end users and results in more maintainable code. We also need some unit tests


std::cout << "Parquet Writing and Reading Complete" << std::endl;

return 0;
}
Member

Is there a way to reuse code between this file and the other read/write example?

Author

The two examples are quite different. The other example writes a single row group with a predetermined number of rows. I can move the schema creation part to a common header.

@@ -294,6 +295,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
num_buffered_encoded_values_(0),
rows_written_(0),
total_bytes_written_(0),
current_compressed_bytes_(0),
flush_on_close_(flush_on_close),
Member

Would it be better to make this part of WriterProperties?

Author

It will depend on what RowGroupWriter API we provide to users. If parquet-cpp takes responsibility for bounding RowGroup sizes (which I think is out of its scope, based on the example above), then this can be part of WriterProperties.

WriteDictionaryPage();
}

flush_on_close_ = false;
Member

This seems a tad hacky to me; I think there is a missing use of abstraction here which is the management of pages to be written to the file. The Parquet Java library has notions that provide for multiple implementations of this:

https://github.com/apache/parquet-mr/tree/master/parquet-column/src/main/java/org/apache/parquet/column/page

We don't need to necessarily do it in this patch, but using the existing PageWriter (or something similar if what is there now doesn't have the right features) abstraction with APIs to provide for

a) an implementation that accumulates pages in memory
b) an implementation that writes pages immediately

At some point we will want to make page writes asynchronous for better write performance so this refactoring will probably be desired at some point anyway

Author

The PageWriter class is responsible for writing compressed data pages out. But the choice between writing compressed pages immediately and accumulating them is handled more efficiently as part of the ColumnWriter.


if (flush_on_close_) {
current_compressed_bytes_ += page.size() + sizeof(format::DictionaryPageHeader);
saved_dictionary_page_.push_back(std::move(page));
Member

This aligns with my comments above

@@ -209,6 +220,8 @@ class PARQUET_EXPORT ColumnWriter {

std::vector<CompressedDataPage> data_pages_;

std::vector<DictionaryPage> saved_dictionary_page_;
Member

Should the pages all be kept in the same collection?

Author

Will do!


int64_t num_rows() const;

// Only considers the size of the compressed pages + page headers in all the columns
// Some values might still be buffered and not written to a page yet
Member

See note above re: this comment


AES_GCM_V1 = 0,
AES_GCM_CTR_V1 = 1
};
enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 };
Member

clang-format artifact?

Author

@majetideepak majetideepak Aug 15, 2018

Yes! I used CLANG_FORMAT_VERSION 6.0.

<< " while previous column had " << num_rows_;
throw ParquetException(ss.str());
// verify when only one column is written at a time
if (!row_group_by_size_ && column_writers_.size() > 0 && column_writers_[0]) {
Author

This if-else is one of the reasons to use a new RowGroupWriter class in my previous version of this.

}

ColumnWriter* get_column(int i) override {
if (!row_group_by_size_) {
Author

This restriction is the other misfit in using a single RowGroupWriter API

@majetideepak
Author

@wesm, @xhochy I made changes based on the feedback. I can quickly add unit tests if we agree on the API.

@@ -15,7 +15,11 @@
# specific language governing permissions and limitations
# under the License.

include_directories(SYSTEM . )
Member

Please use https://cmake.org/cmake/help/v3.3/command/target_include_directories.html so that this does not spread to the main library.


@@ -50,8 +54,16 @@ class PARQUET_EXPORT RowGroupWriter {
virtual int64_t num_rows() const = 0;

virtual ColumnWriter* NextColumn() = 0;
// to be used only when row_group_by_size = true
virtual ColumnWriter* get_column(int i) = 0;
Member

I would just name this column()

@@ -69,11 +81,17 @@ class PARQUET_EXPORT RowGroupWriter {

int num_columns() const;

// to be used only when row_group_by_size = true
ColumnWriter* get_column(int i);
Member

column()

@wesm
Member

wesm commented Aug 23, 2018

Sorry for the delay in my follow up review. This is a priority for me tomorrow (Thursday) so we can release

Member

@wesm wesm left a comment

I have a high level comment. The "row_groups_by_size" logic is up to the user, so this is a little bit misleading (unless I've misunderstood the code). Really what this is doing is buffering pages in memory and allowing you to decide whether to keep writing data before flushing to the file -- the decision may be based on how big the row group is. There are other reasons why you might want to do this, such as waiting until a row group is "ready" before issuing the write to an underlying file system. Since this is more general than writing by size, i.e. a type of buffering mode, we should name the APIs and parameters accordingly



/*
* This example describes writing and reading Parquet Files in C++ and serves as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
* This example uses the RowGroupWriter API that supports writing RowGroups optimized for memory consumption
Member

Seems maybe clang-format isn't hitting this file

@@ -100,7 +118,7 @@ class PARQUET_EXPORT ParquetFileWriter {
/// \note Deprecated since 1.3.0
RowGroupWriter* AppendRowGroup(int64_t num_rows);

virtual RowGroupWriter* AppendRowGroup() = 0;
virtual RowGroupWriter* AppendRowGroup(bool row_group_by_size = false) = 0;
Member

Adding a new method AppendFixedSizeRowGroup() would result in more readable code than AppendRowGroup(true)

@@ -50,8 +54,16 @@ class PARQUET_EXPORT RowGroupWriter {
virtual int64_t num_rows() const = 0;

virtual ColumnWriter* NextColumn() = 0;
// to be used only when row_group_by_size = true
Member

Clarify in a comment for NextColumn() that ColumnWriter* objects become invalid when NextColumn is called unless writing fixed size row groups

@@ -39,6 +39,10 @@ class GroupNode;

} // namespace schema

// RowGroupWriter implementation that optimizes memory requirement
// All columns must be written one after the other
// Writing to a new column prevents modification to the previous column
Member

Add \brief here and use /// so these comments will show up in doxygen. We can make sure the Parquet docs show up well in http://arrow.apache.org/docs/cpp/ after the merge

int RowGroupWriter::current_column() { return contents_->current_column(); }

int RowGroupWriter::num_columns() const { return contents_->num_columns(); }

int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }

inline void throwRowsMisMatchError(int col, int64_t prev, int64_t curr) {
Member

ThrowRowsMismatchError

@wesm
Member

wesm commented Aug 23, 2018

I suggest maybe calling this AppendBufferedRowGroup or something

Member

@wesm wesm left a comment

+1. Thanks @majetideepak!

Member

@xhochy xhochy left a comment

+1, LGTM

@xhochy xhochy closed this in 80e110c Aug 25, 2018