Skip to content

Commit

Permalink
GH-38887: [C++][Parquet] Move EstimatedBufferedValueBytes from TypedColumnWriter to ColumnWriter (#39055)
Browse files Browse the repository at this point in the history

### Rationale for this change

`EstimatedBufferedValueBytes` is currently declared on `TypedColumnWriter`. This change moves it to the base `ColumnWriter` interface and renames it to `estimated_buffered_value_bytes`.

### What changes are included in this PR?

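Moves `EstimatedBufferedValueBytes` from `TypedColumnWriter` to `ColumnWriter`, renames it to `estimated_buffered_value_bytes`, and updates the call sites in the low-level API example and in `StreamWriter` accordingly.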

### Are these changes tested?

No. This is only an interface change.

### Are there any user-facing changes?

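Yes. The public method `TypedColumnWriter::EstimatedBufferedValueBytes()` is replaced by `ColumnWriter::estimated_buffered_value_bytes()`.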

* Closes: #38887

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
  • Loading branch information
mapleFU committed Dec 6, 2023
1 parent e2168e5 commit 1cc1f4c
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
16 changes: 8 additions & 8 deletions cpp/examples/parquet/low_level_api/reader_writer2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ int main(int argc, char** argv) {
static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
bool bool_value = ((i % 2) == 0) ? true : false;
bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = bool_writer->estimated_buffered_value_bytes();

// Write the Int32 column
col_id++;
parquet::Int32Writer* int32_writer =
static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
int32_t int32_value = i;
int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = int32_writer->estimated_buffered_value_bytes();

// Write the Int64 column. Each row repeats twice.
col_id++;
Expand All @@ -119,7 +119,7 @@ int main(int argc, char** argv) {
int64_t int64_value2 = (2 * i + 1);
repetition_level = 1; // start of a new record
int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = int64_writer->estimated_buffered_value_bytes();

// Write the INT96 column.
col_id++;
Expand All @@ -130,23 +130,23 @@ int main(int argc, char** argv) {
int96_value.value[1] = i + 1;
int96_value.value[2] = i + 2;
int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = int96_writer->estimated_buffered_value_bytes();

// Write the Float column
col_id++;
parquet::FloatWriter* float_writer =
static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
float float_value = static_cast<float>(i) * 1.1f;
float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = float_writer->estimated_buffered_value_bytes();

// Write the Double column
col_id++;
parquet::DoubleWriter* double_writer =
static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
double double_value = i * 1.1111111;
double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = double_writer->estimated_buffered_value_bytes();

// Write the ByteArray column. Make every alternate value NULL
col_id++;
Expand All @@ -166,7 +166,7 @@ int main(int argc, char** argv) {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
}
buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = ba_writer->estimated_buffered_value_bytes();

// Write the FixedLengthByteArray column
col_id++;
Expand All @@ -178,7 +178,7 @@ int main(int argc, char** argv) {
flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);

flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
buffered_values_estimate[col_id] = flba_writer->estimated_buffered_value_bytes();
}

// Close the RowGroupWriter
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
END_PARQUET_CATCH_EXCEPTIONS
}

int64_t EstimatedBufferedValueBytes() const override {
int64_t estimated_buffered_value_bytes() const override {
return current_encoder_->EstimatedDataEncodedSize();
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ class PARQUET_EXPORT ColumnWriter {
/// total_bytes_written().
virtual int64_t total_compressed_bytes_written() const = 0;

/// \brief Estimated size of the values that are not written to a page yet.
virtual int64_t estimated_buffered_value_bytes() const = 0;

/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;

Expand Down Expand Up @@ -239,9 +242,6 @@ class TypedColumnWriter : public ColumnWriter {
virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values) = 0;

// Estimated size of the values that are not written to a page yet
virtual int64_t EstimatedBufferedValueBytes() const = 0;
};

using BoolWriter = TypedColumnWriter<BooleanType>;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/stream_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ StreamWriter& StreamWriter::WriteVariableLength(const char* data_ptr,
writer->WriteBatch(kBatchSizeOne, &kDefLevelZero, &kRepLevelZero, nullptr);
}
if (max_row_group_size_ > 0) {
row_group_size_ += writer->EstimatedBufferedValueBytes();
row_group_size_ += writer->estimated_buffered_value_bytes();
}
return *this;
}
Expand All @@ -178,7 +178,7 @@ StreamWriter& StreamWriter::WriteFixedLength(const char* data_ptr, std::size_t d
writer->WriteBatch(kBatchSizeOne, &kDefLevelZero, &kRepLevelZero, nullptr);
}
if (max_row_group_size_ > 0) {
row_group_size_ += writer->EstimatedBufferedValueBytes();
row_group_size_ += writer->estimated_buffered_value_bytes();
}
return *this;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class PARQUET_EXPORT StreamWriter {
writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);

if (max_row_group_size_ > 0) {
row_group_size_ += writer->EstimatedBufferedValueBytes();
row_group_size_ += writer->estimated_buffered_value_bytes();
}
return *this;
}
Expand Down

0 comments on commit 1cc1f4c

Please sign in to comment.