Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Optimize mem usage of partial update #14187

Merged
merged 9 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ Status DeltaWriter::_init() {
}
writer_context.referenced_column_ids.push_back(index);
}
int64_t average_row_size = _tablet->updates()->get_average_row_size();
if (average_row_size != 0) {
_memtable_buffer_row = config::write_buffer_size / average_row_size;
} else {
sevev marked this conversation as resolved.
Show resolved Hide resolved
// If the tablet is newly created and has no historical data, average_row_size is 0.
// In that case, use the schema size as the average row size. Complex types (e.g. BITMAP/ARRAY)
// and varchar columns are each counted as 16 bytes.
average_row_size = _tablet->tablet_schema().estimate_row_size(16);
_memtable_buffer_row = config::write_buffer_size / average_row_size;
}

writer_context.partial_update_tablet_schema =
TabletSchema::create(_tablet->tablet_schema(), writer_context.referenced_column_ids);
auto sort_key_idxes = _tablet->tablet_schema().sort_key_idxes();
Expand Down Expand Up @@ -400,6 +411,7 @@ void DeltaWriter::_reset_mem_table() {
_mem_table = std::make_unique<MemTable>(_tablet->tablet_id(), &_vectorized_schema, _opt.slots,
_mem_table_sink.get(), "", _mem_tracker);
}
_mem_table->set_write_buffer_row(_memtable_buffer_row);
}

Status DeltaWriter::commit() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class DeltaWriter {
std::unique_ptr<FlushToken> _flush_token;
std::unique_ptr<ReplicateToken> _replicate_token;
bool _with_rollback_log;
// Initial value is the maximum size_t value (-1 converted to size_t).
size_t _memtable_buffer_row = -1;
};

} // namespace starrocks
8 changes: 7 additions & 1 deletion be/src/storage/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ size_t MemTable::write_buffer_size() const {
return _chunk_bytes_usage + _aggregator_bytes_usage;
}

// Row count currently attributed to the write buffer: the rows tallied in
// _total_rows minus the rows recorded in _merged_rows.
size_t MemTable::write_buffer_rows() const {
    const size_t buffered_rows = _total_rows - _merged_rows;
    return buffered_rows;
}

// Returns true when the caller should flush this memtable: either the buffered
// byte size has reached _max_buffer_size, or the buffered row count has reached
// _max_buffer_row.
bool MemTable::is_full() const {
    // A single return covers both limits; a preceding bytes-only return would
    // make the row-count limit unreachable.
    return write_buffer_size() >= _max_buffer_size || write_buffer_rows() >= _max_buffer_row;
}

bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size) {
Expand Down Expand Up @@ -162,6 +166,7 @@ bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from
if (chunk.has_rows()) {
_chunk_memory_usage += chunk.memory_usage() * size / chunk.num_rows();
_chunk_bytes_usage += _chunk->bytes_usage(cur_row_count, size);
_total_rows += chunk.num_rows();
}

// if memtable is full, push it to the flush executor,
Expand Down Expand Up @@ -319,6 +324,7 @@ void MemTable::_aggregate(bool is_final) {
// impossible finish
DCHECK(!_aggregator->is_finish());
DCHECK(_aggregator->source_exhausted());
_merged_rows = _aggregator->merged_rows();

if (is_final) {
_result_chunk.reset();
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MemTable {

// buffer memory usage for write segment
size_t write_buffer_size() const;
size_t write_buffer_rows() const;

// return true suggests caller should flush this memory table
bool insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size);
Expand All @@ -61,6 +62,8 @@ class MemTable {

bool is_full() const;

// Stores the row-count threshold in _max_buffer_row.
void set_write_buffer_row(size_t max_buffer_row) {
    _max_buffer_row = max_buffer_row;
}

static VectorizedSchema convert_schema(const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs);

Expand Down Expand Up @@ -103,6 +106,10 @@ class MemTable {
std::string _merge_condition;

int64_t _max_buffer_size = config::write_buffer_size;
// Initial value is the maximum size_t value (-1 converted to size_t), so the row limit is effectively disabled by default.
size_t _max_buffer_row = -1;
size_t _total_rows = 0;
size_t _merged_rows = 0;

// memory statistic
MemTracker* _mem_tracker = nullptr;
Expand Down
18 changes: 18 additions & 0 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,24 @@ Status Rowset::reload() {
return Status::OK();
}

// Re-opens a single segment of this rowset and replaces the corresponding
// entry in _segments with the newly opened segment.
//
// segment_id: index into _segments of the segment to reload.
//
// Returns Status::OK() on success; Status::InternalError if segment_id is out
// of range; otherwise the error produced while creating the file system or
// opening the segment file.
Status Rowset::reload_segment(int32_t segment_id) {
    // Check the range explicitly rather than relying on the implicit
    // signed-to-unsigned conversion of a negative id in a mixed comparison.
    const bool id_in_range = segment_id >= 0 && static_cast<size_t>(segment_id) < _segments.size();
    DCHECK(id_in_range);
    if (!id_in_range) {
        LOG(WARNING) << "Error segment id: " << segment_id;
        return Status::InternalError("Error segment id");
    }

    ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
    size_t footer_size_hint = 16 * 1024;
    std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id);
    auto res = Segment::open(fs, seg_path, segment_id, _schema, &footer_size_hint);
    if (!res.ok()) {
        LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
        return res.status();
    }

    // Only replace the cached segment once the new one was opened successfully.
    _segments[segment_id] = std::move(res).value();
    return Status::OK();
}

StatusOr<int64_t> Rowset::estimate_compaction_segment_iterator_num() {
if (num_segments() == 0) {
return 0;
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {

// reload this rowset after the underlying segment file is changed
Status reload();
Status reload_segment(int32_t segment_id);

const TabletSchema& schema() const { return *_schema; }
void set_schema(const TabletSchema* schema) { _schema = schema; }
Expand Down