[BugFix] Support shuffle broker load for non duplicate keys table #8714

Merged (1 commit, Jul 20, 2022)
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1497,6 +1497,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean enable_pipeline_load = true;

/**
 * Enable shuffle load: when loading into a non duplicate keys table with more than
 * one replica, hash-partition rows by the table's key columns through an exchange
 * node before they reach the olap table sink.
 */
@ConfField(mutable = true)
public static boolean enable_shuffle_load = true;

/**
* Unused config field, leave it here for backward compatibility
*/
@@ -26,8 +26,11 @@
import com.starrocks.analysis.Analyzer;
import com.starrocks.analysis.BrokerDesc;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.SlotDescriptor;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.TupleDescriptor;
import com.starrocks.catalog.AggregateType;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.OlapTable;
Expand All @@ -44,6 +47,7 @@
import com.starrocks.load.BrokerFileGroup;
import com.starrocks.load.Load;
import com.starrocks.planner.DataPartition;
import com.starrocks.planner.ExchangeNode;
import com.starrocks.planner.FileScanNode;
import com.starrocks.planner.OlapTableSink;
import com.starrocks.planner.PlanFragment;
Expand All @@ -55,12 +59,14 @@
import com.starrocks.sql.optimizer.statistics.ColumnDict;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import com.starrocks.thrift.TBrokerFileStatus;
import com.starrocks.thrift.TPartitionType;
import com.starrocks.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@@ -91,6 +97,9 @@ public class LoadingTaskPlanner {

private int nextNodeId = 0;

private TupleDescriptor tupleDesc;
private List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();

public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, String timezone, long timeoutS,
@@ -112,8 +121,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
throws UserException {
// Generate tuple descriptor
TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
List<Pair<Integer, ColumnDict>> globalDicts = Lists.newArrayList();
tupleDesc = descTable.createTupleDescriptor("DestTableTuple");
List<Column> destColumns = Lists.newArrayList();
boolean isPrimaryKey = table.getKeysType() == KeysType.PRIMARY_KEYS;
if (isPrimaryKey && partialUpdate) {
@@ -154,6 +162,15 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList
slotDesc.setIsNullable(false);
}

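// Config.enable_shuffle_load is the global switch; needShufflePlan() decides per table
// whether a shuffle plan is actually required.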
if (Config.enable_shuffle_load && needShufflePlan()) {
buildShufflePlan(loadId, fileStatusesList, filesAdded);
} else {
buildDirectPlan(loadId, fileStatusesList, filesAdded);
}
}

public void buildDirectPlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
throws UserException {
// Generate plan trees
// 1. Broker scan node
FileScanNode scanNode = new FileScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "FileScanNode",
@@ -162,7 +179,6 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList
scanNode.setUseVectorizedLoad(true);
scanNode.init(analyzer);
scanNode.finalizeStats(analyzer);
LOG.info("use vectorized load: {}, load job id: {}", true, loadJobId);
scanNodes.add(scanNode);
descTable.computeMemLayout();

@@ -199,6 +215,99 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList
Collections.reverse(fragments);
}

public void buildShufflePlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
throws UserException {
// Generate plan trees
// 1. Broker scan node
FileScanNode scanNode = new FileScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "FileScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, parallelInstanceNum);
scanNode.setUseVectorizedLoad(true);
scanNode.init(analyzer);
scanNode.finalizeStats(analyzer);
scanNodes.add(scanNode);
descTable.computeMemLayout();

// 2. Scan plan fragment
PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
scanFragment.setParallelExecNum(parallelInstanceNum);

fragments.add(scanFragment);

// 3. Exchange node
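// Rows are hash-partitioned on the table's key columns, so rows that share a key
// are delivered to the same sink fragment instance.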
List<Column> columns = table.getFullSchema();

List<Column> keyColumns = table.getKeyColumnsByIndexId(table.getBaseIndexId());
List<Expr> partitionExprs = Lists.newArrayList();
keyColumns.forEach(column -> {
partitionExprs.add(new SlotRef(tupleDesc.getColumnSlot(column.getName())));
});

DataPartition dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId(nextNodeId++), scanFragment.getPlanRoot(),
dataPartition);
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1), exchangeNode, dataPartition);
exchangeNode.setFragment(sinkFragment);
scanFragment.setDestination(exchangeNode);
scanFragment.setOutputPartition(dataPartition);

// 4. Olap table sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionIds);
olapTableSink.init(loadId, txnId, dbId, timeoutS);
olapTableSink.complete();

// 5. Sink plan fragment
sinkFragment.setSink(olapTableSink);
// At present, we only support dop=1 for the olap table sink, because tablet
// writing needs to know the number of senders in advance and requires a
// guaranteed order of data writing. It could run in parallel in some scenarios;
// for simplicity we use dop=1 for now.
sinkFragment.setPipelineDop(1);
sinkFragment.setParallelExecNum(parallelInstanceNum);
// After data loading, check whether the global dict for low-cardinality string
// columns needs to be updated.
sinkFragment.setLoadGlobalDicts(globalDicts);

fragments.add(sinkFragment);

// 6. Finalize
for (PlanFragment fragment : fragments) {
try {
fragment.finalize(analyzer, false);
} catch (NotImplementedException e) {
LOG.info("Fragment finalize failed.{}", e.getMessage());
throw new UserException("Fragment finalize failed.");
}
}
Collections.reverse(fragments);
}

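/**
 * Duplicate keys tables and tables with a single replica never need a shuffle plan;
 * aggregate keys tables need one only when some column uses REPLACE or
 * REPLACE_IF_NOT_NULL aggregation; unique keys and primary keys tables always do.
 */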
public Boolean needShufflePlan() {
if (KeysType.DUP_KEYS.equals(table.getKeysType())) {
return false;
}

if (table.getDefaultReplicationNum() <= 1) {
return false;
}

if (KeysType.AGG_KEYS.equals(table.getKeysType())) {
for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
List<Column> schema = entry.getValue();
for (Column column : schema) {
if (column.getAggregationType() == AggregateType.REPLACE
|| column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
return true;
}
}
}
return false;
}

return true;
}

public long getStartTime() {
return startTime;
}
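For reference, the branching in needShufflePlan() can be read as the standalone sketch below. The types KeysKind and AggKind and the ShuffleDecisionSketch class are hypothetical simplifications introduced only for illustration; the real method works against OlapTable, KeysType, and AggregateType as shown above.

import java.util.List;

// Hypothetical, simplified mirror of needShufflePlan(); not part of this change.
enum KeysKind { DUP_KEYS, AGG_KEYS, UNIQUE_KEYS, PRIMARY_KEYS }
enum AggKind { SUM, MAX, MIN, REPLACE, REPLACE_IF_NOT_NULL }

class ShuffleDecisionSketch {
    static boolean needShuffle(KeysKind keys, int replicationNum, List<AggKind> valueAggTypes) {
        if (keys == KeysKind.DUP_KEYS || replicationNum <= 1) {
            // Append-only data or a single replica: shuffling brings no benefit.
            return false;
        }
        if (keys == KeysKind.AGG_KEYS) {
            // Only REPLACE-style aggregation makes the stored value depend on write order.
            return valueAggTypes.stream().anyMatch(a ->
                    a == AggKind.REPLACE || a == AggKind.REPLACE_IF_NOT_NULL);
        }
        // Unique keys and primary keys tables always overwrite by key.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(needShuffle(KeysKind.DUP_KEYS, 3, List.of()));                // false
        System.out.println(needShuffle(KeysKind.AGG_KEYS, 3, List.of(AggKind.SUM)));     // false
        System.out.println(needShuffle(KeysKind.AGG_KEYS, 3, List.of(AggKind.REPLACE))); // true
        System.out.println(needShuffle(KeysKind.PRIMARY_KEYS, 3, List.of()));            // true
    }
}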