[FixBug] Fix loss of metadata bug after alter routine load (#6937) (#6971) (#7068)

When a routine load job serializes its load properties (column/row separators, column list, partitions, where filter), it stores the SQL statement and recovers the properties by re-parsing that SQL during deserialization. But once an alter routine load completes, only the alter statement is kept, which loses any metadata the alter did not mention. We should merge the alter SQL with the original create SQL to retain all load properties.
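For illustration (hypothetical job, table, and clauses), consider a job created with a WHERE filter whose column separator is later altered. Before this fix only the ALTER statement was persisted, so the WHERE filter vanished on the next metadata replay; after it, the persisted statement is the merge of the two:

    -- persisted when the job is created
    CREATE ROUTINE LOAD db1.job1 ON tbl1
    COLUMNS TERMINATED BY ",", WHERE `k1` > 100
    PROPERTIES ("desired_concurrent_number" = "1")
    FROM KAFKA ("kafka_topic" = "my_topic");

    -- the user changes only the separator
    ALTER ROUTINE LOAD FOR db1.job1 COLUMNS TERMINATED BY "\t";

    -- persisted after the fix: new separator, WHERE filter retained
    CREATE ROUTINE LOAD job1 ON tbl1 COLUMNS TERMINATED BY "\t", WHERE `k1` > 100
    PROPERTIES ("desired_concurrent_number"="1") FROM KAFKA ("kafka_topic" = "my_topic")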
gengjun-git committed Jun 10, 2022
1 parent 8f88ad4 commit fa32cdd
Showing 6 changed files with 364 additions and 56 deletions.
@@ -25,19 +25,29 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.FeNameFormat;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.load.routineload.KafkaProgress;
import com.starrocks.load.routineload.LoadDataSourceType;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.qe.ConnectContext;

import com.starrocks.qe.OriginStatement;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.SqlModeHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,6 +94,8 @@
KAFKA
*/
public class CreateRoutineLoadStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class);

// routine load properties
public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
// max error number in ten thousand records
@@ -285,6 +297,33 @@ public void analyze(Analyzer analyzer) throws UserException {
checkDataSourceProperties();
}

public static RoutineLoadDesc getLoadDesc(OriginStatement origStmt, Map<String, String> sessionVariables) {
long sqlMode;
if (sessionVariables != null && sessionVariables.containsKey(SessionVariable.SQL_MODE)) {
sqlMode = Long.parseLong(sessionVariables.get(SessionVariable.SQL_MODE));
} else {
sqlMode = SqlModeHelper.MODE_DEFAULT;
}

// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), sqlMode));
try {
StatementBase stmt = SqlParserUtils.getStmt(parser, origStmt.idx);
if (stmt instanceof CreateRoutineLoadStmt) {
return CreateRoutineLoadStmt.
buildLoadDesc(((CreateRoutineLoadStmt) stmt).getLoadPropertyList());
} else if (stmt instanceof AlterRoutineLoadStmt) {
return CreateRoutineLoadStmt.
buildLoadDesc(((AlterRoutineLoadStmt) stmt).getLoadPropertyList());
} else {
throw new IOException("stmt is neither CreateRoutineLoadStmt nor AlterRoutineLoadStmt");
}
} catch (Exception e) {
LOG.error("error happens when parsing create/alter routine load stmt: " + origStmt.originStmt, e);
return null;
}
}
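A minimal usage sketch of the new helper (hypothetical statement text; assumes the StarRocks FE classes on the classpath):

    // Recover the load properties that were persisted as SQL. The helper accepts
    // both CREATE and ALTER ROUTINE LOAD statements; with a null session-variable
    // map it parses under SqlModeHelper.MODE_DEFAULT, and on any parse failure it
    // logs the error and returns null.
    OriginStatement persisted = new OriginStatement(
            "CREATE ROUTINE LOAD db1.job1 ON tbl1 WHERE `k1` > 100 " +
            "PROPERTIES (\"desired_concurrent_number\"=\"1\") " +
            "FROM KAFKA (\"kafka_topic\" = \"my_topic\")", 0);
    RoutineLoadDesc desc = CreateRoutineLoadStmt.getLoadDesc(persisted, null);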

public void checkDBTable(Analyzer analyzer) throws AnalysisException {
labelName.analyze(analyzer);
dbName = labelName.getDbName();
87 changes: 82 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/load/RoutineLoadDesc.java
@@ -22,18 +22,25 @@
package com.starrocks.load;

import com.starrocks.analysis.ColumnSeparator;
import com.starrocks.analysis.ImportColumnDesc;
import com.starrocks.analysis.ImportColumnsStmt;
import com.starrocks.analysis.ImportWhereStmt;
import com.starrocks.analysis.PartitionNames;
import com.starrocks.analysis.RowDelimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RoutineLoadDesc {
private final ColumnSeparator columnSeparator;
private final RowDelimiter rowDelimiter;
private final ImportColumnsStmt columnsInfo;
private final ImportWhereStmt wherePredicate;
private ColumnSeparator columnSeparator;
private RowDelimiter rowDelimiter;
private ImportColumnsStmt columnsInfo;
private ImportWhereStmt wherePredicate;
// nullable
private final PartitionNames partitionNames;
private PartitionNames partitionNames;

public RoutineLoadDesc() {}

public RoutineLoadDesc(ColumnSeparator columnSeparator, RowDelimiter rowDelimiter, ImportColumnsStmt columnsInfo,
ImportWhereStmt wherePredicate, PartitionNames partitionNames) {
@@ -48,20 +55,90 @@ public ColumnSeparator getColumnSeparator() {
return columnSeparator;
}

public void setColumnSeparator(ColumnSeparator columnSeparator) {
this.columnSeparator = columnSeparator;
}

public RowDelimiter getRowDelimiter() {
return rowDelimiter;
}

public void setRowDelimiter(RowDelimiter rowDelimiter) {
this.rowDelimiter = rowDelimiter;
}

public ImportColumnsStmt getColumnsInfo() {
return columnsInfo;
}

public void setColumnsInfo(ImportColumnsStmt importColumnsStmt) {
this.columnsInfo = importColumnsStmt;
}

public ImportWhereStmt getWherePredicate() {
return wherePredicate;
}

public void setWherePredicate(ImportWhereStmt wherePredicate) {
this.wherePredicate = wherePredicate;
}

// nullable
public PartitionNames getPartitionNames() {
return partitionNames;
}

public void setPartitionNames(PartitionNames partitionNames) {
this.partitionNames = partitionNames;
}

public String toSql() {
List<String> subSQLs = new ArrayList<>();
if (columnSeparator != null) {
subSQLs.add("COLUMNS TERMINATED BY " + columnSeparator.toSql());
}
if (rowDelimiter != null) {
subSQLs.add("ROWS TERMINATED BY " + rowDelimiter.toSql());
}
if (columnsInfo != null) {
String subSQL = "COLUMNS(" +
columnsInfo.getColumns().stream().map(this::columnToString)
.collect(Collectors.joining(", ")) +
")";
subSQLs.add(subSQL);
}
if (partitionNames != null) {
String subSQL = null;
if (partitionNames.isTemp()) {
subSQL = "TEMPORARY PARTITION";
} else {
subSQL = "PARTITION";
}
subSQL += "(" + partitionNames.getPartitionNames().stream().map(this::pack)
.collect(Collectors.joining(", "))
+ ")";
subSQLs.add(subSQL);
}
if (wherePredicate != null) {
subSQLs.add("WHERE " + wherePredicate.getExpr().toSql());
}
return String.join(", ", subSQLs);
}
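For example, a desc carrying all five load properties would render roughly as below (hypothetical values; exact separator quoting comes from ColumnSeparator.toSql() and RowDelimiter.toSql()):

    COLUMNS TERMINATED BY ",", ROWS TERMINATED BY "\n", COLUMNS(`k1`, `k2` = `k1` + 1), PARTITION(`p1`, `p2`), WHERE `k1` > 100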

private String pack(String str) {
return "`" + str + "`";
}

public String columnToString(ImportColumnDesc desc) {
String str = pack(desc.getColumnName());
if (desc.getExpr() != null) {
str += " = " + desc.getExpr().toSql();
}
return str;
}

@Override
public String toString() {
return toSql();
}
}
@@ -29,7 +29,6 @@
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.starrocks.analysis.AlterRoutineLoadStmt;
import com.starrocks.analysis.ColumnSeparator;
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.Expr;
@@ -39,9 +38,6 @@
import com.starrocks.analysis.PartitionNames;
import com.starrocks.analysis.RoutineLoadDataSourceProperties;
import com.starrocks.analysis.RowDelimiter;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.analysis.StatementBase;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
@@ -60,7 +56,6 @@
import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.metric.MetricRepo;
@@ -84,7 +79,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -1221,6 +1215,10 @@ public void setOrigStmt(OriginStatement origStmt) {
this.origStmt = origStmt;
}

public OriginStatement getOrigStmt() {
return origStmt;
}

// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
TransactionState txnState,
@@ -1491,22 +1489,7 @@ public void readFields(DataInput in) throws IOException {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}

// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
try {
StatementBase stmt = SqlParserUtils.getStmt(parser, origStmt.idx);
if (stmt instanceof CreateRoutineLoadStmt) {
setRoutineLoadDesc(CreateRoutineLoadStmt.
buildLoadDesc(((CreateRoutineLoadStmt) stmt).getLoadPropertyList()));
} else if (stmt instanceof AlterRoutineLoadStmt) {
setRoutineLoadDesc(CreateRoutineLoadStmt.
buildLoadDesc(((AlterRoutineLoadStmt) stmt).getLoadPropertyList()));
}

} catch (Exception e) {
throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e);
}
setRoutineLoadDesc(CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables));
}

public void modifyJob(RoutineLoadDesc routineLoadDesc,
@@ -1525,7 +1508,50 @@ public void modifyJob(RoutineLoadDesc routineLoadDesc,
if (dataSourceProperties != null) {
modifyDataSourceProperties(dataSourceProperties);
}
origStmt = originStatement;
mergeLoadDescToOriginStatement(routineLoadDesc);
if (!isReplay) {
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(id,
jobProperties, dataSourceProperties, originStatement);
@@ -1536,7 +1519,50 @@ public void modifyJob(RoutineLoadDesc routineLoadDesc,
}
}

protected abstract void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException;
public void mergeLoadDescToOriginStatement(RoutineLoadDesc routineLoadDesc) {
if (origStmt == null) {
return;
}

RoutineLoadDesc originLoadDesc = CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables);
if (originLoadDesc == null) {
originLoadDesc = new RoutineLoadDesc();
}
if (routineLoadDesc.getColumnSeparator() != null) {
originLoadDesc.setColumnSeparator(routineLoadDesc.getColumnSeparator());
}
if (routineLoadDesc.getRowDelimiter() != null) {
originLoadDesc.setRowDelimiter(routineLoadDesc.getRowDelimiter());
}
if (routineLoadDesc.getColumnsInfo() != null) {
originLoadDesc.setColumnsInfo(routineLoadDesc.getColumnsInfo());
}
if (routineLoadDesc.getWherePredicate() != null) {
originLoadDesc.setWherePredicate(routineLoadDesc.getWherePredicate());
}
if (routineLoadDesc.getPartitionNames() != null) {
originLoadDesc.setPartitionNames(routineLoadDesc.getPartitionNames());
}

String tableName = null;
try {
tableName = getTableName();
} catch (Exception e) {
LOG.warn("get table name failed", e);
tableName = "unknown";
}

// We use SQL to persist the load properties, so we just put the merged load properties into the SQL.
String sql = String.format("CREATE ROUTINE LOAD %s ON %s %s" +
" PROPERTIES (\"desired_concurrent_number\"=\"1\")" +
" FROM KAFKA (\"kafka_topic\" = \"my_topic\")",
name, tableName, originLoadDesc.toSql());
LOG.debug("merge result: {}", sql);
origStmt = new OriginStatement(sql, 0);
}
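In effect, the alter's non-null load properties override those parsed from the previously persisted statement, everything else is retained, and the merged desc is re-wrapped in a minimal parseable CREATE ROUTINE LOAD statement; the fixed PROPERTIES and FROM KAFKA clauses are placeholders only, since deserialization reads nothing but the load properties back out of this SQL. Continuing the earlier example, the re-persisted statement would look roughly like:

    CREATE ROUTINE LOAD job1 ON tbl1 COLUMNS TERMINATED BY "\t", WHERE `k1` > 100 PROPERTIES ("desired_concurrent_number"="1") FROM KAFKA ("kafka_topic" = "my_topic")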

protected abstract void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties)
throws DdlException;

// for ALTER ROUTINE LOAD
private void modifyCommonJobProperties(Map<String, String> jobProperties) {
@@ -29,8 +29,6 @@
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.PauseRoutineLoadStmt;
import com.starrocks.analysis.ResumeRoutineLoadStmt;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.analysis.StopRoutineLoadStmt;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
@@ -45,22 +43,18 @@
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.mysql.privilege.PrivPredicate;
import com.starrocks.persist.AlterRoutineLoadJobOperationLog;
import com.starrocks.persist.RoutineLoadOperation;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.SqlModeHelper;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -571,24 +565,8 @@ public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) throw
// NOTE: we use the origin statement to get the RoutineLoadDesc
RoutineLoadDesc routineLoadDesc = null;
if (log.getOriginStatement() != null) {
long sqlMode;
if (job.getSessionVariables() != null && job.getSessionVariables().containsKey(SessionVariable.SQL_MODE)) {
sqlMode = Long.parseLong(job.getSessionVariables().get(SessionVariable.SQL_MODE));
} else {
sqlMode = SqlModeHelper.MODE_DEFAULT;
}
try {
SqlParser parser = new SqlParser(
new SqlScanner(new StringReader(log.getOriginStatement().originStmt), sqlMode));
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) SqlParserUtils.getStmt(
parser, log.getOriginStatement().idx);
if (stmt.getLoadPropertyList() != null) {
routineLoadDesc = CreateRoutineLoadStmt.buildLoadDesc(stmt.getLoadPropertyList());
}
} catch (Exception e) {
throw new IOException("error happens when parsing alter routine load stmt: "
+ log.getOriginStatement().originStmt, e);
}
routineLoadDesc = CreateRoutineLoadStmt.getLoadDesc(
log.getOriginStatement(), job.getSessionVariables());
}
job.modifyJob(routineLoadDesc, log.getJobProperties(),
log.getDataSourceProperties(), log.getOriginStatement(), true);
