diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java index a45b65ca19aea3..0b1ef04496fd98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java @@ -20,12 +20,18 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.UserException; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TGroupCommitMode; +import org.apache.doris.thrift.TOlapTableLocationParam; +import org.apache.doris.thrift.TOlapTableSink; +import org.apache.doris.thrift.TTabletLocation; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -63,6 +69,21 @@ protected TDataSink toThrift() { return tDataSink; } + // BE-side GroupCommitBlockSinkOperatorX::init does not consume location/slave_location + // (it only reads tuple_id/schema/db_id/table_id/partition/group_commit_mode/load_id/ + // max_filter_ratio). Skip the per-tablet replica enumeration in createLocation, which + // is the dominant FE CPU cost under high-concurrency group-commit stream load. + // We still return placeholder TOlapTableLocationParam objects because + // TOlapTableSink.location is a required thrift field. + @Override + protected List initLocationParams(TOlapTableSink tSink) throws UserException { + TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); + TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); + locationParam.setTablets(Lists.newArrayList()); + slaveLocationParam.setTablets(Lists.newArrayList()); + return Arrays.asList(locationParam, slaveLocationParam); + } + public static TGroupCommitMode parseGroupCommit(String groupCommit) { if (groupCommit == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 4d95fababc149f..e3a9954b627f05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -236,7 +236,7 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou partition.setTabletVersionGapBackends(gapBackends); } } - tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable); + tOlapTableLocationParams = initLocationParams(tSink); tSink.setTableId(dstTable.getId()); tSink.setTupleId(tupleDescriptor.getId().asInt()); @@ -294,7 +294,7 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou partition.setTabletVersionGapBackends(gapBackends); } } - tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable); + tOlapTableLocationParams = initLocationParams(tSink); tSink.setTableId(dstTable.getId()); tSink.setTupleId(tupleDescriptor.getId().asInt()); @@ -737,6 +737,15 @@ public static void setPartitionKeys(TOlapTablePartition tPartition, PartitionIte } } + // Hook for subclasses to control how the tablet location params are populated. + // Default behavior computes the full tablet -> backend mapping via createLocation, + // which under high-concurrency stream load on large tables is the dominant FE CPU + // cost. Subclasses whose BE counterpart does not consume TOlapTableSink.location + // (e.g. GroupCommitBlockSink) can override this hook to skip that work. + protected List initLocationParams(TOlapTableSink tSink) throws UserException { + return createLocation(tSink.getDbId(), dstTable); + } + public List createDummyLocation(OlapTable table) throws UserException { TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java new file mode 100644 index 00000000000000..281afd9b59ecf9 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/GroupCommitBlockSinkTest.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TGroupCommitMode; +import org.apache.doris.thrift.TOlapTableLocationParam; +import org.apache.doris.thrift.TOlapTableSink; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +public class GroupCommitBlockSinkTest { + + @Test + public void testInitLocationParamsSkipsCreateLocation() throws UserException { + OlapTable dstTable = Mockito.mock(OlapTable.class); + TupleDescriptor tuple = Mockito.mock(TupleDescriptor.class); + GroupCommitBlockSink sink = new GroupCommitBlockSink( + dstTable, tuple, Lists.newArrayList(1L), false, "async_mode", 0.0); + + List params = sink.initLocationParams(new TOlapTableSink()); + + Assert.assertEquals(2, params.size()); + Assert.assertNotNull(params.get(0).getTablets()); + Assert.assertTrue("master location should be empty placeholder", + params.get(0).getTablets().isEmpty()); + Assert.assertNotNull(params.get(1).getTablets()); + Assert.assertTrue("slave location should be empty placeholder", + params.get(1).getTablets().isEmpty()); + Mockito.verifyNoInteractions(dstTable); + Mockito.verifyNoInteractions(tuple); + } + + @Test + public void testParseGroupCommit() { + Assert.assertEquals(TGroupCommitMode.ASYNC_MODE, + GroupCommitBlockSink.parseGroupCommit("async_mode")); + Assert.assertEquals(TGroupCommitMode.ASYNC_MODE, + GroupCommitBlockSink.parseGroupCommit("ASYNC_MODE")); + Assert.assertEquals(TGroupCommitMode.SYNC_MODE, + GroupCommitBlockSink.parseGroupCommit("sync_mode")); + Assert.assertEquals(TGroupCommitMode.OFF_MODE, + GroupCommitBlockSink.parseGroupCommit("off_mode")); + Assert.assertNull(GroupCommitBlockSink.parseGroupCommit(null)); + Assert.assertNull(GroupCommitBlockSink.parseGroupCommit("invalid")); + } +}