Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support for undo full data columns on update operate #2509

Merged
merged 8 commits into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public class ConfigurationKeys {
*/
public static final String TRANSACTION_UNDO_LOG_SERIALIZATION = CLIENT_UNDO_PREFIX + "logSerialization";

/**
* The constant TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS.
*/
public static final String TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS = CLIENT_UNDO_PREFIX + "onlyCareUpdateColumns";

/**
* The constant METRICS_PREFIX.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class DefaultValues {
public static final boolean DEFAULT_TRANSPORT_HEARTBEAT = true;
public static final boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true;
public static final String DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION = "jackson";
public static final boolean DEFAULT_ONLY_CARE_UPDATE_COLUMNS = true;
/**
* The constant DEFAULT_TRANSACTION_UNDO_LOG_TABLE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
*/
package io.seata.rm.datasource.exec;

import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -34,6 +25,20 @@
import java.util.Set;
import java.util.StringJoiner;

import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.constants.DefaultValues;
import io.seata.rm.datasource.ColumnUtils;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

/**
* The type MultiSql executor.
*
Expand All @@ -42,6 +47,19 @@
* @author wangwei-ying
*/
public class MultiUpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);

/**
* Instantiates a new Multi update executor.
*
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param sqlRecognizers the sql recognizers
*/
public MultiUpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, List<SQLRecognizer> sqlRecognizers) {
super(statementProxy, statementCallback, sqlRecognizers);
}
Expand Down Expand Up @@ -77,9 +95,6 @@ protected TableRecords beforeImage() throws SQLException {
}
}
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
prefix.append(getColumnNameInSQL(tmeta.getEscapePkName(getDbType()))).append(", ");
}
final StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
if (noWhereCondition) {
//select all rows
Expand All @@ -89,8 +104,17 @@ protected TableRecords beforeImage() throws SQLException {
}
suffix.append(" FOR UPDATE");
final StringJoiner selectSQLAppender = new StringJoiner(", ", prefix, suffix.toString());
for (String updateCol : updateColumnsSet) {
selectSQLAppender.add(updateCol);
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
selectSQLAppender.add(getColumnNameInSQL(tmeta.getEscapePkName(getDbType())));
}
for (String updateCol : updateColumnsSet) {
selectSQLAppender.add(updateCol);
}
} else {
for (String columnName : tmeta.getAllColumns().keySet()) {
selectSQLAppender.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return buildTableRecords(tmeta, selectSQLAppender.toString(), paramAppenderList);
}
Expand Down Expand Up @@ -129,14 +153,19 @@ private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage)
updateColumnsSet.addAll(sqlUpdateRecognizer.getUpdateColumns());
}
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumnsSet) {
selectSQLJoiner.add(column);
if (ONLY_CARE_UPDATE_COLUMNS) {
if (!containsPK(new ArrayList<>(updateColumnsSet))) {
selectSQLJoiner.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String updateCol : updateColumnsSet) {
selectSQLJoiner.add(updateCol);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoiner.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,41 @@
import java.util.StringJoiner;

import io.seata.common.util.IOUtil;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.constants.DefaultValues;
import io.seata.rm.datasource.ColumnUtils;
import io.seata.rm.datasource.StatementProxy;

import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;
import org.apache.commons.lang.StringUtils;

/**
* The type Update executor.
*
* @author sharajava
*
* @param <T> the type parameter
* @param <S> the type parameter
* @author sharajava
*/
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(
ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);

/**
* Instantiates a new Update executor.
*
* @param statementProxy the statement proxy
* @param statementCallback the statement callback
* @param sqlRecognizer the sql recognizer
*/
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T,S> statementCallback,
public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
SQLRecognizer sqlRecognizer) {
super(statementProxy, statementCallback, sqlRecognizer);
}
Expand All @@ -66,20 +74,26 @@ protected TableRecords beforeImage() throws SQLException {

private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(" WHERE ").append(whereCondition);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
for (String updateColumn : updateColumns) {
selectSQLJoin.add(updateColumn);
if (ONLY_CARE_UPDATE_COLUMNS) {
List<String> updateColumns = recognizer.getUpdateColumns();
if (!containsPK(updateColumns)) {
selectSQLJoin.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoin.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoin.toString();
}
Expand All @@ -106,17 +120,22 @@ protected TableRecords afterImage(TableRecords beforeImage) throws SQLException
}

private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!containsPK(updateColumns)) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType()))).append(", ");
}
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumns) {
selectSQLJoiner.add(column);
if (ONLY_CARE_UPDATE_COLUMNS) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
if (!containsPK(updateColumns)) {
selectSQLJoiner.add(getColumnNameInSQL(tableMeta.getEscapePkName(getDbType())));
}
for (String columnName : updateColumns) {
selectSQLJoiner.add(columnName);
}
} else {
for (String columnName : tableMeta.getAllColumns().keySet()) {
selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
}
}
return selectSQLJoiner.toString();
}
Expand Down
1 change: 1 addition & 0 deletions script/client/conf/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ client {
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
}
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ seata.client.tm.commit-retry-count=5
seata.client.tm.rollback-retry-count=5
seata.client.undo.data-validation=true
seata.client.undo.log-serialization=jackson
seata.client.undo.only-care-update-columns=true
seata.client.undo.log-table=undo_log
seata.client.log.exceptionRate=100
seata.service.vgroup-mapping.my_test_tx_group=default
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ seata:
data-validation: true
log-serialization: jackson
log-table: undo_log
only-care-update-columns: true
log:
exceptionRate: 100
service:
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import static io.seata.core.constants.DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;
import static io.seata.core.constants.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_TABLE;
Expand All @@ -32,6 +33,7 @@ public class UndoProperties {
private boolean dataValidation = DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION;
private String logSerialization = DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;
private String logTable = DEFAULT_TRANSACTION_UNDO_LOG_TABLE;
private boolean onlyCareUpdateColumns = DEFAULT_ONLY_CARE_UPDATE_COLUMNS;

public boolean isDataValidation() {
return dataValidation;
Expand Down Expand Up @@ -59,4 +61,13 @@ public UndoProperties setLogTable(String logTable) {
this.logTable = logTable;
return this;
}

public boolean isOnlyCareUpdateColumns() {
return onlyCareUpdateColumns;
}

public UndoProperties setOnlyCareUpdateColumns(boolean onlyCareUpdateColumns) {
this.onlyCareUpdateColumns = onlyCareUpdateColumns;
return this;
}
}