From e7e40fc056c0bc6bf9d2dbb0b7244393495a8e04 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Thu, 14 May 2026 15:35:18 +0800 Subject: [PATCH] Fix adaptive load batch sizing --- be/src/exec/scan/file_scanner.cpp | 2 ++ be/src/exec/spill/spill_file_writer.cpp | 1 + be/src/storage/compaction/compaction.cpp | 6 ++++++ be/src/storage/rowset/segcompaction.cpp | 6 ++++++ regression-test/conf/regression-conf.groovy | 8 ++++---- .../unique_with_mow_c_p0/test_compact_with_seq2.groovy | 4 ++++ 6 files changed, 23 insertions(+), 4 deletions(-) diff --git a/be/src/exec/scan/file_scanner.cpp b/be/src/exec/scan/file_scanner.cpp index d97f5a9ae95441..c443a44f9a71bb 100644 --- a/be/src/exec/scan/file_scanner.cpp +++ b/be/src/exec/scan/file_scanner.cpp @@ -610,7 +610,9 @@ Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* RETURN_IF_ERROR(_convert_to_output_block(block)); // Truncate char columns or varchar columns if size is smaller than file columns // or not found in the file column schema. + _update_adaptive_batch_size_before_truncate(*block); RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); + _update_adaptive_batch_size_after_truncate(*block); } } break; diff --git a/be/src/exec/spill/spill_file_writer.cpp b/be/src/exec/spill/spill_file_writer.cpp index ced813e51e3080..9e0a3d3775e108 100644 --- a/be/src/exec/spill/spill_file_writer.cpp +++ b/be/src/exec/spill/spill_file_writer.cpp @@ -146,6 +146,7 @@ Status SpillFileWriter::write_block(RuntimeState* state, const Block& block) { // accounting would be out of sync. auto spill_file = _spill_file_wptr.lock(); if (!spill_file) { + DCHECK(false); return Status::Error( "SpillFile has been destroyed, cannot write more data, spill_dir={}", _spill_dir); } diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 6e553bfb901dfe..8841c447b9ca2d 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -375,6 +375,12 @@ void Compaction::set_delete_predicate_for_output_rowset() { } int64_t Compaction::get_avg_segment_rows() { + DBUG_EXECUTE_IF("Compaction::get_avg_segment_rows.return", { + auto rows = dp->param("rows", 10); + LOG(INFO) << "Compaction::get_avg_segment_rows.return, rows=" << rows + << ", tablet_id=" << _tablet->tablet_id(); + return std::max(1, rows); + }); // take care of empty rowset // input_rowsets_size is total disk_size of input_rowset, this size is the // final size after codec and compress, so expect dest segment file size diff --git a/be/src/storage/rowset/segcompaction.cpp b/be/src/storage/rowset/segcompaction.cpp index e94fabee96ec16..eeea6efe3b6ff6 100644 --- a/be/src/storage/rowset/segcompaction.cpp +++ b/be/src/storage/rowset/segcompaction.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include "absl/strings/substitute.h" @@ -381,6 +383,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { Status status = Status::OK(); if (_is_compacting_state_mutable.exchange(false)) { + DBUG_EXECUTE_IF("SegcompactionWorker::compact_segments.sleep", { + auto ms = dp->param("ms", 1000); + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + }); status = _do_compact_segments(segments); } else { // note: be aware that _writer maybe released when the task is cancelled diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 21f4bd047d8a76..18770ce2168853 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -24,8 +24,8 @@ defaultDb = "regression_test" // init cmd like: select @@session.tx_read_only // at each time we connect. // add allowLoadLocalInfile so that the jdbc can execute mysql load data from client. -jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round" -targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round" +jdbcUrl = "jdbc:mysql://127.0.0.1:49546/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round" +targetJdbcUrl = "jdbc:mysql://127.0.0.1:49546/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round" jdbcUser = "root" jdbcPassword = "" @@ -35,7 +35,7 @@ syncerAddress = "127.0.0.1:9190" feSyncerUser = "root" feSyncerPassword = "" -feHttpAddress = "127.0.0.1:8030" +feHttpAddress = "127.0.0.1:48536" feHttpUser = "root" feHttpPassword = "" @@ -125,7 +125,7 @@ pg_14_port=5442 oracle_11_port=1521 sqlserver_2022_port=1433 clickhouse_22_port=8123 -doris_port=9030 +doris_port=49546 mariadb_10_port=3326 db2_11_port=50000 oceanbase_port=2881 diff --git a/regression-test/suites/unique_with_mow_c_p0/test_compact_with_seq2.groovy b/regression-test/suites/unique_with_mow_c_p0/test_compact_with_seq2.groovy index 7fc9d2a48fdcbd..a3fe8a5e37cc94 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_compact_with_seq2.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_compact_with_seq2.groovy @@ -61,6 +61,9 @@ suite("test_compact_with_seq2", "nonConcurrent") { for (def tablet in tablets) { String tablet_id = tablet.TabletId def backend_id = tablet.BackendId + GetDebugPoint().enableDebugPointForAllBEs( + "SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id: tablet_id.toLong(), start_version: 2, end_version: 11]) def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) @@ -152,6 +155,7 @@ suite("test_compact_with_seq2", "nonConcurrent") { assertEquals(0, json.NumberFilteredRows) } } + GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker::compact_segments.sleep", [ms: 500]) streamLoad { table "${tableName}" set 'column_separator', ','