Skip to content

Commit

Permalink
[config](load) enable new load scan node by default (#14808)
Browse files Browse the repository at this point in the history
Set FE `enable_new_load_scan_node` to true by default.
So that all load tasks(broker load, stream load, routine load, insert into) will use FileScanNode instead of BrokerScanNode
to read data

1. Support loading parquet file in stream load with new load scan node.
2. Fix bug that new parquet reader can not read column without logical or converted type.
3. Change jsonb parser function to "jsonb_parse_error_to_null"
    So that if the input string is not a valid json string, it will return null for jsonb column in load task.
  • Loading branch information
morningman committed Dec 16, 2022
1 parent e0d5289 commit 0e1e5a8
Show file tree
Hide file tree
Showing 36 changed files with 164 additions and 304 deletions.
1 change: 1 addition & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
RETURN_IF_ERROR(file_sink->open());
request.__isset.path = true;
request.fileType = TFileType::FILE_LOCAL;
request.__set_file_size(ctx->body_bytes);
ctx->body_sink = file_sink;
}
if (!http_req->header(HTTP_COLUMNS).empty()) {
Expand Down
25 changes: 16 additions & 9 deletions be/src/util/jsonb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "jsonb_document.h"
#include "jsonb_error.h"
#include "jsonb_writer.h"
#include "string_parser.hpp"

namespace doris {

Expand Down Expand Up @@ -894,8 +895,12 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
int64_t val = strtol(num_buf_, NULL, 10);
if (errno == ERANGE) {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
int64_t val =
StringParser::string_to_int<int64_t>(num_buf_, pbuf - num_buf_, &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
VLOG_ROW << "debug string_to_int error for " << num_buf_ << " val=" << val
<< " parse_result=" << parse_result;
err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
return false;
}
Expand Down Expand Up @@ -950,7 +955,7 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
return internConvertBufferToDouble();
return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
}

// parse the exponent part of a double number
Expand Down Expand Up @@ -990,15 +995,17 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
return internConvertBufferToDouble();
return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
}

// call system function to parse double to string
bool internConvertBufferToDouble() {
double val = strtod(num_buf_, NULL);

if (errno == ERANGE) {
err_ = JsonbErrType::E_DOUBLE_OVERFLOW;
bool internConvertBufferToDouble(char* num_buf_, int len) {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
double val = StringParser::string_to_float<double>(num_buf_, len, &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
VLOG_ROW << "debug string_to_float error for " << num_buf_ << " val=" << val
<< " parse_result=" << parse_result;
err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
return false;
}

Expand Down
14 changes: 9 additions & 5 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,24 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
switch (physical_schema.type) {
case tparquet::Type::BOOLEAN:
type.type = TYPE_BOOLEAN;
return type;
break;
case tparquet::Type::INT32:
type.type = TYPE_INT;
return type;
break;
case tparquet::Type::INT64:
case tparquet::Type::INT96:
type.type = TYPE_BIGINT;
return type;
break;
case tparquet::Type::FLOAT:
type.type = TYPE_FLOAT;
return type;
break;
case tparquet::Type::DOUBLE:
type.type = TYPE_DOUBLE;
return type;
break;
case tparquet::Type::BYTE_ARRAY:
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
type.type = TYPE_STRING;
break;
default:
break;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ Status VFileScanner::_init_src_block(Block* block) {
data_type = DataTypeFactory::instance().create_data_type(it->second, true);
}
if (data_type == nullptr) {
return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
return Status::NotSupported(
fmt::format("Not support data type:{} for column: {}",
(it == _name_to_col_type.end() ? slot->type().debug_string()
: it->second.debug_string()),
slot->col_name()));
}
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,14 @@ public DataDescription(String tableName,
public DataDescription(String tableName, LoadTaskInfo taskInfo) {
this.tableName = tableName;
this.partitionNames = taskInfo.getPartitions();
// Add a dummy path to just make analyze() happy.
// Stream load does not need this field.
this.filePaths = Lists.newArrayList("dummy");

if (!Strings.isNullOrEmpty(taskInfo.getPath())) {
this.filePaths = Lists.newArrayList(taskInfo.getPath());
} else {
// Add a dummy path to just make analyze() happy.
this.filePaths = Lists.newArrayList("dummy");
}

this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames();
this.columnSeparator = taskInfo.getColumnSeparator();
this.lineDelimiter = taskInfo.getLineDelimiter();
Expand Down Expand Up @@ -259,7 +264,20 @@ private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) {
// the compress type is saved in "compressType"
this.fileFormat = "csv";
} else {
this.fileFormat = "json";
switch (type) {
case FORMAT_ORC:
this.fileFormat = "orc";
break;
case FORMAT_PARQUET:
this.fileFormat = "parquet";
break;
case FORMAT_JSON:
this.fileFormat = "json";
break;
default:
this.fileFormat = "unknown";
break;
}
}
}
// get compress type
Expand Down Expand Up @@ -1019,3 +1037,4 @@ public String toString() {
return toSql();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@ public class Config extends ConfigBase {
* Temp config, should be removed when new file scan node is ready.
*/
@ConfField(mutable = true)
public static boolean enable_new_load_scan_node = false;
public static boolean enable_new_load_scan_node = true;

/**
* Max data version of backends serialize block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public BrokerFileGroup(DataDescription dataDescription) {
this.deleteCondition = dataDescription.getDeleteCondition();
this.mergeType = dataDescription.getMergeType();
this.sequenceCol = dataDescription.getSequenceCol();
this.filePaths = dataDescription.getFilePaths();
}

// NOTE: DBLock will be held
Expand Down Expand Up @@ -598,3 +599,4 @@ public static BrokerFileGroup read(DataInput in) throws IOException {
return fileGroup;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ protected void finalizeParams(Map<String, SlotDescriptor> slotDescByName,
expr.analyze(analyzer);
}

// for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
// and if input string is not a valid json string, return null.
PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.JSONB
Expand All @@ -217,7 +219,7 @@ protected void finalizeParams(Map<String, SlotDescriptor> slotDescByName,
if (destSlotDesc.getIsNullable() || expr.isNullable()) {
nullable = "nullable";
}
String name = "jsonb_parse_" + nullable + "_error_to_invalid";
String name = "jsonb_parse_" + nullable + "_error_to_null";
expr = new FunctionCallExpr(name, args);
expr.analyze(analyzer);
} else {
Expand Down Expand Up @@ -251,3 +253,4 @@ protected void toThrift(TPlanNode planNode) {
planNode.setBrokerScanNode(brokerScanNode);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanFragmentExecParams;
Expand Down Expand Up @@ -180,9 +181,15 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
fileGroup.parse(db, dataDescription);
// 2. create dummy file status
TBrokerFileStatus fileStatus = new TBrokerFileStatus();
fileStatus.setPath("");
fileStatus.setIsDir(false);
fileStatus.setSize(-1); // must set to -1, means stream.
if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
fileStatus.setPath(taskInfo.getPath());
fileStatus.setIsDir(false);
fileStatus.setSize(taskInfo.getFileSize()); // must set to -1, means stream.
} else {
fileStatus.setPath("");
fileStatus.setIsDir(false);
fileStatus.setSize(-1); // must set to -1, means stream.
}
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType());
scanNode = fileScanNode;
Expand Down Expand Up @@ -324,3 +331,4 @@ private List<Long> getAllPartitionIds() throws DdlException, AnalysisException {
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,25 @@ protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyz
expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
expr.analyze(analyzer);
}
expr = castToSlot(destSlotDesc, expr);

// for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
// and if input string is not a valid json string, return null.
PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.JSONB
&& (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) {
List<Expr> args = Lists.newArrayList();
args.add(expr);
String nullable = "notnull";
if (destSlotDesc.getIsNullable() || expr.isNullable()) {
nullable = "nullable";
}
String name = "jsonb_parse_" + nullable + "_error_to_null";
expr = new FunctionCallExpr(name, args);
expr.analyze(analyzer);
} else {
expr = castToSlot(destSlotDesc, expr);
}
params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
Expand Down Expand Up @@ -550,3 +568,4 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
}



Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -140,6 +141,10 @@ public int getLoadParallelism() {
return loadParallelism;
}

public TFileType getFileType() {
return fileType;
}

public String getExplainString(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append("file scan\n");
Expand Down Expand Up @@ -303,10 +308,20 @@ private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus
rangeDesc.setSize(rangeBytes);
rangeDesc.setColumnsFromPath(columnsFromPath);
} else {
// for stream load
if (getFileType() == TFileType.FILE_LOCAL) {
// when loading parquet via stream, there will be a local file saved on BE
// so to read it as a local file.
Preconditions.checkState(fileGroup.getFilePaths().size() == 1);
rangeDesc.setPath(fileGroup.getFilePaths().get(0));
rangeDesc.setStartOffset(0);
rangeDesc.setSize(fileStatus.size);
}
rangeDesc.setLoadId(loadId);
rangeDesc.setSize(fileStatus.size);
}
return rangeDesc;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public ParamCreateContext createContext(Analyzer analyzer) throws UserException
TFileAttributes fileAttributes = new TFileAttributes();
setFileAttributes(ctx.fileGroup, fileAttributes);
params.setFileAttributes(fileAttributes);
params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
params.setFileType(fileGroupInfo.getFileType());
ctx.params = params;

initColumns(ctx, analyzer);
Expand Down Expand Up @@ -252,3 +252,4 @@ public TableIf getTargetTable() {
return fileGroupInfo.getTargetTable();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
Expand Down Expand Up @@ -1224,3 +1224,4 @@ private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TExc
return result;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public interface LoadTaskInfo {

String getPath();

default long getFileSize() {
return 0;
}

double getMaxFilterRatio();

ImportColumnDescs getColumnExprDescs();
Expand Down Expand Up @@ -118,3 +122,4 @@ public List<Expr> getColumnMappingList() {
}
}
}

11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private Separator lineDelimiter;
private PartitionNames partitions;
private String path;
private long fileSize = 0;
private boolean negative;
private boolean strictMode = false; // default is false
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
Expand Down Expand Up @@ -159,6 +160,11 @@ public String getPath() {
return path;
}

@Override
public long getFileSize() {
return fileSize;
}

public boolean getNegative() {
return negative;
}
Expand Down Expand Up @@ -234,6 +240,7 @@ public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}


@Override
public String getSequenceCol() {
return sequenceCol;
Expand All @@ -249,6 +256,9 @@ public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest req
request.getFileType(), request.getFormatType(),
request.getCompressType());
streamLoadTask.setOptionalFromTSLPutRequest(request);
if (request.isSetFileSize()) {
streamLoadTask.fileSize = request.getFileSize();
}
return streamLoadTask;
}

Expand Down Expand Up @@ -416,3 +426,4 @@ public double getMaxFilterRatio() {
return maxFilterRatio;
}
}

Loading

0 comments on commit 0e1e5a8

Please sign in to comment.