Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,19 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
return Status::OK();
}

Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t cid) {
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());

auto* column_meta = _column_writers[cid]->get_column_meta();
column_meta->set_compressed_data_bytes(
_column_writers[cid]->get_total_compressed_data_pages_bytes());
column_meta->set_uncompressed_data_bytes(
_column_writers[cid]->get_total_uncompressed_data_pages_bytes());
column_meta->set_raw_data_bytes(_column_writers[cid]->get_raw_data_bytes());
return Status::OK();
}

Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos,
bool is_flexible_update) {
if (!_is_mow()) {
Expand Down Expand Up @@ -540,6 +553,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
data.num_rows));
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}

bool has_default_or_nullable = false;
Expand Down Expand Up @@ -629,6 +643,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
data.num_rows));
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}

_num_rows_updated += stats.num_rows_updated;
Expand Down Expand Up @@ -726,6 +741,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
data.num_rows));
DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}

// 5. genreate read plan
Expand Down Expand Up @@ -771,6 +787,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
data.num_rows));
DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}

_num_rows_updated += stats.num_rows_updated;
Expand Down Expand Up @@ -932,17 +949,6 @@ Status VerticalSegmentWriter::write_batch() {
RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block));
}
}
for (auto& column_writer : _column_writers) {
RETURN_IF_ERROR(column_writer->finish());
RETURN_IF_ERROR(column_writer->write_data());

auto* column_meta = column_writer->get_column_meta();
column_meta->set_compressed_data_bytes(
column_writer->get_total_compressed_data_pages_bytes());
column_meta->set_uncompressed_data_bytes(
column_writer->get_total_uncompressed_data_pages_bytes());
column_meta->set_raw_data_bytes(column_writer->get_raw_data_bytes());
}
return Status::OK();
}
// Row column should be filled here when it's a directly write from memtable
Expand Down Expand Up @@ -992,15 +998,7 @@ Status VerticalSegmentWriter::write_batch() {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
_data_dir->path_hash());
}
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());

auto* column_meta = _column_writers[cid]->get_column_meta();
column_meta->set_compressed_data_bytes(
_column_writers[cid]->get_total_compressed_data_pages_bytes());
column_meta->set_uncompressed_data_bytes(
_column_writers[cid]->get_total_uncompressed_data_pages_bytes());
column_meta->set_raw_data_bytes(_column_writers[cid]->get_raw_data_bytes());
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}

for (auto& data : _batched_blocks) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class VerticalSegmentWriter {
vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort);
Status _generate_short_key_index(std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t num_rows, const std::vector<size_t>& short_key_pos);
Status _finalize_column_writer_and_update_meta(size_t cid);

bool _is_mow();
bool _is_mow_with_cluster_key();

Expand Down
Loading