diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp index f14f80529694ec..130f21ad364cfe 100644 --- a/be/src/exec/sink/vrow_distribution.cpp +++ b/be/src/exec/sink/vrow_distribution.cpp @@ -141,13 +141,13 @@ Status VRowDistribution::automatic_create_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "automatic partition rpc end response " << result; if (result.status.status_code == TStatusCode::OK) { + RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); VLOG_TRACE << "record new id: " << part.id; } - RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); } // Record this request's elapsed time @@ -159,13 +159,13 @@ Status VRowDistribution::automatic_create_partition() { } // for reuse the same create callback of create-partition -static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg) { +static TCreatePartitionResult cast_as_create_result(const TReplacePartitionResult& arg) { TCreatePartitionResult result; result.status = arg.status; - result.nodes = std::move(arg.nodes); - result.partitions = std::move(arg.partitions); - result.tablets = std::move(arg.tablets); - result.slave_tablets = std::move(arg.slave_tablets); + result.nodes = arg.nodes; + result.partitions = arg.partitions; + result.tablets = arg.tablets; + result.slave_tablets = arg.slave_tablets; return result; } @@ -237,6 +237,10 @@ Status VRowDistribution::_replace_overwriting_partition() { Status status(Status::create(result.status)); VLOG_NOTICE << "auto detect replace partition result: " << result; if (result.status.status_code == TStatusCode::OK) { + // Reuse the function as the args' structure are same. It adds nodes/locations + // and waits for incremental_open before the new tablets become routable. + auto result_as_create = cast_as_create_result(result); + RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); // record new partitions for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); @@ -244,9 +248,6 @@ Status VRowDistribution::_replace_overwriting_partition() { } // replace data in _partitions RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); - // reuse the function as the args' structure are same. it add nodes/locations and incremental_open - auto result_as_create = cast_as_create_result(result); - RETURN_IF_ERROR(_create_partition_callback(_caller, &result_as_create)); } return status; diff --git a/be/test/exec/sink/vrow_distribution_test.cpp b/be/test/exec/sink/vrow_distribution_test.cpp index 36765eca44fd39..82c780a4e2c094 100644 --- a/be/test/exec/sink/vrow_distribution_test.cpp +++ b/be/test/exec/sink/vrow_distribution_test.cpp @@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*, TCreatePartitionResult*) { return Status::OK(); } +Status _delegated_create_partition_callback(void* caller, TCreatePartitionResult* result) { + return (*static_cast*>(caller))(result); +} + std::unique_ptr _build_vrow_distribution_harness( OperatorContext& ctx, const TOlapTableSchemaParam& tschema, const TOlapTablePartitionParam& tpartition, const TOlapTableLocationParam& tlocation, - TTupleId tablet_sink_tuple_id, int64_t txn_id) { + TTupleId tablet_sink_tuple_id, int64_t txn_id, + CreatePartitionCallback create_partition_callback = &_noop_create_partition_callback, + void* caller = nullptr) { auto h = std::make_unique(); h->schema = std::make_shared(); @@ -92,9 +98,9 @@ std::unique_ptr _build_vrow_distribution_harness( rctx.location = h->location.get(); rctx.vec_output_expr_ctxs = &h->output_expr_ctxs; rctx.schema = h->schema; - rctx.caller = nullptr; + rctx.caller = caller; rctx.write_single_replica = false; - rctx.create_partition_callback = &_noop_create_partition_callback; + rctx.create_partition_callback = create_partition_callback; h->row_distribution.init(rctx); st = h->row_distribution.open(h->output_row_desc.get()); @@ -358,8 +364,36 @@ TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndRep tpartition.__set_overwrite_group_id(123); auto tlocation = sink_test_utils::build_location_param(); - auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition, tlocation, - tablet_sink_tuple_id, txn_id); + VRowDistributionHarness* harness = nullptr; + bool create_callback_called = false; + std::function create_callback = + [&](TCreatePartitionResult* result) { + create_callback_called = true; + EXPECT_EQ(result->partitions.size(), 2); + + auto old_partition_block = ColumnHelper::create_block({1}); + VOlapTablePartition* old_part = nullptr; + harness->vpartition->find_partition(&old_partition_block, 0, old_part); + if (old_part == nullptr) { + return Status::InternalError("old partition is not found"); + } + EXPECT_EQ(old_part->id, 1); + + auto another_old_partition_block = ColumnHelper::create_block({25}); + VOlapTablePartition* another_old_part = nullptr; + harness->vpartition->find_partition(&another_old_partition_block, 0, + another_old_part); + if (another_old_part == nullptr) { + return Status::InternalError("another old partition is not found"); + } + EXPECT_EQ(another_old_part->id, 2); + return Status::OK(); + }; + + auto h = _build_vrow_distribution_harness( + ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id, + &_delegated_create_partition_callback, &create_callback); + harness = h.get(); doris::config::enable_debug_points = true; doris::DebugPoints::instance()->clear(); @@ -424,6 +458,7 @@ TEST(VRowDistributionTest, ReplaceOverwritingPartitionInjectedRequestDedupAndRep row_part_tablet_ids, rows_stat_val); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(injected_times, 1); + EXPECT_TRUE(create_callback_called); ASSERT_EQ(row_part_tablet_ids.size(), 1); ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);