Skip to content

Commit

Permalink
Change strategy of incorrect data (#1255)
Browse files Browse the repository at this point in the history
This change adds a load property named strict_mode, which controls how incorrect data is handled.
When it is set to false, incorrect data is loaded as NULL, just like before.
When it is set to true, incorrect data that belongs to a column without an expr is filtered out.
strict_mode is currently supported in broker load v2. It will be supported in stream load later.
  • Loading branch information
EmmyMiao87 authored and imay committed Jun 10, 2019
1 parent 6a54464 commit 5306212
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 12 deletions.
46 changes: 40 additions & 6 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state,
// _splittable(params.splittable),
_value_separator(static_cast<char>(params.column_separator)),
_line_delimiter(static_cast<char>(params.line_delimiter)),
_strict_mode(false),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
Expand Down Expand Up @@ -117,6 +118,7 @@ Status BrokerScanner::init_expr_ctxes() {
return Status(ss.str());
}

bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
Expand All @@ -133,6 +135,20 @@ Status BrokerScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker.get()));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
auto _src_slot_it = src_slot_desc_map.find(it->second);
if (_src_slot_it == std::end(src_slot_desc_map)) {
std::stringstream ss;
ss << "No src slot " << it->second << " in src slot descs";
return Status(ss.str());
}
_src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
}
}
}

return Status::OK;
Expand All @@ -148,6 +164,12 @@ Status BrokerScanner::open() {
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
_materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");
if (_params.__isset.strict_mode) {
_strict_mode = _params.strict_mode;
}
if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) {
return Status("Slot map of dest to src must be set in strict mode");
}

return Status::OK;
}
Expand Down Expand Up @@ -585,24 +607,36 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
continue;
}

ExprContext* ctx = _dest_expr_ctx[ctx_idx++];
int dest_index = ctx_idx++;
ExprContext* ctx = _dest_expr_ctx[dest_index];
void* value = ctx->get_value(_src_tuple_row);
if (value == nullptr) {
if (slot_desc->is_nullable()) {
dest_tuple->set_null(slot_desc->null_indicator_offset());
continue;
} else {
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr)
&& !_src_tuple->is_null(_src_slot_descs_order_by_dest[dest_index]->null_indicator_offset())) {
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is null";
error_msg << "column(" << slot_desc->col_name() << ") value is incorrect "
<< "while strict mode is " << std::boolalpha << _strict_mode;
_state->append_error_msg_to_file(
std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
}
if (!slot_desc->is_nullable()) {
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is null "
<< "while columns is not nullable";
_state->append_error_msg_to_file(
std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
}
dest_tuple->set_null(slot_desc->null_indicator_offset());
continue;
}
dest_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
continue;
}
return true;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class BrokerScanner {

char _value_separator;
char _line_delimiter;
bool _strict_mode;

// Reader
FileReader* _cur_file_reader;
Expand Down Expand Up @@ -158,6 +159,9 @@ class BrokerScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// Src slot descs looked up from dest slot ids, appended in the same order as _dest_expr_ctx.
// An entry is nullptr if the dest slot id is not a key in dest_sid_to_src_sid_without_trans.
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;

// used to hold current StreamLoadPipe
std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# broker load 使用说明

## properties

### strict mode
broker load 导入可以开启 strict mode。开启方式为 ```properties ("strict_mode" = "true")``` 。strict mode 默认为开启。

*注意:strict mode 功能仅在新版本的broker load中有效,如果导入明确指定 ```"version" = "v1"``` 则没有此功能。*

strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

对于列类型转换来说,如果 strict\_mode 为 true,则错误的数据将被过滤(filter)。这里的错误数据是指:原始数据不为空值,但在经过列类型转换后结果为空值的这一类数据。

对于导入的某列包含函数变换的情况,导入的值与函数的结果一致,strict mode 对其不产生影响(strftime 等 broker 系统支持的函数也属于这类)。

### strict mode 与 source data 的导入关系

这里以列类型为 int 来举例。

注:下表适用于表中的列允许导入空值的情况。

source data | source data example | string to int | strict_mode | load_data
------------|---------------------|-----------------|--------------------|---------
空值 | \N | N/A | true or false | NULL
not null | aaa | NULL | true | filtered
not null | aaa | NULL | false | NULL
not null | 1 | 1 | true or false | correct data
12 changes: 12 additions & 0 deletions fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class LoadStmt extends DdlStmt {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String CLUSTER_PROPERTY = "cluster";
private static final String VERSION = "version";
public static final String STRICT_MODE = "strict_mode";

// for load data from Baidu Object Store(BOS)
public static final String BOS_ENDPOINT = "bos_endpoint";
Expand All @@ -81,6 +82,7 @@ public class LoadStmt extends DdlStmt {
private String user;

private String version = "v1";
private boolean strictMode = true;

// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
Expand All @@ -90,6 +92,7 @@ public class LoadStmt extends DdlStmt {
.add(EXEC_MEM_LIMIT)
.add(CLUSTER_PROPERTY)
// .add(VERSION)
.add(STRICT_MODE)
.build();

public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
Expand Down Expand Up @@ -185,6 +188,15 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
}
}

// strict mode
final String strictModeProperty = properties.get(STRICT_MODE);
if (strictModeProperty != null) {
if (!strictModeProperty.equalsIgnoreCase("true")
&& !strictModeProperty.equalsIgnoreCase("false")) {
throw new DdlException(STRICT_MODE + " is not a boolean");
}
}

}

private void analyzeVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
entry.getValue(), getDeadlineMs(), execMemLimit,
transactionId, this);
strictMode, transactionId, this);
task.init(attachment.getFileStatusByTable(tableId),
attachment.getFileNumByTable(tableId));
// Add tasks into list and pool
Expand Down
12 changes: 12 additions & 0 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -90,6 +91,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected long execMemLimit = 2147483648L; // 2GB;
protected double maxFilterRatio = 0;
protected boolean deleteFlag = false;
protected boolean strictMode = true;

protected long createTimestamp = System.currentTimeMillis();
protected long loadStartTimestamp = -1;
Expand Down Expand Up @@ -222,6 +224,10 @@ protected void setJobProperties(Map<String, String> properties) throws DdlExcept
throw new DdlException("Execute memory limit is not Long", e);
}
}

if (properties.containsKey(LoadStmt.STRICT_MODE)) {
strictMode = Boolean.valueOf(properties.get(LoadStmt.STRICT_MODE));
}
}
}

Expand Down Expand Up @@ -705,6 +711,9 @@ public void write(DataOutput out) throws IOException {
}
out.writeInt(progress);
loadingStatus.write(out);
// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) {
// out.writeBoolean(strictMode);
// }
}

@Override
Expand All @@ -731,5 +740,8 @@ public void readFields(DataInput in) throws IOException {
}
progress = in.readInt();
loadingStatus.readFields(in);
// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) {
// strictMode = in.readBoolean();
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class LoadLoadingTask extends LoadTask {
private final List<BrokerFileGroup> fileGroups;
private final long jobDeadlineMs;
private final long execMemLimit;
private final boolean strictMode;
private final long txnId;

private LoadingTaskPlanner planner;
Expand All @@ -61,21 +62,23 @@ public class LoadLoadingTask extends LoadTask {

public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, long txnId, LoadTaskCallback callback) {
long jobDeadlineMs, long execMemLimit, boolean strictMode,
long txnId, LoadTaskCallback callback) {
super(callback);
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
this.strictMode = strictMode;
this.txnId = txnId;
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL);
this.retryTime = 3;
}

public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups);
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups, strictMode);
planner.plan(fileStatusList, fileNum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class LoadingTaskPlanner {
private final OlapTable table;
private final BrokerDesc brokerDesc;
private final List<BrokerFileGroup> fileGroups;
private final boolean strictMode;

// Something useful
private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null);
Expand All @@ -75,12 +76,14 @@ public class LoadingTaskPlanner {
private int nextNodeId = 0;

public LoadingTaskPlanner(long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups) {
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode) {
this.txnId = txnId;
this.dbId = dbId;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = brokerFileGroups;
this.strictMode = strictMode;
}

public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) throws UserException {
Expand All @@ -103,7 +106,7 @@ public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(table, brokerDesc, fileGroups);
scanNode.setLoadInfo(table, brokerDesc, fileGroups, strictMode);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNodes.add(scanNode);
Expand Down
21 changes: 20 additions & 1 deletion fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -77,6 +78,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

// Broker scan node
Expand Down Expand Up @@ -111,6 +113,7 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
private Table targetTable;
private BrokerDesc brokerDesc;
private List<BrokerFileGroup> fileGroups;
private boolean strictMode;

private List<List<TBrokerFileStatus>> fileStatusesList;
// file num
Expand Down Expand Up @@ -179,6 +182,7 @@ private boolean isLoad() {
return desc.getTable() == null;
}

@Deprecated
public void setLoadInfo(Table targetTable,
BrokerDesc brokerDesc,
List<BrokerFileGroup> fileGroups) {
Expand All @@ -187,6 +191,16 @@ public void setLoadInfo(Table targetTable,
this.fileGroups = fileGroups;
}

/**
 * Stores the load context on this scan node, including the strict-mode flag.
 *
 * @param targetTable table the load writes into
 * @param brokerDesc  broker descriptor used by the load
 * @param fileGroups  broker file groups to scan
 * @param strictMode  strict-mode flag; later copied into the scan params via setStrict_mode
 */
public void setLoadInfo(Table targetTable,
                        BrokerDesc brokerDesc,
                        List<BrokerFileGroup> fileGroups,
                        boolean strictMode) {
    // The four assignments are independent of each other; the flag is recorded first.
    this.strictMode = strictMode;
    this.fileGroups = fileGroups;
    this.brokerDesc = brokerDesc;
    this.targetTable = targetTable;
}

private void createPartitionInfos() throws AnalysisException {
if (partitionInfos != null) {
return;
Expand Down Expand Up @@ -313,6 +327,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
BrokerFileGroup fileGroup = context.fileGroup;
params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
params.setStrict_mode(strictMode);

// Parse partition information
List<Long> partitionIds = fileGroup.getPartitionIds();
Expand Down Expand Up @@ -367,6 +382,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException {
Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
Map<String, Expr> exprMap = context.exprMap;
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
// Analyze expr map
if (exprMap != null) {
for (Map.Entry<String, Expr> entry : exprMap.entrySet()) {
Expand Down Expand Up @@ -402,6 +418,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
if (expr == null) {
SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
if (srcSlotDesc != null) {
destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
// If dest is allow null, we set source to nullable
if (destSlotDesc.getColumn().isAllowNull()) {
srcSlotDesc.setIsNullable(true);
Expand All @@ -416,7 +433,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
expr = NullLiteral.create(column.getType());
} else {
throw new UserException("Unknown slot ref("
+ destSlotDesc.getColumn().getName() + ") in source file");
+ destSlotDesc.getColumn().getName() + ") in source file");
}
}
}
Expand All @@ -429,7 +446,9 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
expr = castToSlot(destSlotDesc, expr);
context.params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
context.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
context.params.setDest_tuple_id(desc.getId().asInt());
context.params.setStrict_mode(strictMode);
// Need re compute memory layout after set some slot descriptor to nullable
context.tupleDescriptor.computeMemLayout();
}
Expand Down
Loading

0 comments on commit 5306212

Please sign in to comment.