Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config](load) enable new load scan node by default #14808

Merged
merged 13 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
RETURN_IF_ERROR(file_sink->open());
request.__isset.path = true;
request.fileType = TFileType::FILE_LOCAL;
request.__set_file_size(ctx->body_bytes);
ctx->body_sink = file_sink;
}
if (!http_req->header(HTTP_COLUMNS).empty()) {
Expand Down
25 changes: 16 additions & 9 deletions be/src/util/jsonb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "jsonb_document.h"
#include "jsonb_error.h"
#include "jsonb_writer.h"
#include "string_parser.hpp"

namespace doris {

Expand Down Expand Up @@ -894,8 +895,12 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
int64_t val = strtol(num_buf_, NULL, 10);
if (errno == ERANGE) {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
int64_t val =
StringParser::string_to_int<int64_t>(num_buf_, pbuf - num_buf_, &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
VLOG_ROW << "debug string_to_int error for " << num_buf_ << " val=" << val
<< " parse_result=" << parse_result;
err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
return false;
}
Expand Down Expand Up @@ -950,7 +955,7 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
return internConvertBufferToDouble();
return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
}

// parse the exponent part of a double number
Expand Down Expand Up @@ -990,15 +995,17 @@ class JsonbParserT {
}

*pbuf = 0; // set null-terminator
return internConvertBufferToDouble();
return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
}

// parse the number string held in the buffer into a double
bool internConvertBufferToDouble() {
double val = strtod(num_buf_, NULL);

if (errno == ERANGE) {
err_ = JsonbErrType::E_DOUBLE_OVERFLOW;
bool internConvertBufferToDouble(char* num_buf_, int len) {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
double val = StringParser::string_to_float<double>(num_buf_, len, &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
VLOG_ROW << "debug string_to_float error for " << num_buf_ << " val=" << val
<< " parse_result=" << parse_result;
err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
return false;
}

Expand Down
14 changes: 9 additions & 5 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,24 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
switch (physical_schema.type) {
case tparquet::Type::BOOLEAN:
type.type = TYPE_BOOLEAN;
return type;
break;
case tparquet::Type::INT32:
type.type = TYPE_INT;
return type;
break;
case tparquet::Type::INT64:
case tparquet::Type::INT96:
type.type = TYPE_BIGINT;
return type;
break;
case tparquet::Type::FLOAT:
type.type = TYPE_FLOAT;
return type;
break;
case tparquet::Type::DOUBLE:
type.type = TYPE_DOUBLE;
return type;
break;
case tparquet::Type::BYTE_ARRAY:
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
type.type = TYPE_STRING;
break;
default:
break;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ Status VFileScanner::_init_src_block(Block* block) {
data_type = DataTypeFactory::instance().create_data_type(it->second, true);
}
if (data_type == nullptr) {
return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
return Status::NotSupported(
fmt::format("Not support data type:{} for column: {}",
(it == _name_to_col_type.end() ? slot->type().debug_string()
: it->second.debug_string()),
slot->col_name()));
}
MutableColumnPtr data_column = data_type->create_column();
_src_block.insert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,14 @@ public DataDescription(String tableName,
public DataDescription(String tableName, LoadTaskInfo taskInfo) {
this.tableName = tableName;
this.partitionNames = taskInfo.getPartitions();
// Add a dummy path to just make analyze() happy.
// Stream load does not need this field.
this.filePaths = Lists.newArrayList("dummy");

if (!Strings.isNullOrEmpty(taskInfo.getPath())) {
this.filePaths = Lists.newArrayList(taskInfo.getPath());
} else {
// Add a dummy path to just make analyze() happy.
this.filePaths = Lists.newArrayList("dummy");
}

this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames();
this.columnSeparator = taskInfo.getColumnSeparator();
this.lineDelimiter = taskInfo.getLineDelimiter();
Expand Down Expand Up @@ -259,7 +264,20 @@ private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) {
// the compress type is saved in "compressType"
this.fileFormat = "csv";
} else {
this.fileFormat = "json";
switch (type) {
case FORMAT_ORC:
this.fileFormat = "orc";
break;
case FORMAT_PARQUET:
this.fileFormat = "parquet";
break;
case FORMAT_JSON:
this.fileFormat = "json";
break;
default:
this.fileFormat = "unknown";
break;
}
}
}
// get compress type
Expand Down Expand Up @@ -1019,3 +1037,4 @@ public String toString() {
return toSql();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@ public class Config extends ConfigBase {
* Temp config, should be removed when new file scan node is ready.
*/
@ConfField(mutable = true)
public static boolean enable_new_load_scan_node = false;
public static boolean enable_new_load_scan_node = true;

/**
* Max data version of backends serialize block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public BrokerFileGroup(DataDescription dataDescription) {
this.deleteCondition = dataDescription.getDeleteCondition();
this.mergeType = dataDescription.getMergeType();
this.sequenceCol = dataDescription.getSequenceCol();
this.filePaths = dataDescription.getFilePaths();
}

// NOTE: DBLock will be held
Expand Down Expand Up @@ -598,3 +599,4 @@ public static BrokerFileGroup read(DataInput in) throws IOException {
return fileGroup;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ protected void finalizeParams(Map<String, SlotDescriptor> slotDescByName,
expr.analyze(analyzer);
}

// for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
// and if input string is not a valid json string, return null.
PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.JSONB
Expand All @@ -217,7 +219,7 @@ protected void finalizeParams(Map<String, SlotDescriptor> slotDescByName,
if (destSlotDesc.getIsNullable() || expr.isNullable()) {
nullable = "nullable";
}
String name = "jsonb_parse_" + nullable + "_error_to_invalid";
String name = "jsonb_parse_" + nullable + "_error_to_null";
expr = new FunctionCallExpr(name, args);
expr.analyze(analyzer);
} else {
Expand Down Expand Up @@ -251,3 +253,4 @@ protected void toThrift(TPlanNode planNode) {
planNode.setBrokerScanNode(brokerScanNode);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanFragmentExecParams;
Expand Down Expand Up @@ -180,9 +181,15 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
fileGroup.parse(db, dataDescription);
// 2. create dummy file status
TBrokerFileStatus fileStatus = new TBrokerFileStatus();
fileStatus.setPath("");
fileStatus.setIsDir(false);
fileStatus.setSize(-1); // must set to -1, means stream.
if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
fileStatus.setPath(taskInfo.getPath());
fileStatus.setIsDir(false);
fileStatus.setSize(taskInfo.getFileSize()); // local file: use the task's file size rather than the -1 stream marker.
} else {
fileStatus.setPath("");
fileStatus.setIsDir(false);
fileStatus.setSize(-1); // must set to -1, means stream.
}
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType());
scanNode = fileScanNode;
Expand Down Expand Up @@ -324,3 +331,4 @@ private List<Long> getAllPartitionIds() throws DdlException, AnalysisException {
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,25 @@ protected void finalizeParamsForLoad(ParamCreateContext context, Analyzer analyz
expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
expr.analyze(analyzer);
}
expr = castToSlot(destSlotDesc, expr);

// for jsonb type, use jsonb_parse_xxx to parse src string to jsonb.
// and if input string is not a valid json string, return null.
PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.JSONB
&& (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) {
List<Expr> args = Lists.newArrayList();
args.add(expr);
String nullable = "notnull";
if (destSlotDesc.getIsNullable() || expr.isNullable()) {
nullable = "nullable";
}
String name = "jsonb_parse_" + nullable + "_error_to_null";
expr = new FunctionCallExpr(name, args);
expr.analyze(analyzer);
} else {
expr = castToSlot(destSlotDesc, expr);
}
params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
Expand Down Expand Up @@ -550,3 +568,4 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
}



Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -140,6 +141,10 @@ public int getLoadParallelism() {
return loadParallelism;
}

/**
 * Accessor for the file type stored on this object.
 *
 * @return the current value of the {@code fileType} field
 */
public TFileType getFileType() {
    return this.fileType;
}

public String getExplainString(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append("file scan\n");
Expand Down Expand Up @@ -303,10 +308,20 @@ private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus
rangeDesc.setSize(rangeBytes);
rangeDesc.setColumnsFromPath(columnsFromPath);
} else {
// for stream load
if (getFileType() == TFileType.FILE_LOCAL) {
// when loading parquet via stream load, the data is saved as a local file on BE,
// so read it as a local file.
Preconditions.checkState(fileGroup.getFilePaths().size() == 1);
rangeDesc.setPath(fileGroup.getFilePaths().get(0));
rangeDesc.setStartOffset(0);
rangeDesc.setSize(fileStatus.size);
}
rangeDesc.setLoadId(loadId);
rangeDesc.setSize(fileStatus.size);
}
return rangeDesc;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public ParamCreateContext createContext(Analyzer analyzer) throws UserException
TFileAttributes fileAttributes = new TFileAttributes();
setFileAttributes(ctx.fileGroup, fileAttributes);
params.setFileAttributes(fileAttributes);
params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
params.setFileType(fileGroupInfo.getFileType());
ctx.params = params;

initColumns(ctx, analyzer);
Expand Down Expand Up @@ -252,3 +252,4 @@ public TableIf getTargetTable() {
return fileGroupInfo.getTargetTable();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
Expand Down Expand Up @@ -1224,3 +1224,4 @@ private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TExc
return result;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public interface LoadTaskInfo {

String getPath();

/**
 * Size of the file associated with this load task.
 *
 * <p>This default implementation always reports {@code 0}; an implementing
 * class can override it to supply a real value.
 *
 * @return {@code 0} unless overridden
 */
default long getFileSize() {
    return 0L;
}

double getMaxFilterRatio();

ImportColumnDescs getColumnExprDescs();
Expand Down Expand Up @@ -118,3 +122,4 @@ public List<Expr> getColumnMappingList() {
}
}
}

11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private Separator lineDelimiter;
private PartitionNames partitions;
private String path;
private long fileSize = 0;
private boolean negative;
private boolean strictMode = false; // default is false
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
Expand Down Expand Up @@ -159,6 +160,11 @@ public String getPath() {
return path;
}

/**
 * Accessor for the file size recorded on this task.
 *
 * @return the current value of the {@code fileSize} field
 */
@Override
public long getFileSize() {
    return this.fileSize;
}

public boolean getNegative() {
return negative;
}
Expand Down Expand Up @@ -234,6 +240,7 @@ public boolean hasSequenceCol() {
return !Strings.isNullOrEmpty(sequenceCol);
}


@Override
public String getSequenceCol() {
return sequenceCol;
Expand All @@ -249,6 +256,9 @@ public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest req
request.getFileType(), request.getFormatType(),
request.getCompressType());
streamLoadTask.setOptionalFromTSLPutRequest(request);
if (request.isSetFileSize()) {
streamLoadTask.fileSize = request.getFileSize();
}
return streamLoadTask;
}

Expand Down Expand Up @@ -416,3 +426,4 @@ public double getMaxFilterRatio() {
return maxFilterRatio;
}
}

Loading