Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");

// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
// max buffer size used in memtable for the aggregated table, default 100MB
DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);

// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
DECLARE_mBool(enable_adaptive_write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 100MB
DECLARE_mInt64(write_buffer_size_for_agg);

Expand Down
25 changes: 24 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
// TODO: Support ZOrderComparator in the future
_row_in_blocks = std::make_unique<DorisVector<std::shared_ptr<RowInBlock>>>();
_load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100;
}

void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -656,7 +657,7 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
auto max_size = config::write_buffer_size;
auto max_size = _adaptive_write_buffer_size();
if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
auto update_columns_size = _num_columns;
auto min_buffer_size = config::min_write_buffer_size_for_partial_update;
Expand All @@ -671,6 +672,28 @@ bool MemTable::need_flush() const {
return false;
}

// Returns the flush threshold to use for this memtable.
//
// When config::enable_adaptive_write_buffer_size is off, this is simply
// config::write_buffer_size. Otherwise the configured size is multiplied by a
// factor chosen from how much memory the memtable limiter's tracker currently
// reports, relative to _load_mem_limit:
//   usage >  80 % of the limit        -> 1x
//   50 % < usage <= 80 % of the limit -> 2x
//   usage <= 50 % of the limit        -> 4x
int64_t MemTable::_adaptive_write_buffer_size() const {
    if (!config::enable_adaptive_write_buffer_size) [[unlikely]] {
        return config::write_buffer_size;
    }

    // Treat a missing limiter or tracker as "nothing consumed", which selects
    // the largest multiplier below.
    int64_t used_mem = 0;
    auto* limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
    if (limiter != nullptr && limiter->mem_tracker() != nullptr) {
        // NOTE(review): consumption() is presumably in bytes, like the limit — confirm.
        used_mem = limiter->mem_tracker()->consumption();
    }

    const int64_t high_watermark = (_load_mem_limit * 4) / 5; // 80 % of the load limit
    const int64_t mid_watermark = _load_mem_limit / 2;        // 50 % of the load limit

    int64_t multiplier;
    if (used_mem > high_watermark) {
        multiplier = 1;
    } else if (used_mem > mid_watermark) {
        multiplier = 2;
    } else {
        multiplier = 4;
    }
    return config::write_buffer_size * multiplier;
}

bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = _last_agg_pos + config::write_buffer_size_for_agg;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stddef.h>
#include <stdint.h>

#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -204,6 +205,8 @@ class MemTable {

// Overwrites the stored memory type (_mem_type) with `memtype`.
void update_mem_type(MemType memtype) { _mem_type = memtype; }

// Returns the current value of the _stat.raw_rows counter, read via load().
int64_t raw_rows() { return _stat.raw_rows.load(); }

private:
// for vectorized
template <bool has_skip_bitmap_col>
Expand All @@ -213,6 +216,8 @@ class MemTable {
// Used to wrapped by to_block to do exception handle logic
Status _to_block(std::unique_ptr<vectorized::Block>* res);

int64_t _adaptive_write_buffer_size() const;

private:
std::atomic<MemType> _mem_type;
int64_t _tablet_id;
Expand All @@ -231,6 +236,7 @@ class MemTable {
// In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
// reduce the number of segment files that are generated by current load
vectorized::Arena _arena;
int64_t _load_mem_limit = -1;

void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
const TupleDescriptor* tuple_desc);
Expand Down
23 changes: 18 additions & 5 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ Status MemTableWriter::write(const vectorized::Block* block,
_req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
}

// Flush and reset the memtable if adding these rows would push its raw row count past the int32_t maximum.
int64_t raw_rows = _mem_table->raw_rows();
DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
{ raw_rows = std::numeric_limits<int32_t>::max(); });
if (raw_rows + row_idxs.size() > std::numeric_limits<int32_t>::max()) {
RETURN_IF_ERROR(_flush_memtable());
}

_total_received_rows += row_idxs.size();
auto st = _mem_table->insert(block, row_idxs);

Expand All @@ -129,16 +137,21 @@ Status MemTableWriter::write(const vectorized::Block* block,
_mem_table->shrink_memtable_by_agg();
}
if (UNLIKELY(_mem_table->need_flush())) {
auto s = _flush_memtable_async();
_reset_mem_table();
if (UNLIKELY(!s.ok())) {
return s;
}
RETURN_IF_ERROR(_flush_memtable());
}

return Status::OK();
}

// Submits the current memtable for flushing and then resets the writer's
// memtable. The reset runs whether or not the submission succeeded; a failed
// submission status is handed back to the caller, otherwise Status::OK().
Status MemTableWriter::_flush_memtable() {
    Status submit_status = _flush_memtable_async();
    // Reset unconditionally, before looking at the submission result.
    _reset_mem_table();
    return UNLIKELY(!submit_status.ok()) ? submit_status : Status::OK();
}

Status MemTableWriter::_flush_memtable_async() {
DCHECK(_flush_token != nullptr);
std::shared_ptr<MemTable> memtable;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -109,6 +110,7 @@ class MemTableWriter {
}

private:
Status _flush_memtable();
// push a full memtable to flush executor
Status _flush_memtable_async();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression case for the "MemTableWriter.too_many_raws" debug point: with the
// debug point enabled on all BEs, a single-row insert is issued and the table
// is expected to contain exactly one row afterwards.
suite("test_memtable_too_many_rows", "nonConcurrent") {
// Start from a clean state: no leftover debug points, no leftover table.
GetDebugPoint().clearDebugPointsForAllBEs()
def testTable = "test_memtable_too_many_rows"
sql """ DROP TABLE IF EXISTS ${testTable}"""

// Single-bucket, single-replica duplicate-key table.
sql """
CREATE TABLE IF NOT EXISTS `${testTable}` (
`id` BIGINT NOT NULL,
`value` int(11) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
)
"""

// Name of the BE-side debug point toggled around the insert below.
def debugPoint = "MemTableWriter.too_many_raws"
try {
GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
sql "insert into ${testTable} values(1,1)"
def res = sql "select * from ${testTable}"
logger.info("res: " + res.size())
// The inserted row must be visible.
assertTrue(res.size() == 1)
} catch (Exception e){
// An exception is tolerated only if its message contains the expected
// "too many rows" failure text; any other message fails the assertion.
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("write memtable too many rows fail"))
} finally {
// Always switch the debug point off again, whatever happened above.
GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
}
}
Loading