Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

PARQUET-691: Write ColumnChunk metadata after chunk is complete #224

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/parquet/file/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(thrift_encodings);
}

void WriteTo(OutputStream* sink) {
SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
}

const ColumnDescriptor* descr() const { return column_; }

private:
Expand Down Expand Up @@ -536,6 +540,10 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
}

void ColumnChunkMetaDataBuilder::WriteTo(OutputStream* sink) {
impl_->WriteTo(sink);
}

const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
return impl_->descr();
}
Expand Down
3 changes: 3 additions & 0 deletions src/parquet/file/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);

// For writing metadata at end of column chunk
void WriteTo(OutputStream* sink);

private:
explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
const ColumnDescriptor* column, uint8_t* contents);
Expand Down
12 changes: 8 additions & 4 deletions src/parquet/file/writer-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
// TODO: Remove default fallback = 'false' when implemented
metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);

// Write metadata at end of column chunk
metadata_->WriteTo(sink_);
}

std::shared_ptr<Buffer> SerializedPageWriter::Compress(
Expand Down Expand Up @@ -104,8 +107,9 @@ int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {

int64_t start_pos = sink_->Tell();
if (data_page_offset_ == 0) { data_page_offset_ = start_pos; }
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = sink_->Tell() - start_pos;

int64_t header_size =
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
sink_->Write(compressed_data->data(), compressed_data->size());

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down Expand Up @@ -133,8 +137,8 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {

int64_t start_pos = sink_->Tell();
if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; }
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = sink_->Tell() - start_pos;
int64_t header_size =
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
sink_->Write(compressed_data->data(), compressed_data->size());

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down
3 changes: 0 additions & 3 deletions src/parquet/file/writer-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
int num_columns() const override;
int64_t num_rows() const override;

// TODO: PARQUET-579
// void WriteRowGroupStatitics() override;

ColumnWriter* NextColumn() override;
void Close() override;

Expand Down
3 changes: 2 additions & 1 deletion src/parquet/thrift/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
// The arguments are the object to be serialized and
// the expected size of the serialized object
template <class T>
inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
new apache::thrift::transport::TMemoryBuffer(len));
apache::thrift::protocol::TCompactProtocolFactoryT<
Expand All @@ -139,6 +139,7 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
uint32_t out_length;
mem_buffer->getBuffer(&out_buffer, &out_length);
out->Write(out_buffer, out_length);
return out_length;
}

} // namespace parquet
Expand Down