Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ private void handleQuery(ConnectContext context, String requestDb, String reques
*/
private Map<String, Node> assemblePrunedPartitions(List<TScanRangeLocations> scanRangeLocationsList) {
Map<String, Node> result = new HashMap<>();
if (scanRangeLocationsList == null) {
return result;
}
for (TScanRangeLocations scanRangeLocations : scanRangeLocationsList) {
// only process palo(doris) scan range
TPaloScanRange scanRange = scanRangeLocations.scan_range.palo_scan_range;
Expand Down
27 changes: 27 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import lombok.Data;

@Data
public class OlapSplit extends Split {
private long tabletId;

public OlapSplit() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti
*/
public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);

/**
* Get the ScanInfoList of this ScanNode, each subclass need to override this method.
* This method will eventually replace the method getScanRangeLocations.
* @return ScanRangeList for this ScanNode
*/
public ScanRangeList getScanRangeList() {
return null;
}

/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.thrift.TScanRange;

import lombok.Data;

import java.util.List;

/**
* This class represents all the scan ranges for a scan node (including external scan node, olap scan node etc.)
* Each element in scanRanges is a range of the scan task. For example, a tablet for OlapScanNode,
* one or several file blocks for ExternalFileScanNode.
* ScanNode need to generate this scanRanges during init or finalize stage,
* and the Coordinator will get the scanRanges generated and assign each TScanRange a BE node to execute it.
*/
@Data
public class ScanRangeList {
private List<TScanRange> scanRanges;

public void addToScanRanges(TScanRange range) {
if (scanRanges == null) {
scanRanges = new java.util.ArrayList<>();
}
scanRanges.add(range);
}

public int getScanRangeSize() {
if (scanRanges == null) {
return 0;
}
return scanRanges.size();
}
}
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ public Split() {}
public Split(String[] hosts) {
this.hosts = hosts;
}

public String[] getHosts() {
if (this.hosts == null) {
return new String[]{};
} else {
return this.hosts;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;

Expand Down Expand Up @@ -258,8 +258,8 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
execParams.destinations = Lists.newArrayList();
Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap();
List<TScanRangeParams> scanRangeParams = Lists.newArrayList();
for (TScanRangeLocations locations : scanNode.getScanRangeLocations(0)) {
scanRangeParams.add(new TScanRangeParams(locations.getScanRange()));
for (TScanRange range : scanNode.getScanRangeList().getScanRanges()) {
scanRangeParams.add(new TScanRangeParams(range));
}
// For stream load, only one sender
execParams.setSenderId(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanRangeList;
import org.apache.doris.planner.external.iceberg.IcebergApiSource;
import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
Expand All @@ -65,14 +66,12 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -132,7 +131,7 @@ public enum Type {
private List<ParamCreateContext> contexts = Lists.newArrayList();

// Final output of this file scan node
private List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
private ScanRangeList scanRangeList = new ScanRangeList();

// For explain
private long inputSplitsNum = 0;
Expand Down Expand Up @@ -424,7 +423,7 @@ public void finalize(Analyzer analyzer) throws UserException {
setDefaultValueExprs(scanProvider, context);
setColumnPositionMappingForTextFile(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
createScanInfoList(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
Expand All @@ -450,7 +449,7 @@ public void finalizeForNereids() throws UserException {
setDefaultValueExprs(scanProvider, context);
setColumnPositionMappingForTextFile(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
createScanInfoList(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
Expand Down Expand Up @@ -677,9 +676,8 @@ protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor
}
}

private void createScanRangeLocations(ParamCreateContext context, FileScanProviderIf scanProvider)
throws UserException {
scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations);
private void createScanInfoList(ParamCreateContext context, FileScanProviderIf scanProvider) throws UserException {
scanProvider.createScanRangeList(context, scanRangeList);
}

private void genSlotToSchemaIdMap(ParamCreateContext context) {
Expand All @@ -700,7 +698,7 @@ private void genSlotToSchemaIdMap(ParamCreateContext context) {

@Override
public int getNumInstances() {
return scanRangeLocations.size();
return scanRangeList.getScanRangeSize();
}

@Override
Expand All @@ -712,9 +710,9 @@ protected void toThrift(TPlanNode planNode) {
}

@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
LOG.debug("There is {} scanRangeLocations for execution.", scanRangeLocations.size());
return scanRangeLocations;
public ScanRangeList getScanRangeList() {
LOG.debug("There is {} scanRanges for execution.", scanRangeList.getScanRangeSize());
return scanRangeList;
}

@Override
Expand All @@ -730,29 +728,24 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
}

output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
.append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
.append(totalFileSize).append(", scanRanges=").append(scanRangeList.getScanRangeSize()).append("\n");
output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
.append("\n");

if (detailLevel == TExplainLevel.VERBOSE) {
output.append(prefix).append("backends:").append("\n");
Multimap<Long, TFileRangeDesc> scanRangeLocationsMap = ArrayListMultimap.create();
// 1. group by backend id
for (TScanRangeLocations locations : scanRangeLocations) {
scanRangeLocationsMap.putAll(locations.getLocations().get(0).backend_id,
locations.getScanRange().getExtScanRange().getFileScanRange().getRanges());
}
for (long beId : scanRangeLocationsMap.keySet()) {
output.append(prefix).append(" ").append(beId).append("\n");
List<TFileRangeDesc> fileRangeDescs = Lists.newArrayList(scanRangeLocationsMap.get(beId));
// 2. sort by file start offset
int seq = 1;
output.append(prefix).append(" scanRangeListSize: ").append(scanRangeList.getScanRangeSize()).append("\n");
for (TScanRange range : scanRangeList.getScanRanges()) {
output.append(prefix).append(" range sequence: ").append(seq).append("\n");
List<TFileRangeDesc> fileRangeDescs = range.getExtScanRange().getFileScanRange().getRanges();
// 1. sort by file start offset
Collections.sort(fileRangeDescs, new Comparator<TFileRangeDesc>() {
@Override
public int compare(TFileRangeDesc o1, TFileRangeDesc o2) {
return Long.compare(o1.getStartOffset(), o2.getStartOffset());
}
});
// 3. if size <= 4, print all. if size > 4, print first 3 and last 1
// 2. if size <= 4, print all. if size > 4, print first 3 and last 1
int size = fileRangeDescs.size();
if (size <= 4) {
for (TFileRangeDesc file : fileRangeDescs) {
Expand All @@ -777,6 +770,10 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) {
.append(" length: ").append(file.getFileSize())
.append("\n");
}
seq++;
if (seq > 4) {
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.ScanRangeList;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileFormatType;
Expand All @@ -40,8 +39,6 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -188,9 +185,8 @@ public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy)
LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
}

public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
public void createScanRangeList(ParamCreateContext context, ScanRangeList scanRangeList) throws UserException {
TScanRange range = newRange(context.params, brokerDesc);
long curInstanceBytes = 0;
long curFileOffset = 0;
for (int i = 0; i < fileStatuses.size(); ) {
Expand All @@ -211,78 +207,50 @@ public void createScanRangeLocations(ParamCreateContext context, FederationBacke
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
range.getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
range.getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
i++;
}

// New one scan
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, brokerDesc, backendPolicy);
scanRangeList.addToScanRanges(range);
range = newRange(context.params, brokerDesc);
curInstanceBytes = 0;

} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
range.getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
curInstanceBytes += leftBytes;
i++;
}
}

// Put the last file
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().isSetRanges()) {
scanRangeLocations.add(curLocations);
if (range.getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeList.addToScanRanges(range);
}
}

protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDesc brokerDesc,
FederationBackendPolicy backendPolicy) throws UserException {

Backend selectedBackend = backendPolicy.getNextBe();

// Generate one file scan range
private TScanRange newRange(TFileScanRangeParams params, BrokerDesc brokerDesc) {
TFileScanRange fileScanRange = new TFileScanRange();

if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
FsBroker broker = null;
try {
broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerDesc.getName(), selectedBackend.getIp());
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
} else {
params.setBrokerAddresses(new ArrayList<>());
}
fileScanRange.setParams(params);

// Scan range
TExternalScanRange externalScanRange = new TExternalScanRange();
externalScanRange.setFileScanRange(fileScanRange);
TScanRange scanRange = new TScanRange();
scanRange.setExtScanRange(externalScanRange);

// Locations
TScanRangeLocations locations = new TScanRangeLocations();
locations.setScanRange(scanRange);

if (jobType == JobType.BULK_LOAD) {
TScanRangeLocation location = new TScanRangeLocation();
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getIp(), selectedBackend.getBePort()));
locations.addToLocations(location);
} else {
// stream load do not need locations
locations.setLocations(Lists.newArrayList());
}

return locations;
return scanRange;
}

private TFileFormatType formatType(String fileFormat, String path) throws UserException {
Expand Down
Loading