From 2df40fb95e9f996387a277a93e7261db4e28d271 Mon Sep 17 00:00:00 2001 From: hui lai Date: Thu, 14 May 2026 23:38:09 +0800 Subject: [PATCH] [fix](insert overwrite) delay overwrite partition routing until incremental open (#63209) ### What problem does this PR solve? Problem Summary: In auto-detect insert overwrite, BE sender could publish newly replaced temporary partitions to local row routing before incremental open finished on target BEs. The race was: 1. One sender calls FE `replacePartition` and receives new temporary partition/tablet metadata. 2. The sender records the new partition id and replaces local `_vpartition` routing first. 3. Another concurrent batch can then route rows to the new tablet. 4. The first sender has not finished incremental open yet, so the target BE may not have created the delta writer for that tablet. 5. The target BE returns `unknown tablet to append data`. This PR makes the sender finish `_create_partition_callback`, including incremental open/open_wait, before publishing the new partition/tablet to local routing and marking the new partition as handled. --- be/src/exec/sink/vrow_distribution.cpp | 19 +++++---- be/test/exec/sink/vrow_distribution_test.cpp | 45 +++++++++++++++++--- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp index f14f80529694ec..130f21ad364cfe 100644 --- a/be/src/exec/sink/vrow_distribution.cpp +++ b/be/src/exec/sink/vrow_distribution.cpp @@ -141,13 +141,13 @@ Status VRowDistribution::automatic_create_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "automatic partition rpc end response " << result; if (result.status.status_code == TStatusCode::OK) { + RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); VLOG_TRACE << "record new id: " << part.id; } - RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); } // Record this request's elapsed time @@ -159,13 +159,13 @@ Status VRowDistribution::automatic_create_partition() { } // for reuse the same create callback of create-partition -static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { +static TCreatePartitionResult cast_as_create_result(const TReplacePartitionResult& arg) { TCreatePartitionResult result; result.status = arg.status; - result.nodes = std::move(arg.nodes); - result.partitions = std::move(arg.partitions); - result.tablets = std::move(arg.tablets); - result.slave_tablets = std::move(arg.slave_tablets); + result.nodes = arg.nodes; + result.partitions = arg.partitions; + result.tablets = arg.tablets; + result.slave_tablets = arg.slave_tablets; return result; } @@ -237,6 +237,10 @@ Status VRowDistribution::_replace_overwriting_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "auto detect replace partition result: " << result; if (result.status.status_code == TStatusCode::OK) { + // Reuse the function as the args' structure are same. It adds nodes/locations + // and waits for incremental_open before the new tablets become routable. + auto result_as_create = cast_as_create_result(result); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); // record new partitions for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); @@ -244,9 +248,6 @@ Status VRowDistribution::_replace_overwriting_partition() { } // replace data in _partitions RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); - // reuse the function as the args' structure are same. it add nodes/locations and incremental_open - auto result_as_create = cast_as_create_result(result); - RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); } return status; diff --git a/be/test/exec/sink/vrow_distribution_test.cpp b/be/test/exec/sink/vrow_distribution_test.cpp index 36765eca44fd39..82c780a4e2c094 100644 --- a/be/test/exec/sink/vrow_distribution_test.cpp +++ b/be/test/exec/sink/vrow_distribution_test.cpp @@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*, TCreatePartitionResult*) { return Status::OK(); } +Status _delegated_create_partition_callback(void* caller, TCreatePartitionResult* result) { + return (*static_cast*>(caller))(result); +} + std::unique_ptr _build_vrow_distribution_harness( OperatorContext& ctx, const TOlapTableSchemaParam& tschema, const TOlapTablePartitionParam& tpartition, const TOlapTableLocationParam& tlocation, - TTupleId tablet_sink_tuple_id, int64_t txn_id) { + TTupleId tablet_sink_tuple_id, int64_t txn_id, + CreatePartitionCallback create_partition_callback = &_noop_create_partition_callback, + void* caller = nullptr) { auto h = std::make_unique(); h->schema = std::make_shared(); @@ -92,9 +98,9 @@ std::unique_ptr _build_vrow_distribution_harness( rctx.location = h->location.get(); rctx.vec_output_expr_ctxs = &h->output_expr_ctxs; rctx.schema = h->schema; - rctx.caller = nullptr; + rctx.caller = caller; rctx.write_single_replica = false; - rctx.create_partition_callback = &_noop_create_partition_callback; + rctx.create_partition_callback = create_partition_callback; h->row_distribution.init(rctx); st = h->row_distribution.open(h->output_row_desc.get()); @@ -358,8 +364,36 @@ TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndRep tpartition.__set_overwrite_group_id(123); auto tlocation = sink_test_utils::build_location_param(); - auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation, - tablet_sink_tuple_id, txn_id); + VRowDistributionHarness* harness = nullptr; + bool create_callback_called = false; + std::function create_callback = + [&](TCreatePartitionResult* result) { + create_callback_called = true; + EXPECT_EQ(result->partitions.size(), 2); + + auto old_partition_block = ColumnHelper::create_block({1}); + VOlapTablePartition* old_part = nullptr; + harness->vpartition->find_partition(&old_partition_block, 0, old_part); + if (old_part == nullptr) { + return Status::InternalError("old partition is not found"); + } + EXPECT_EQ(old_part->id, 1); + + auto another_old_partition_block = ColumnHelper::create_block({25}); + VOlapTablePartition* another_old_part = nullptr; + harness->vpartition->find_partition(&another_old_partition_block, 0, + another_old_part); + if (another_old_part == nullptr) { + return Status::InternalError("another old partition is not found"); + } + EXPECT_EQ(another_old_part->id, 2); + return Status::OK(); + }; + + auto h = _build_vrow_distribution_harness( + ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id, + &_delegated_create_partition_callback, &create_callback); + harness = h.get(); doris::config::enable_debug_points = true; doris::DebugPoints::instance()->clear(); @@ -424,6 +458,7 @@ TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndRep row_part_tablet_ids, rows_stat_val); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(injected_times, 1); + EXPECT_TRUE(create_callback_called); ASSERT_EQ(row_part_tablet_ids.size(), 1); ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);