Skip to content

Commit

Permalink
feature: support for undo full data columns on update operate (#2509)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed May 3, 2020
1 parent fedc085 commit e373700
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public class ConfigurationKeys {
*/
public static final String TRANSACTION_UNDO_LOG_SERIALIZATION = CLIENT_UNDO_PREFIX + "logSerialization";

/**
* The constant TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS.
*/
public static final String TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS = CLIENT_UNDO_PREFIX + "onlyCareUpdateColumns";

/**
* The constant METRICS_PREFIX.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DefaultValues {
public static final boolean DEFAULT_TRANSPORT_HEARTBEAT = true;
public static final boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true;
public static final String DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION = "jackson";
public static final boolean DEFAULT_ONLY_CARE_UPDATE_COLUMNS = true;
/**
* The constant DEFAULT_TRANSACTION_UNDO_LOG_TABLE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
*/
package io.seata.rm.datasource.exec;

import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -34,6 +25,20 @@
import java.util.Set;
import java.util.StringJoiner;

import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.constants.DefaultValues;
import io.seata.rm.datasource.ColumnUtils;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

/**
* The type MultiSql executor.
*
Expand All @@ -42,6 +47,19 @@
* @author wangwei-ying
*/
public class MultiUpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);

/**
* Instantiates a new Multi update executor.
*
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param sqlRecognizers the sql recognizers
*/
public MultiUpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, List<SQLRecognizer> sqlRecognizers) {
super(statementProxy, statementCallback, sqlRecognizers);
}
Expand Down Expand Up @@ -77,9 +95,6 @@ protected TableRecords beforeImage() throws SQLException {
}
}
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
prefix.append(getColumnNameInSQL(tmeta.getEscapePkName(getDbType()))).append(", ");
}
final StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
if (noWhereCondition) {
//select all rows
Expand All @@ -89,8 +104,17 @@ protected TableRecords beforeImage() throws SQLException {
}
suffix.append(" FOR UPDATE");
final StringJoiner selectSQLAppender = new StringJoiner(", ", prefix, suffix.toString());
for (String updateCol : updateColumnsSet) {
selectSQLAppender.add(updateCol);
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
selectSQLAppender.add(getColumnNameInSQL(tmeta.getEscapePkName(getDbType())));
}
for (String updateCol : updateColumnsSet) {
selectSQLAppender.add(updateCol);
}
} else {
for (String columnName : tmeta.getAllColumns().keySet()) {
selectSQLAppender.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return buildTableRecords(tmeta, selectSQLAppender.toString(), paramAppenderList);
}
Expand Down Expand Up @@ -129,14 +153,19 @@ private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage)
updateColumnsSet.addAll(sqlUpdateRecognizer.getUpdateColumns());
}
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumnsSet) {
selectSQLJoiner.add(column);
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
selectSQLJoiner.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String updateCol : updateColumnsSet) {
selectSQLJoiner.add(updateCol);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoiner.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,41 @@
import java.util.StringJoiner;

import io.seata.common.util.IOUtil;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.constants.DefaultValues;
import io.seata.rm.datasource.ColumnUtils;
import io.seata.rm.datasource.StatementProxy;

import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;
import org.apache.commons.lang.StringUtils;

/**
* The type Update executor.
*
* @author sharajava
*
* @param <T> the type parameter
* @param <S> the type parameter
* @author sharajava
*/
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);

/**
* Instantiates a new Update executor.
*
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param sqlRecognizer the sql recognizer
*/
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T,S> statementCallback,
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
Expand All @@ -66,20 +74,26 @@ protected TableRecords beforeImage() throws SQLException {

private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(" WHERE ").append(whereCondition);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
for (String updateColumn : updateColumns) {
selectSQLJoin.add(updateColumn);
if (ONLY_CARE_UPDATE_COLUMNS) {
List<String> updateColumns = recognizer.getUpdateColumns();
if (!containsPK(updateColumns)) {
selectSQLJoin.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoin.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoin.toString();
}
Expand All @@ -106,17 +120,22 @@ protected TableRecords afterImage(TableRecords beforeImage) throws SQLException
}

private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumns) {
selectSQLJoiner.add(column);
if (ONLY_CARE_UPDATE_COLUMNS) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
if (!containsPK(updateColumns)) {
selectSQLJoiner.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoiner.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoiner.toString();
}
Expand Down
1 change: 1 addition & 0 deletions script/client/conf/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ client {
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
}
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ seata.client.tm.commit-retry-count=5
seata.client.tm.rollback-retry-count=5
seata.client.undo.data-validation=true
seata.client.undo.log-serialization=jackson
seata.client.undo.only-care-update-columns=true
seata.client.undo.log-table=undo_log
seata.client.log.exceptionRate=100
seata.service.vgroup-mapping.my_test_tx_group=default
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ seata:
data-validation: true
log-serialization: jackson
log-table: undo_log
only-care-update-columns: true
log:
exceptionRate: 100
service:
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import static io.seata.core.constants.DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_TABLE;
Expand All @@ -32,6 +33,7 @@ public class UndoProperties {
private boolean dataValidation = DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION;
private String logSerialization = DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;
private String logTable = DEFAULT_TRANSACTION_UNDO_LOG_TABLE;
private boolean onlyCareUpdateColumns = DEFAULT_ONLY_CARE_UPDATE_COLUMNS;

public boolean isDataValidation() {
return dataValidation;
Expand Down Expand Up @@ -59,4 +61,13 @@ public UndoProperties setLogTable(String logTable) {
this.logTable = logTable;
return this;
}

public boolean isOnlyCareUpdateColumns() {
return onlyCareUpdateColumns;
}

public UndoProperties setOnlyCareUpdateColumns(boolean onlyCareUpdateColumns) {
this.onlyCareUpdateColumns = onlyCareUpdateColumns;
return this;
}
}

0 comments on commit e373700

Please sign in to comment.