[Cherry-Pick-2.2][FixBug] Fix loss of metadata bug after ALTER ROUTINE LOAD (#6937) #7068

Merged

Summary: ALTER ROUTINE LOAD used to overwrite the job's persisted origin statement with the ALTER statement itself, so any load property not repeated in the ALTER (column separator, row delimiter, column mapping, partitions, WHERE predicate) was lost the next time the job was replayed from persisted metadata. This cherry-pick merges the altered load description into a regenerated CREATE ROUTINE LOAD statement and persists that instead.
fe/fe-core/src/main/java/com/starrocks/analysis/CreateRoutineLoadStmt.java

@@ -25,19 +25,29 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.FeNameFormat;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.load.routineload.KafkaProgress;
import com.starrocks.load.routineload.LoadDataSourceType;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.qe.ConnectContext;

import com.starrocks.qe.OriginStatement;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.SqlModeHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,6 +94,8 @@
KAFKA
*/
public class CreateRoutineLoadStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class);

// routine load properties
public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
// max error number in ten thousand records
@@ -285,6 +297,33 @@ public void analyze(Analyzer analyzer) throws UserException {
checkDataSourceProperties();
}

public static RoutineLoadDesc getLoadDesc(OriginStatement origStmt, Map<String, String> sessionVariables) {
long sqlMode;
if (sessionVariables != null && sessionVariables.containsKey(SessionVariable.SQL_MODE)) {
sqlMode = Long.parseLong(sessionVariables.get(SessionVariable.SQL_MODE));
} else {
sqlMode = SqlModeHelper.MODE_DEFAULT;
}

// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), sqlMode));
try {
StatementBase stmt = SqlParserUtils.getStmt(parser, origStmt.idx);
if (stmt instanceof CreateRoutineLoadStmt) {
return CreateRoutineLoadStmt.buildLoadDesc(((CreateRoutineLoadStmt) stmt).getLoadPropertyList());
} else if (stmt instanceof AlterRoutineLoadStmt) {
return CreateRoutineLoadStmt.buildLoadDesc(((AlterRoutineLoadStmt) stmt).getLoadPropertyList());
} else {
throw new IOException("stmt is neither CreateRoutineLoadStmt nor AlterRoutineLoadStmt");
}
} catch (Exception e) {
LOG.error("error happens when parsing create/alter routine load stmt: " + origStmt.originStmt, e);
return null;
}
}

public void checkDBTable(Analyzer analyzer) throws AnalysisException {
labelName.analyze(analyzer);
dbName = labelName.getDbName();
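For orientation, a minimal usage sketch of the new helper (job, table, and topic names invented; OriginStatement(sql, idx) and getLoadDesc(origStmt, sessionVariables) are the signatures shown above):

    // Sketch: rebuild a RoutineLoadDesc from a persisted origin statement.
    OriginStatement persisted = new OriginStatement(
            "CREATE ROUTINE LOAD job1 ON tbl1 COLUMNS TERMINATED BY ',' " +
                    "PROPERTIES (\"desired_concurrent_number\"=\"1\") " +
                    "FROM KAFKA (\"kafka_topic\" = \"my_topic\")",
            0); // idx: position of this statement within the original SQL text
    // A null session-variable map falls back to SqlModeHelper.MODE_DEFAULT.
    RoutineLoadDesc desc = CreateRoutineLoadStmt.getLoadDesc(persisted, null);
    if (desc != null) { // getLoadDesc logs and returns null on any parse failure
        System.out.println(desc.toSql()); // e.g. COLUMNS TERMINATED BY ','
    }

Note that the helper swallows parse errors and returns null rather than throwing, so both callers below must tolerate a missing load description.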
87 changes: 82 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/load/RoutineLoadDesc.java

@@ -22,18 +22,25 @@
package com.starrocks.load;

import com.starrocks.analysis.ColumnSeparator;
import com.starrocks.analysis.ImportColumnDesc;
import com.starrocks.analysis.ImportColumnsStmt;
import com.starrocks.analysis.ImportWhereStmt;
import com.starrocks.analysis.PartitionNames;
import com.starrocks.analysis.RowDelimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RoutineLoadDesc {
- private final ColumnSeparator columnSeparator;
- private final RowDelimiter rowDelimiter;
- private final ImportColumnsStmt columnsInfo;
- private final ImportWhereStmt wherePredicate;
+ private ColumnSeparator columnSeparator;
+ private RowDelimiter rowDelimiter;
+ private ImportColumnsStmt columnsInfo;
+ private ImportWhereStmt wherePredicate;
// nullable
- private final PartitionNames partitionNames;
+ private PartitionNames partitionNames;

public RoutineLoadDesc() {}

public RoutineLoadDesc(ColumnSeparator columnSeparator, RowDelimiter rowDelimiter, ImportColumnsStmt columnsInfo,
ImportWhereStmt wherePredicate, PartitionNames partitionNames) {
@@ -48,20 +55,90 @@ public ColumnSeparator getColumnSeparator() {
return columnSeparator;
}

public void setColumnSeparator(ColumnSeparator columnSeparator) {
this.columnSeparator = columnSeparator;
}

public RowDelimiter getRowDelimiter() {
return rowDelimiter;
}

public void setRowDelimiter(RowDelimiter rowDelimiter) {
this.rowDelimiter = rowDelimiter;
}

public ImportColumnsStmt getColumnsInfo() {
return columnsInfo;
}

public void setColumnsInfo(ImportColumnsStmt importColumnsStmt) {
this.columnsInfo = importColumnsStmt;
}

public ImportWhereStmt getWherePredicate() {
return wherePredicate;
}

public void setWherePredicate(ImportWhereStmt wherePredicate) {
this.wherePredicate = wherePredicate;
}

// nullable
public PartitionNames getPartitionNames() {
return partitionNames;
}

public void setPartitionNames(PartitionNames partitionNames) {
this.partitionNames = partitionNames;
}

public String toSql() {
List<String> subSQLs = new ArrayList<>();
if (columnSeparator != null) {
subSQLs.add("COLUMNS TERMINATED BY " + columnSeparator.toSql());
}
if (rowDelimiter != null) {
subSQLs.add("ROWS TERMINATED BY " + rowDelimiter.toSql());
}
if (columnsInfo != null) {
String subSQL = "COLUMNS(" +
columnsInfo.getColumns().stream().map(this::columnToString)
.collect(Collectors.joining(", ")) +
")";
subSQLs.add(subSQL);
}
if (partitionNames != null) {
String subSQL = null;
if (partitionNames.isTemp()) {
subSQL = "TEMPORARY PARTITION";
} else {
subSQL = "PARTITION";
}
subSQL += "(" + partitionNames.getPartitionNames().stream().map(this::pack)
.collect(Collectors.joining(", "))
+ ")";
subSQLs.add(subSQL);
}
if (wherePredicate != null) {
subSQLs.add("WHERE " + wherePredicate.getExpr().toSql());
}
return String.join(", ", subSQLs);
}

private String pack(String str) {
return "`" + str + "`";
}

public String columnToString(ImportColumnDesc desc) {
String str = pack(desc.getColumnName());
if (desc.getExpr() != null) {
str += " = " + desc.getExpr().toSql();
}
return str;
}

@Override
public String toString() {
return toSql();
}
}
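As a rough illustration of the new toSql() (column, partition, and predicate names invented, and assuming the column separator serializes as ','), a fully populated desc renders as a single comma-joined clause list:

    COLUMNS TERMINATED BY ',', ROWS TERMINATED BY '\n', COLUMNS(`event_id`, `total` = `price` * `qty`), TEMPORARY PARTITION(`p1`, `p2`), WHERE `price` > 0

This is the string that mergeLoadDescToOriginStatement (below) splices into a regenerated CREATE ROUTINE LOAD statement.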
fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java

@@ -29,7 +29,6 @@
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
- import com.starrocks.analysis.AlterRoutineLoadStmt;
import com.starrocks.analysis.ColumnSeparator;
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.Expr;
@@ -39,9 +38,6 @@
import com.starrocks.analysis.PartitionNames;
import com.starrocks.analysis.RoutineLoadDataSourceProperties;
import com.starrocks.analysis.RowDelimiter;
- import com.starrocks.analysis.SqlParser;
- import com.starrocks.analysis.SqlScanner;
- import com.starrocks.analysis.StatementBase;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
@@ -60,7 +56,6 @@
import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
- import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.metric.MetricRepo;
@@ -84,7 +79,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
- import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -1221,6 +1215,10 @@ public void setOrigStmt(OriginStatement origStmt) {
this.origStmt = origStmt;
}

public OriginStatement getOrigStmt() {
return origStmt;
}

// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
TransactionState txnState,
@@ -1491,22 +1489,7 @@ public void readFields(DataInput in) throws IOException {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}

- // parse the origin stmt to get routine load desc
- SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
- Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
- try {
- StatementBase stmt = SqlParserUtils.getStmt(parser, origStmt.idx);
- if (stmt instanceof CreateRoutineLoadStmt) {
- setRoutineLoadDesc(CreateRoutineLoadStmt.
- buildLoadDesc(((CreateRoutineLoadStmt) stmt).getLoadPropertyList()));
- } else if (stmt instanceof AlterRoutineLoadStmt) {
- setRoutineLoadDesc(CreateRoutineLoadStmt.
- buildLoadDesc(((AlterRoutineLoadStmt) stmt).getLoadPropertyList()));
- }
-
- } catch (Exception e) {
- throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e);
- }
+ setRoutineLoadDesc(CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables));
}

public void modifyJob(RoutineLoadDesc routineLoadDesc,
@@ -1525,7 +1508,7 @@ public void modifyJob(RoutineLoadDesc routineLoadDesc,
if (dataSourceProperties != null) {
modifyDataSourceProperties(dataSourceProperties);
}
- origStmt = originStatement;
+ mergeLoadDescToOriginStatement(routineLoadDesc);
if (!isReplay) {
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(id,
jobProperties, dataSourceProperties, originStatement);
@@ -1536,7 +1519,50 @@
}
}

- protected abstract void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException;
public void mergeLoadDescToOriginStatement(RoutineLoadDesc routineLoadDesc) {
if (origStmt == null) {
return;
}

RoutineLoadDesc originLoadDesc = CreateRoutineLoadStmt.getLoadDesc(origStmt, sessionVariables);
if (originLoadDesc == null) {
originLoadDesc = new RoutineLoadDesc();
}
if (routineLoadDesc.getColumnSeparator() != null) {
originLoadDesc.setColumnSeparator(routineLoadDesc.getColumnSeparator());
}
if (routineLoadDesc.getRowDelimiter() != null) {
originLoadDesc.setRowDelimiter(routineLoadDesc.getRowDelimiter());
}
if (routineLoadDesc.getColumnsInfo() != null) {
originLoadDesc.setColumnsInfo(routineLoadDesc.getColumnsInfo());
}
if (routineLoadDesc.getWherePredicate() != null) {
originLoadDesc.setWherePredicate(routineLoadDesc.getWherePredicate());
}
if (routineLoadDesc.getPartitionNames() != null) {
originLoadDesc.setPartitionNames(routineLoadDesc.getPartitionNames());
}

String tableName = null;
try {
tableName = getTableName();
} catch (Exception e) {
LOG.warn("get table name failed", e);
tableName = "unknown";
}

// The load properties are persisted as SQL, so merge them into a regenerated CREATE ROUTINE LOAD statement.
// The PROPERTIES and FROM KAFKA clauses below are placeholders that keep the statement parseable;
// replay only consumes the load description from this statement.
String sql = String.format("CREATE ROUTINE LOAD %s ON %s %s" +
" PROPERTIES (\"desired_concurrent_number\"=\"1\")" +
" FROM KAFKA (\"kafka_topic\" = \"my_topic\")",
name, tableName, originLoadDesc.toSql());
LOG.debug("merge result: {}", sql);
origStmt = new OriginStatement(sql, 0);
}

protected abstract void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties)
throws DdlException;

// for ALTER ROUTINE LOAD
private void modifyCommonJobProperties(Map<String, String> jobProperties) {
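To make the merge concrete, a hypothetical before/after (job, table, and topic names invented). Suppose the job was created with only a column separator and an ALTER ROUTINE LOAD then adds a WHERE predicate; the regenerated origin statement keeps both:

    -- persisted before the ALTER (simplified)
    CREATE ROUTINE LOAD job1 ON tbl1 COLUMNS TERMINATED BY ','
        PROPERTIES ("desired_concurrent_number"="1") FROM KAFKA ("kafka_topic" = "my_topic")

    -- persisted after the ALTER: the separator survives alongside the new predicate
    CREATE ROUTINE LOAD job1 ON tbl1 COLUMNS TERMINATED BY ',', WHERE `price` > 0
        PROPERTIES ("desired_concurrent_number"="1") FROM KAFKA ("kafka_topic" = "my_topic")

Previously origStmt was simply replaced by the ALTER statement, so any property it did not repeat disappeared on the next replay; that is the metadata loss this change fixes.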
fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadManager.java

@@ -29,8 +29,6 @@
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.PauseRoutineLoadStmt;
import com.starrocks.analysis.ResumeRoutineLoadStmt;
- import com.starrocks.analysis.SqlParser;
- import com.starrocks.analysis.SqlScanner;
import com.starrocks.analysis.StopRoutineLoadStmt;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
@@ -45,22 +43,18 @@
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
- import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.mysql.privilege.PrivPredicate;
import com.starrocks.persist.AlterRoutineLoadJobOperationLog;
import com.starrocks.persist.RoutineLoadOperation;
import com.starrocks.qe.ConnectContext;
- import com.starrocks.qe.SessionVariable;
- import com.starrocks.qe.SqlModeHelper;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
- import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -571,24 +565,8 @@ public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) throw
// NOTE: we use the origin statement to get the RoutineLoadDesc
RoutineLoadDesc routineLoadDesc = null;
if (log.getOriginStatement() != null) {
- long sqlMode;
- if (job.getSessionVariables() != null && job.getSessionVariables().containsKey(SessionVariable.SQL_MODE)) {
- sqlMode = Long.parseLong(job.getSessionVariables().get(SessionVariable.SQL_MODE));
- } else {
- sqlMode = SqlModeHelper.MODE_DEFAULT;
- }
- try {
- SqlParser parser = new SqlParser(
- new SqlScanner(new StringReader(log.getOriginStatement().originStmt), sqlMode));
- AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) SqlParserUtils.getStmt(
- parser, log.getOriginStatement().idx);
- if (stmt.getLoadPropertyList() != null) {
- routineLoadDesc = CreateRoutineLoadStmt.buildLoadDesc(stmt.getLoadPropertyList());
- }
- } catch (Exception e) {
- throw new IOException("error happens when parsing alter routine load stmt: "
- + log.getOriginStatement().originStmt, e);
- }
+ routineLoadDesc = CreateRoutineLoadStmt.getLoadDesc(
+ log.getOriginStatement(), job.getSessionVariables());
}
job.modifyJob(routineLoadDesc, log.getJobProperties(),
log.getDataSourceProperties(), log.getOriginStatement(), true);