Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change strategy of incorrect data #1255

Merged
merged 6 commits into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state,
// _splittable(params.splittable),
_value_separator(static_cast<char>(params.column_separator)),
_line_delimiter(static_cast<char>(params.line_delimiter)),
_strict_mode(false),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
Expand Down Expand Up @@ -117,6 +118,7 @@ Status BrokerScanner::init_expr_ctxes() {
return Status(ss.str());
}

bool has_transform_slot_ids = _params.__isset.transform_slot_ids;
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
Expand All @@ -133,6 +135,15 @@ Status BrokerScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker.get()));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
if (has_transform_slot_ids) {
auto it = _params.transform_slot_ids.find(slot_desc->id());
if (it == std::end(_params.transform_slot_ids)) {
_has_expr_columns.emplace_back(false);
}
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {

_has_expr_columns.emplace_back(true);
}
}
}

return Status::OK;
Expand All @@ -148,6 +159,12 @@ Status BrokerScanner::open() {
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
_materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");
if (_params.__isset.strict_mode) {
_strict_mode = _params.strict_mode;
}
if (_strict_mode && !_params.__isset.transform_slot_ids) {
return Status("Expr column list must be set in strict mode");
}

return Status::OK;
}
Expand Down Expand Up @@ -585,24 +602,37 @@ bool BrokerScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPoo
continue;
}

ExprContext* ctx = _dest_expr_ctx[ctx_idx++];
ExprContext* ctx = _dest_expr_ctx[ctx_idx];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cid = ctx_idx++;

void* value = ctx->get_value(_src_tuple_row);
if (value == nullptr) {
if (slot_desc->is_nullable()) {
dest_tuple->set_null(slot_desc->null_indicator_offset());
continue;
} else {
if (_strict_mode && !_src_tuple->is_null(slot_desc->null_indicator_offset())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slot_desc is dest tuple descriptor, what you need is a src tuple descriptor

&& !_has_expr_columns[ctx_idx]) {
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is null";
error_msg << "column(" << slot_desc->col_name() << ") value is incorrect "
<< "while strict mode is " << std::boolalpha << _strict_mode;
_state->append_error_msg_to_file(
std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
}
if (!slot_desc->is_nullable()) {
std::stringstream error_msg;
error_msg << "column(" << slot_desc->col_name() << ") value is null "
<< "while columns is not nullable";
_state->append_error_msg_to_file(
std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
}
dest_tuple->set_null(slot_desc->null_indicator_offset());
ctx_idx++;
continue;
}
dest_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
ctx_idx++;
continue;
}
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class BrokerScanner {

char _value_separator;
char _line_delimiter;
bool _strict_mode;

// Reader
FileReader* _cur_file_reader;
Expand Down Expand Up @@ -158,6 +159,7 @@ class BrokerScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
std::vector<bool> _has_expr_columns;

// used to hold current StreamLoadPipe
std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# broker load 使用说明

## properties

### strict mode
broker load导入可以开启strict mode模式。开启方式为 ```properties ("strict_mode" = "true")``` 。默认的 strict mode为开启。

*注意:strict mode 功能仅在新版本的broker load中有效,如果导入明确指定 ```"version" = "v1"``` 则没有此功能。*

strict mode模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

对于列类型转换来说,如果 strict\_mode 为true,则错误的数据将被filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。

对于导入的某列包含函数变换的,导入的值和函数的结果一致,strict 对其不产生影响。(其中 strftime 等 broker 系统支持的函数也属于这类)。

### strict mode 与 source data 的导入关系

这里以列类型为 int 来举例
注:当表中的列允许导入空值时

source data | source data example | string to int | strict_mode | load_data
------------|---------------------|-----------------|--------------------|---------
空值 | \N | N/A | true or false | NULL
not null | aaa | NULL | true | filtered
not null | aaa | NULL | false | NULL
not null | 1 | 1 | true or false | correct data
12 changes: 12 additions & 0 deletions fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class LoadStmt extends DdlStmt {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String CLUSTER_PROPERTY = "cluster";
private static final String VERSION = "version";
public static final String STRICT_MODE = "strict_mode";

// for load data from Baidu Object Store(BOS)
public static final String BOS_ENDPOINT = "bos_endpoint";
Expand All @@ -81,6 +82,7 @@ public class LoadStmt extends DdlStmt {
private String user;

private String version = "v1";
private boolean strictMode = true;

// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
Expand All @@ -90,6 +92,7 @@ public class LoadStmt extends DdlStmt {
.add(EXEC_MEM_LIMIT)
.add(CLUSTER_PROPERTY)
// .add(VERSION)
.add(STRICT_MODE)
.build();

public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
Expand Down Expand Up @@ -185,6 +188,15 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
}
}

// strict mode
final String strictModeProperty = properties.get(STRICT_MODE);
if (strictModeProperty != null) {
if (!strictModeProperty.equalsIgnoreCase("true")
&& !strictModeProperty.equalsIgnoreCase("false")) {
throw new DdlException(STRICT_MODE + " is not a boolean");
}
}

}

private void analyzeVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
entry.getValue(), getDeadlineMs(), execMemLimit,
transactionId, this);
strictMode, transactionId, this);
task.init(attachment.getFileStatusByTable(tableId),
attachment.getFileNumByTable(tableId));
// Add tasks into list and pool
Expand Down
12 changes: 12 additions & 0 deletions fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -90,6 +91,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected long execMemLimit = 2147483648L; // 2GB;
protected double maxFilterRatio = 0;
protected boolean deleteFlag = false;
protected boolean strictMode = true;

protected long createTimestamp = System.currentTimeMillis();
protected long loadStartTimestamp = -1;
Expand Down Expand Up @@ -222,6 +224,10 @@ protected void setJobProperties(Map<String, String> properties) throws DdlExcept
throw new DdlException("Execute memory limit is not Long", e);
}
}

if (properties.containsKey(LoadStmt.STRICT_MODE)) {
strictMode = Boolean.valueOf(properties.get(LoadStmt.STRICT_MODE));
}
}
}

Expand Down Expand Up @@ -686,6 +692,9 @@ public void write(DataOutput out) throws IOException {
}
out.writeInt(progress);
loadingStatus.write(out);
// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) {
// out.writeBoolean(strictMode);
// }
}

@Override
Expand All @@ -712,5 +721,8 @@ public void readFields(DataInput in) throws IOException {
}
progress = in.readInt();
loadingStatus.readFields(in);
// if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_52) {
// strictMode = in.readBoolean();
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class LoadLoadingTask extends LoadTask {
private final List<BrokerFileGroup> fileGroups;
private final long jobDeadlineMs;
private final long execMemLimit;
private final boolean strictMode;
private final long txnId;

private LoadingTaskPlanner planner;
Expand All @@ -61,21 +62,23 @@ public class LoadLoadingTask extends LoadTask {

public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, long txnId, LoadTaskCallback callback) {
long jobDeadlineMs, long execMemLimit, boolean strictMode,
long txnId, LoadTaskCallback callback) {
super(callback);
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
this.strictMode = strictMode;
this.txnId = txnId;
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL);
this.retryTime = 3;
}

public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups);
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups, strictMode);
planner.plan(fileStatusList, fileNum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class LoadingTaskPlanner {
private final OlapTable table;
private final BrokerDesc brokerDesc;
private final List<BrokerFileGroup> fileGroups;
private final boolean strictMode;

// Something useful
private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null);
Expand All @@ -75,12 +76,14 @@ public class LoadingTaskPlanner {
private int nextNodeId = 0;

public LoadingTaskPlanner(long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups) {
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode) {
this.txnId = txnId;
this.dbId = dbId;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = brokerFileGroups;
this.strictMode = strictMode;
}

public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) throws UserException {
Expand All @@ -103,7 +106,7 @@ public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(table, brokerDesc, fileGroups);
scanNode.setLoadInfo(table, brokerDesc, fileGroups, strictMode);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNodes.add(scanNode);
Expand Down
22 changes: 21 additions & 1 deletion fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -77,6 +78,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

// Broker scan node
Expand Down Expand Up @@ -111,6 +113,7 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
private Table targetTable;
private BrokerDesc brokerDesc;
private List<BrokerFileGroup> fileGroups;
private boolean strictMode;

private List<List<TBrokerFileStatus>> fileStatusesList;
// file num
Expand Down Expand Up @@ -179,6 +182,7 @@ private boolean isLoad() {
return desc.getTable() == null;
}

@Deprecated
public void setLoadInfo(Table targetTable,
BrokerDesc brokerDesc,
List<BrokerFileGroup> fileGroups) {
Expand All @@ -187,6 +191,16 @@ public void setLoadInfo(Table targetTable,
this.fileGroups = fileGroups;
}

public void setLoadInfo(Table targetTable,
BrokerDesc brokerDesc,
List<BrokerFileGroup> fileGroups,
boolean strictMode) {
this.targetTable = targetTable;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.strictMode = strictMode;
}

private void createPartitionInfos() throws AnalysisException {
if (partitionInfos != null) {
return;
Expand Down Expand Up @@ -313,6 +327,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
BrokerFileGroup fileGroup = context.fileGroup;
params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
params.setStrict_mode(strictMode);

// Parse partition information
List<Long> partitionIds = fileGroup.getPartitionIds();
Expand Down Expand Up @@ -367,6 +382,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException {
Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
Map<String, Expr> exprMap = context.exprMap;
Set<Integer> transformSlotIds = Sets.newHashSet();
// Analyze expr map
if (exprMap != null) {
for (Map.Entry<String, Expr> entry : exprMap.entrySet()) {
Expand Down Expand Up @@ -416,10 +432,12 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
expr = NullLiteral.create(column.getType());
} else {
throw new UserException("Unknown slot ref("
+ destSlotDesc.getColumn().getName() + ") in source file");
+ destSlotDesc.getColumn().getName() + ") in source file");
}
}
}
} else {
transformSlotIds.add(destSlotDesc.getId().asInt());
}

if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {
Expand All @@ -429,7 +447,9 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An
expr = castToSlot(destSlotDesc, expr);
context.params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
context.params.setTransform_slot_ids(transformSlotIds);
context.params.setDest_tuple_id(desc.getId().asInt());
context.params.setStrict_mode(strictMode);
// Need re compute memory layout after set some slot descriptor to nullable
context.tupleDescriptor.computeMemLayout();
}
Expand Down
Loading