From aa9c25aff3b8c9b7e4488d40c994a877b53be7e8 Mon Sep 17 00:00:00 2001
From: meegoo
Date: Thu, 14 Jul 2022 13:57:27 +0800
Subject: [PATCH] [BugFix] Support shuffle broker load for non-duplicate-keys
 tables

---
 .../java/com/starrocks/common/Config.java     |   6 +
 .../load/loadv2/LoadingTaskPlanner.java       | 115 +++++++++-
 .../load/loadv2/LoadingTaskPlannerTest.java   | 203 ++++++++++++++++++
 3 files changed, 321 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index 8e370703b4cf1..a325d45d30a15 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1497,6 +1497,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_pipeline_load = true;
 
+    /**
+     * Enable shuffle load: hash rows by key columns before the sink for non-duplicate-keys tables.
+     */
+    @ConfField(mutable = true)
+    public static boolean enable_shuffle_load = true;
+
     /**
      * Unused config field, leave it here for backward compatibility
      */
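
Because the new flag is declared @ConfField(mutable = true), it can be flipped on a running FE without a restart, giving an escape hatch if the extra exchange fragment ever regresses a workload. A minimal sketch of toggling it at runtime, assuming nothing beyond the static field itself (illustrative, not part of the patch):

    // Mutable FE config fields are plain static members of Config, so assigning
    // them takes effect for the next LoadingTaskPlanner.plan() invocation.
    Config.enable_shuffle_load = false;  // subsequent loads fall back to the direct plan
    Config.enable_shuffle_load = true;   // restore the default shuffle behavior
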
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadingTaskPlanner.java
index d96b11de5ab26..d5d1b31a4b1e2 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadingTaskPlanner.java
@@ -26,8 +26,11 @@
 import com.starrocks.analysis.Analyzer;
 import com.starrocks.analysis.BrokerDesc;
 import com.starrocks.analysis.DescriptorTable;
+import com.starrocks.analysis.Expr;
 import com.starrocks.analysis.SlotDescriptor;
+import com.starrocks.analysis.SlotRef;
 import com.starrocks.analysis.TupleDescriptor;
+import com.starrocks.catalog.AggregateType;
 import com.starrocks.catalog.Column;
 import com.starrocks.catalog.KeysType;
 import com.starrocks.catalog.OlapTable;
@@ -44,6 +47,7 @@
 import com.starrocks.load.BrokerFileGroup;
 import com.starrocks.load.Load;
 import com.starrocks.planner.DataPartition;
+import com.starrocks.planner.ExchangeNode;
 import com.starrocks.planner.FileScanNode;
 import com.starrocks.planner.OlapTableSink;
 import com.starrocks.planner.PlanFragment;
@@ -55,12 +59,14 @@
 import com.starrocks.sql.optimizer.statistics.ColumnDict;
 import com.starrocks.sql.optimizer.statistics.IDictManager;
 import com.starrocks.thrift.TBrokerFileStatus;
+import com.starrocks.thrift.TPartitionType;
 import com.starrocks.thrift.TUniqueId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -91,6 +97,9 @@ public class LoadingTaskPlanner {
 
     private int nextNodeId = 0;
 
+    private TupleDescriptor tupleDesc;
+    private List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
+
     public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
                               BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
                               boolean strictMode, String timezone, long timeoutS,
@@ -112,8 +121,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
     public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
             throws UserException {
         // Generate tuple descriptor
-        TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
-        List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
+        tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
         List<Column> destColumns = Lists.newArrayList();
         boolean isPrimaryKey = table.getKeysType() == KeysType.PRIMARY_KEYS;
         if (isPrimaryKey && partialUpdate) {
@@ -154,6 +162,15 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
             slotDesc.setIsNullable(false);
         }
 
+        if (Config.enable_shuffle_load && needShufflePlan()) {
+            buildShufflePlan(loadId, fileStatusesList, filesAdded);
+        } else {
+            buildDirectPlan(loadId, fileStatusesList, filesAdded);
+        }
+    }
+
+    public void buildDirectPlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
+            throws UserException {
         // Generate plan trees
         // 1. Broker scan node
         FileScanNode scanNode = new FileScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "FileScanNode",
@@ -162,7 +179,6 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
         scanNode.setUseVectorizedLoad(true);
         scanNode.init(analyzer);
         scanNode.finalizeStats(analyzer);
-        LOG.info("use vectorized load: {}, load job id: {}", true, loadJobId);
         scanNodes.add(scanNode);
         descTable.computeMemLayout();
 
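
The hunk below adds the shuffle variant: it is identical to the direct plan except that an ExchangeNode hash-partitions rows on the table's key columns before they reach the OlapTableSink, so all versions of a given key are funneled through one sink instance as a single ordered stream. A self-contained toy demonstration of that routing invariant (plain Java with hypothetical names, not StarRocks code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ShuffleRoutingDemo {
        public static void main(String[] args) {
            int sinkInstances = 3;
            // Two versions of key "a" and one of key "b", in arrival order.
            List<String> rowKeys = List.of("a", "b", "a");
            Map<Integer, List<String>> perInstance = new HashMap<>();
            for (String key : rowKeys) {
                // HASH_PARTITIONED routing: equal keys always pick the same target.
                int target = Math.floorMod(key.hashCode(), sinkInstances);
                perInstance.computeIfAbsent(target, t -> new ArrayList<>()).add(key);
            }
            // Both versions of "a" share one bucket, so REPLACE semantics see them in order.
            System.out.println(perInstance);
        }
    }
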
@@ -199,6 +215,99 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
         Collections.reverse(fragments);
     }
 
+    public void buildShufflePlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
+            throws UserException {
+        // Generate plan trees
+        // 1. Broker scan node
+        FileScanNode scanNode = new FileScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "FileScanNode",
+                fileStatusesList, filesAdded);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, parallelInstanceNum);
+        scanNode.setUseVectorizedLoad(true);
+        scanNode.init(analyzer);
+        scanNode.finalizeStats(analyzer);
+        scanNodes.add(scanNode);
+        descTable.computeMemLayout();
+
+        // 2. Scan plan fragment
+        PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
+        scanFragment.setParallelExecNum(parallelInstanceNum);
+
+        fragments.add(scanFragment);
+
+        // 3. Exchange node
+        List<Column> columns = table.getFullSchema();
+
+        List<Column> keyColumns = table.getKeyColumnsByIndexId(table.getBaseIndexId());
+        List<Expr> partitionExprs = Lists.newArrayList();
+        keyColumns.forEach(column -> {
+            partitionExprs.add(new SlotRef(tupleDesc.getColumnSlot(column.getName())));
+        });
+
+        DataPartition dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
+        ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId(nextNodeId++), scanFragment.getPlanRoot(),
+                dataPartition);
+        PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1), exchangeNode, dataPartition);
+        exchangeNode.setFragment(sinkFragment);
+        scanFragment.setDestination(exchangeNode);
+        scanFragment.setOutputPartition(dataPartition);
+
+        // 4. Olap table sink
+        List<Long> partitionIds = getAllPartitionIds();
+        OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionIds);
+        olapTableSink.init(loadId, txnId, dbId, timeoutS);
+        olapTableSink.complete();
+
+        // 5. Sink plan fragment
+        sinkFragment.setSink(olapTableSink);
+        // At present, we only support dop=1 for the olap table sink,
+        // because tablet writing needs to know the number of senders in advance
+        // and requires a guaranteed order of data writing.
+        // It can be parallel only in some scenarios, so use dop=1 for now.
+        sinkFragment.setPipelineDop(1);
+        sinkFragment.setParallelExecNum(parallelInstanceNum);
+        // After data loading, check whether the global dict of each low-cardinality
+        // string column needs to be updated.
+        sinkFragment.setLoadGlobalDicts(globalDicts);
+
+        fragments.add(sinkFragment);
+
+        // 6. Finalize fragments
+        for (PlanFragment fragment : fragments) {
+            try {
+                fragment.finalize(analyzer, false);
+            } catch (NotImplementedException e) {
+                LOG.info("Fragment finalize failed. {}", e.getMessage());
+                throw new UserException("Fragment finalize failed.");
+            }
+        }
+        Collections.reverse(fragments);
+    }
+
+    public Boolean needShufflePlan() {
+        if (KeysType.DUP_KEYS.equals(table.getKeysType())) {
+            return false;
+        }
+
+        if (table.getDefaultReplicationNum() <= 1) {
+            return false;
+        }
+
+        if (KeysType.AGG_KEYS.equals(table.getKeysType())) {
+            for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
+                List<Column> schema = entry.getValue();
+                for (Column column : schema) {
+                    if (column.getAggregationType() == AggregateType.REPLACE
+                            || column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        return true;
+    }
+
     public long getStartTime() {
         return startTime;
     }
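
In summary, needShufflePlan() routes a load to the shuffle plan only when write order across replicas can change the result. A standalone restatement of the rule (simplified signature for illustration; the real method reads these properties off the OlapTable):

    // Illustrative restatement of LoadingTaskPlanner.needShufflePlan().
    enum Keys { DUP_KEYS, AGG_KEYS, UNIQUE_KEYS, PRIMARY_KEYS }

    static boolean needShuffle(Keys keysType, int replicationNum, boolean hasReplaceColumn) {
        if (keysType == Keys.DUP_KEYS) {
            return false;              // duplicate-keys rows are independent; order never matters
        }
        if (replicationNum <= 1) {
            return false;              // a single replica has no cross-replica order to protect
        }
        if (keysType == Keys.AGG_KEYS) {
            return hasReplaceColumn;   // only REPLACE / REPLACE_IF_NOT_NULL are order-sensitive
        }
        return true;                   // UNIQUE_KEYS and PRIMARY_KEYS always need the shuffle
    }
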
diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadingTaskPlannerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadingTaskPlannerTest.java
index e70170b697d69..9ba60a5b15177 100644
--- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadingTaskPlannerTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/LoadingTaskPlannerTest.java
@@ -33,6 +33,8 @@
 import com.starrocks.analysis.SqlParser;
 import com.starrocks.analysis.SqlScanner;
 import com.starrocks.analysis.StringLiteral;
+import com.starrocks.analysis.TupleDescriptor;
+import com.starrocks.analysis.TupleId;
 import com.starrocks.catalog.AggregateType;
 import com.starrocks.catalog.Column;
 import com.starrocks.catalog.Database;
@@ -58,6 +60,7 @@
 import com.starrocks.system.SystemInfoService;
 import com.starrocks.thrift.TBrokerFileStatus;
 import com.starrocks.thrift.TBrokerScanRangeParams;
+import com.starrocks.thrift.TExplainLevel;
 import com.starrocks.thrift.TExpr;
 import com.starrocks.thrift.TExprNode;
 import com.starrocks.thrift.TExprNodeType;
@@ -71,6 +74,8 @@
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
+import mockit.Mock;
+import mockit.MockUp;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -78,6 +83,7 @@
 
 import java.io.StringReader;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -805,4 +811,201 @@ public void testLoadWithOpAutoMapping(@Mocked GlobalStateMgr globalStateMgr,
         Assert.assertEquals(1, opExpr.nodes.size());
         Assert.assertEquals(TExprNodeType.SLOT_REF, opExpr.nodes.get(0).node_type);
     }
+
+    @Test
+    public void testShuffle(@Mocked GlobalStateMgr globalStateMgr, @Mocked SystemInfoService systemInfoService,
+                            @Injectable Database db, @Injectable OlapTable table) throws Exception {
+        // table schema
+        List<Column> columns = Lists.newArrayList();
+        columns.add(new Column("k1", Type.TINYINT, true, null, true, null, ""));
+        columns.add(new Column("k2", Type.INT, true, null, false, null, ""));
+        columns.add(new Column("k3", ScalarType.createVarchar(50), true, null, true, null, ""));
+        columns.add(new Column("v", Type.BIGINT, false, AggregateType.SUM, false, null, ""));
+
+        List<Column> keyColumns = Lists.newArrayList();
+        keyColumns.add(columns.get(0));
+        keyColumns.add(columns.get(1));
+        keyColumns.add(columns.get(2));
+
+        Function f1 = new Function(new FunctionName(FunctionSet.SUBSTR), new Type[] {Type.VARCHAR, Type.INT, Type.INT},
+                Type.VARCHAR, true);
+        Function f2 = new Function(new FunctionName("casttoint"), new Type[] {Type.VARCHAR},
+                Type.INT, true);
+        new Expectations() {
+            {
+                GlobalStateMgr.getCurrentSystemInfo();
+                result = systemInfoService;
+                systemInfoService.getIdToBackend();
+                result = idToBackend;
+                table.getKeysType();
+                result = KeysType.UNIQUE_KEYS;
+                table.getDefaultReplicationNum();
+                result = 3;
+                table.getBaseIndexId();
+                result = 1;
+                table.getKeyColumnsByIndexId((long) 1);
+                result = keyColumns;
+                table.getBaseSchema();
+                result = columns;
+                table.getFullSchema();
+                result = columns;
+                table.getPartitions();
+                minTimes = 0;
+                result = Arrays.asList(partition);
+                partition.getId();
+                minTimes = 0;
+                result = 0;
+                table.getColumn("k1");
+                result = columns.get(0);
+                table.getColumn("k2");
+                result = columns.get(1);
+                table.getColumn("k3");
+                result = columns.get(2);
+                table.getColumn("v");
+                result = columns.get(3);
+                table.getColumn("k33");
+                result = null;
+                globalStateMgr.getFunction((Function) any, (Function.CompareMode) any);
+                returns(f1, f1, f2);
+            }
+        };
+
+        // column mappings
+        String sql = "LOAD LABEL label0 (DATA INFILE('path/k2=1/file1') INTO TABLE t2 FORMAT AS 'orc' (k1,k33,v) "
+                + "COLUMNS FROM PATH AS (k2) set (k3 = substr(k33,1,5))) WITH BROKER 'broker0'";
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql)));
+        LoadStmt loadStmt = (LoadStmt) SqlParserUtils.getFirstStmt(parser);
+        List<Expr> columnMappingList = Deencapsulation.getField(loadStmt.getDataDescriptions().get(0),
+                "columnMappingList");
+
+        // file groups
+        List<BrokerFileGroup> fileGroups = Lists.newArrayList();
+        List<String> files = Lists.newArrayList("path/k2=1/file1");
+        List<String> columnNames = Lists.newArrayList("k1", "k33", "v");
+        DataDescription desc = new DataDescription("t2", null, files, columnNames,
+                null, null, "ORC", Lists.newArrayList("k2"),
+                false, columnMappingList, null);
+        Deencapsulation.invoke(desc, "analyzeColumns");
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
+        Deencapsulation.setField(brokerFileGroup, "columnSeparator", "\t");
+        Deencapsulation.setField(brokerFileGroup, "rowDelimiter", "\n");
+        Deencapsulation.setField(brokerFileGroup, "fileFormat", "ORC");
+        fileGroups.add(brokerFileGroup);
+
+        // file status
+        List<List<TBrokerFileStatus>> fileStatusesList = Lists.newArrayList();
+        List<TBrokerFileStatus> fileStatusList = Lists.newArrayList();
+        fileStatusList.add(new TBrokerFileStatus("path/k2=1/file1", false, 268435456, true));
+        fileStatusesList.add(fileStatusList);
+
+        // plan
+        LoadingTaskPlanner planner = new LoadingTaskPlanner(jobId, txnId, db.getId(), table, brokerDesc, fileGroups,
+                false, TimeUtils.DEFAULT_TIME_ZONE, 3600, System.currentTimeMillis(), false);
+        planner.plan(loadId, fileStatusesList, 1);
+
+        // check fragments: scan fragment plus exchange/sink fragment
+        List<PlanFragment> fragments = planner.getFragments();
+        Assert.assertEquals(2, fragments.size());
+    }
+
+    @Test
+    public void testAggShuffle(@Mocked GlobalStateMgr globalStateMgr, @Mocked SystemInfoService systemInfoService,
+                               @Injectable Database db, @Injectable OlapTable table) throws Exception {
+        // table schema
+        List<Column> columns = Lists.newArrayList();
+        columns.add(new Column("k1", Type.TINYINT, true, null, true, null, ""));
+        columns.add(new Column("k2", Type.INT, true, null, false, null, ""));
+        columns.add(new Column("k3", ScalarType.createVarchar(50), true, null, true, null, ""));
+        columns.add(new Column("v", Type.BIGINT, false, AggregateType.REPLACE, false, null, ""));
+
+        List<Column> keyColumns = Lists.newArrayList();
+        keyColumns.add(columns.get(0));
+        keyColumns.add(columns.get(1));
+        keyColumns.add(columns.get(2));
+
+        Map<Long, List<Column>> indexSchema = Maps.newHashMap();
+        indexSchema.put((long) 1, columns);
+
+        Function f1 = new Function(new FunctionName(FunctionSet.SUBSTR), new Type[] {Type.VARCHAR, Type.INT, Type.INT},
+                Type.VARCHAR, true);
+        Function f2 = new Function(new FunctionName("casttoint"), new Type[] {Type.VARCHAR},
+                Type.INT, true);
+        new Expectations() {
+            {
+                GlobalStateMgr.getCurrentSystemInfo();
+                result = systemInfoService;
+                systemInfoService.getIdToBackend();
+                result = idToBackend;
+                table.getKeysType();
+                result = KeysType.AGG_KEYS;
+                table.getDefaultReplicationNum();
+                result = 3;
+                table.getBaseIndexId();
+                result = 1;
+                table.getIndexIdToSchema();
+                result = indexSchema;
+                table.getKeyColumnsByIndexId((long) 1);
+                result = keyColumns;
+                table.getBaseSchema();
+                result = columns;
+                table.getFullSchema();
+                result = columns;
+                table.getPartitions();
+                minTimes = 0;
+                result = Arrays.asList(partition);
+                partition.getId();
+                minTimes = 0;
+                result = 0;
+                table.getColumn("k1");
+                result = columns.get(0);
+                table.getColumn("k2");
+                result = columns.get(1);
+                table.getColumn("k3");
+                result = columns.get(2);
+                table.getColumn("v");
+                result = columns.get(3);
+                table.getColumn("k33");
+                result = null;
+                globalStateMgr.getFunction((Function) any, (Function.CompareMode) any);
+                returns(f1, f1, f2);
+            }
+        };
+
+        // column mappings
+        String sql = "LOAD LABEL label0 (DATA INFILE('path/k2=1/file1') INTO TABLE t2 FORMAT AS 'orc' (k1,k33,v) "
+                + "COLUMNS FROM PATH AS (k2) set (k3 = substr(k33,1,5))) WITH BROKER 'broker0'";
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql)));
+        LoadStmt loadStmt = (LoadStmt) SqlParserUtils.getFirstStmt(parser);
+        List<Expr> columnMappingList = Deencapsulation.getField(loadStmt.getDataDescriptions().get(0),
+                "columnMappingList");
+
+        // file groups
+        List<BrokerFileGroup> fileGroups = Lists.newArrayList();
+        List<String> files = Lists.newArrayList("path/k2=1/file1");
+        List<String> columnNames = Lists.newArrayList("k1", "k33", "v");
+        DataDescription desc = new DataDescription("t2", null, files, columnNames,
+                null, null, "ORC", Lists.newArrayList("k2"),
+                false, columnMappingList, null);
+        Deencapsulation.invoke(desc, "analyzeColumns");
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
+        Deencapsulation.setField(brokerFileGroup, "columnSeparator", "\t");
+        Deencapsulation.setField(brokerFileGroup, "rowDelimiter", "\n");
+        Deencapsulation.setField(brokerFileGroup, "fileFormat", "ORC");
+        fileGroups.add(brokerFileGroup);
+
+        // file status
+        List<List<TBrokerFileStatus>> fileStatusesList = Lists.newArrayList();
+        List<TBrokerFileStatus> fileStatusList = Lists.newArrayList();
+        fileStatusList.add(new TBrokerFileStatus("path/k2=1/file1", false, 268435456, true));
+        fileStatusesList.add(fileStatusList);
+
+        // plan
+        LoadingTaskPlanner planner = new LoadingTaskPlanner(jobId, txnId, db.getId(), table, brokerDesc, fileGroups,
+                false, TimeUtils.DEFAULT_TIME_ZONE, 3600, System.currentTimeMillis(), false);
+        planner.plan(loadId, fileStatusesList, 1);
+
+        // check fragments: scan fragment plus exchange/sink fragment
+        List<PlanFragment> fragments = planner.getFragments();
+        Assert.assertEquals(2, fragments.size());
+    }
 }
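
Both tests assert exactly two fragments (scan plus exchange/sink), which pins down that the shuffle path was taken for a UNIQUE_KEYS table and for an AGG_KEYS table with a REPLACE column. A natural companion check, sketched here under the same mocks but not part of the patch, would pin down the direct path as well:

    // Hypothetical follow-up assertion: with the flag off, plan() takes
    // buildDirectPlan() and scan + sink share a single fragment.
    Config.enable_shuffle_load = false;
    LoadingTaskPlanner directPlanner = new LoadingTaskPlanner(jobId, txnId, db.getId(), table, brokerDesc,
            fileGroups, false, TimeUtils.DEFAULT_TIME_ZONE, 3600, System.currentTimeMillis(), false);
    directPlanner.plan(loadId, fileStatusesList, 1);
    Assert.assertEquals(1, directPlanner.getFragments().size());
    Config.enable_shuffle_load = true;
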