Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do data validation when undo. #1060

Merged
merged 12 commits into from
May 21, 2019
12 changes: 12 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
<httpcore.version>4.4.11</httpcore.version>
<druid.version>1.1.12</druid.version>
<caffeine.version>2.7.0</caffeine.version>
<commons-dbcp.version>1.3</commons-dbcp.version>
<h2.version>1.4.181</h2.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -270,6 +272,16 @@
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
11 changes: 11 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
*/
package io.seata.rm.datasource.undo;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;

import com.alibaba.fastjson.JSON;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.DataCompareUtils;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.KeyType;
import io.seata.rm.datasource.sql.struct.Row;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
* The type Abstract undo executor.
Expand All @@ -34,6 +41,18 @@
*/
public abstract class AbstractUndoExecutor {

/**
* Logger for AbstractUndoExecutor
**/
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractUndoExecutor.class);

/**
* template of check sql
*
* TODO support multiple primary key
*/
private static final String CHECK_SQL_TEMPLATE = "SELECT * FROM %s WHERE %s in (%s)";

/**
* The Sql undo log.
*/
Expand Down Expand Up @@ -72,11 +91,11 @@ public SQLUndoLog getSqlUndoLog() {
*/
public void executeOn(Connection conn) throws SQLException {

// no need undo if the before data snapshot is equivalent to the after data snapshot.
if (DataCompareUtils.isRecordsEquals(sqlUndoLog.getBeforeImage(), sqlUndoLog.getAfterImage())) {
// when the data validation is not ok
if (!dataValidation(conn)) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we throw some exception explicitly for the code change later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don’t get your point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add some log and change the method name for more explicitly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as communicated offline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leizhiyuan done.

}
dataValidation(conn);

try {
String undoSQL = buildUndoSQL();

Expand Down Expand Up @@ -145,9 +164,115 @@ protected void undoPrepare(PreparedStatement undoPST, ArrayList<Field> undoValue
* Data validation.
*
* @param conn the conn
* @throws SQLException the sql exception
* @return return true if data validation is ok and need continue
* @throws SQLException the sql exception such as has dirty data
*/
protected void dataValidation(Connection conn) throws SQLException {
protected boolean dataValidation(Connection conn) throws SQLException {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It ‘s better to add a switcher to close the dirty data check manually, the switcher is open default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will add it in the next standalone PR.

TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
TableRecords afterRecords = sqlUndoLog.getAfterImage();

// Compare current data with before data
// No need undo if the before data snapshot is equivalent to the after data snapshot.
if (DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords)) {
return false;
}

// Validate if data is dirty.
TableRecords currentRecords = queryCurrentRecords(conn);
// compare with current data and after image.
if (!DataCompareUtils.isRecordsEquals(afterRecords, currentRecords)) {

// If current data is not equivalent to the after data, then compare the current data with the before
// data, too. No need continue to undo if current data is equivalent to the before data snapshot
if (DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords)) {
return false;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check dirty datas failed, old and new data are not equal," +
"tableName:[" + sqlUndoLog.getTableName() + "]," +
"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
}
throw new SQLException("Has dirty records when undo.");
}
}
return true;
}

/**
* Query current records.
*
* @param conn the conn
* @return the table records
* @throws SQLException the sql exception
*/
protected TableRecords queryCurrentRecords(Connection conn) throws SQLException {
TableRecords undoRecords = getUndoRows();
TableMeta tableMeta = undoRecords.getTableMeta();
String pkName = tableMeta.getPkName();
int pkType = tableMeta.getColumnMeta(pkName).getDataType();

// pares pk values
Object[] pkValues = parsePkValues(getUndoRows());
if (pkValues.length == 0) {
return TableRecords.empty(tableMeta);
}
StringBuffer replace = new StringBuffer();
for (int i = 0; i < pkValues.length; i++) {
replace.append("?,");
}
// build check sql
String checkSQL = String.format(CHECK_SQL_TEMPLATE, sqlUndoLog.getTableName(), pkName,
replace.substring(0, replace.length() - 1));

PreparedStatement statement = null;
ResultSet checkSet = null;
TableRecords currentRecords;
try {
statement = conn.prepareStatement(checkSQL);
for (int i = 1; i <= pkValues.length; i++) {
statement.setObject(i, pkValues[i - 1], pkType);
}
checkSet = statement.executeQuery();
currentRecords = TableRecords.buildRecords(tableMeta, checkSet);
} finally {
if (checkSet != null) {
try {
checkSet.close();
} catch (Exception e) {
}
}
if (statement != null) {
try {
statement.close();
} catch (Exception e) {
}
}
}
return currentRecords;
}

/**
* Parse pk values object [ ].
*
* @param records the records
* @return the object [ ]
*/
protected Object[] parsePkValues(TableRecords records) {
String pkName = records.getTableMeta().getPkName();
List<Row> undoRows = records.getRows();
Object[] pkValues = new Object[undoRows.size()];
for (int i = 0; i < undoRows.size(); i++) {
List<Field> fields = undoRows.get(i).getFields();
if (fields != null) {
for (Field field : fields) {
if (StringUtils.equalsIgnoreCase(pkName, field.getName())) {
pkValues[i] = field.getValue();
}
}
}
}
return pkValues;
}
}
Loading