diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 63055c0392f834..ab006717a40680 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -697,6 +697,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500"); // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); +DEFINE_mBool(enable_adaptive_write_buffer_size, "true"); // max buffer size used in memtable for the aggregated table, default 400MB DEFINE_mInt64(write_buffer_size_for_agg, "104857600"); DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 930367cdf17dc3..68791713fcb840 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -759,6 +759,7 @@ DECLARE_mInt32(memory_gc_sleep_time_ms); // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); +DECLARE_mBool(enable_adaptive_write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB DECLARE_mInt64(write_buffer_size_for_agg); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a5481d39a4f4e0..3a59a3e60c4224 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -81,6 +81,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr tablet_schem _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); // TODO: Support ZOrderComparator in the future _row_in_blocks = std::make_unique>>(); + _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100; } void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, @@ -656,7 +657,7 @@ void MemTable::shrink_memtable_by_agg() { bool MemTable::need_flush() const { DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); - auto max_size = config::write_buffer_size; + auto max_size = _adaptive_write_buffer_size(); if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { auto update_columns_size = _num_columns; auto min_buffer_size = config::min_write_buffer_size_for_partial_update; @@ -671,6 +672,28 @@ bool MemTable::need_flush() const { return false; } +int64_t MemTable::_adaptive_write_buffer_size() const { + if (!config::enable_adaptive_write_buffer_size) [[unlikely]] { + return config::write_buffer_size; + } + auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); + int64_t memtable_mem = + (memtable_limiter != nullptr && memtable_limiter->mem_tracker() != nullptr) + ? memtable_limiter->mem_tracker()->consumption() + : 0; + int64_t factor = 4; + // Memory usage intervals: + // (80 %, 100 %] → 1× buffer + // (50 %, 80 %] → 2× buffer + // [0 %, 50 %] → 4× buffer + if (memtable_mem > (_load_mem_limit * 4) / 5) { // > 80 % + factor = 1; + } else if (memtable_mem > _load_mem_limit / 2) { // > 50 % + factor = 2; + } + return config::write_buffer_size * factor; +} + bool MemTable::need_agg() const { if (_keys_type == KeysType::AGG_KEYS) { auto max_size = _last_agg_pos + config::write_buffer_size_for_agg; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 5cd70e812a9626..47426e354d0c1e 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -204,6 +205,8 @@ class MemTable { void update_mem_type(MemType memtype) { _mem_type = memtype; } + int64_t raw_rows() { return _stat.raw_rows.load(); } + private: // for vectorized template @@ -213,6 +216,8 @@ class MemTable { // Used to wrapped by to_block to do exception handle logic Status _to_block(std::unique_ptr* res); + int64_t _adaptive_write_buffer_size() const; + private: std::atomic _mem_type; int64_t _tablet_id; @@ -231,6 +236,7 @@ class MemTable { // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually // reduce the number of segment files that are generated by current load vectorized::Arena _arena; + int64_t _load_mem_limit = -1; void _init_columns_offset_by_slot_descs(const std::vector* slot_descs, const TupleDescriptor* tuple_desc); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 808eaf9eec8b5e..057ede302850bd 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -105,6 +105,14 @@ Status MemTableWriter::write(const vectorized::Block* block, _req.tablet_id, _req.load_id.hi(), _req.load_id.lo()); } + // Flush and reset memtable if it is raw rows great than int32_t. + int64_t raw_rows = _mem_table->raw_rows(); + DBUG_EXECUTE_IF("MemTableWriter.too_many_raws", + { raw_rows = std::numeric_limits::max(); }); + if (raw_rows + row_idxs.size() > std::numeric_limits::max()) { + RETURN_IF_ERROR(_flush_memtable()); + } + _total_received_rows += row_idxs.size(); auto st = _mem_table->insert(block, row_idxs); @@ -129,16 +137,21 @@ Status MemTableWriter::write(const vectorized::Block* block, _mem_table->shrink_memtable_by_agg(); } if (UNLIKELY(_mem_table->need_flush())) { - auto s = _flush_memtable_async(); - _reset_mem_table(); - if (UNLIKELY(!s.ok())) { - return s; - } + RETURN_IF_ERROR(_flush_memtable()); } return Status::OK(); } +Status MemTableWriter::_flush_memtable() { + auto s = _flush_memtable_async(); + _reset_mem_table(); + if (UNLIKELY(!s.ok())) { + return s; + } + return Status::OK(); +} + Status MemTableWriter::_flush_memtable_async() { DCHECK(_flush_token != nullptr); std::shared_ptr memtable; diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index aa1fd4025ed767..7465e79a452218 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "common/status.h" @@ -109,6 +110,7 @@ class MemTableWriter { } private: + Status _flush_memtable(); // push a full memtable to flush executor Status _flush_memtable_async(); diff --git a/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy new file mode 100644 index 00000000000000..385168647279e9 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_memtable_too_many_rows", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllBEs() + def testTable = "test_memtable_too_many_rows" + sql """ DROP TABLE IF EXISTS ${testTable}""" + + sql """ + CREATE TABLE IF NOT EXISTS `${testTable}` ( + `id` BIGINT NOT NULL, + `value` int(11) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + def debugPoint = "MemTableWriter.too_many_raws" + try { + GetDebugPoint().enableDebugPointForAllBEs(debugPoint) + sql "insert into ${testTable} values(1,1)" + def res = sql "select * from ${testTable}" + logger.info("res: " + res.size()) + assertTrue(res.size() == 1) + } catch (Exception e){ + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("write memtable too many rows fail")) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(debugPoint) + } +} \ No newline at end of file