Skip to content

Commit

Permalink
0004591: Multi-primary out of sync race condition with NEWER_WINS
Browse files Browse the repository at this point in the history
conflicts
  • Loading branch information
erilong committed Oct 23, 2020
1 parent 9ae2c90 commit 368ae3e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
Expand Up @@ -21,7 +21,6 @@
package org.jumpmind.symmetric.load;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -202,17 +201,39 @@ protected void handleWinnerForNewerCaptureWins(ISqlTransaction transaction, CsvD

String channelId = csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID);
if (channelId != null && !channelId.equals(Constants.CHANNEL_RELOAD)) {
String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID);
String script = "if (context != void && context != null) { " +
"engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" +
tableName + "\", " + CsvUtils.escapeCsvData(csvData.getCsvData(CsvData.PK_DATA)) + ", new Date(" +
((Date) csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME)).getTime() +"L), \"" + sourceNodeId + "\"); }";
Data scriptData = new Data(tableName, DataEventType.BSH,
CsvUtils.escapeCsvData(script), null, hist, Constants.CHANNEL_RELOAD, null, null);
scriptData.setSourceNodeId(sourceNodeId);
engine.getDataService().insertData(transaction, scriptData);
String pkCsvData = CsvUtils.escapeCsvData(getPkCsvData(csvData, hist));
if (pkCsvData != null) {
String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID);
long createTime = data.getCreateTime() != null ? data.getCreateTime().getTime() : 0;
String script = "if (context != void && context != null) { " +
"engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" +
tableName + "\", " + CsvUtils.escapeCsvData(getPkCsvData(csvData, hist)) + ", new Date(" +
createTime +"L), \"" + sourceNodeId + "\"); }";
Data scriptData = new Data(tableName, DataEventType.BSH,
CsvUtils.escapeCsvData(script), null, hist, Constants.CHANNEL_RELOAD, null, null);
scriptData.setSourceNodeId(sourceNodeId);
engine.getDataService().insertData(transaction, scriptData);
}
}
}
}

protected String getPkCsvData(CsvData csvData, TriggerHistory hist) {
String pkCsvData = null;
if (csvData.getDataEventType() == DataEventType.INSERT) {
if (hist.getParsedPkColumnNames() != null && hist.getParsedPkColumnNames().length > 0) {
String[] pkData = new String[hist.getParsedPkColumnNames().length];
Map<String, String> values = csvData.toColumnNameValuePairs(hist.getParsedPkColumnNames(), CsvData.ROW_DATA);
int i = 0;
for (String name : hist.getParsedPkColumnNames()) {
pkData[i++] = values.get(name);
}
pkCsvData = CsvUtils.escapeCsvData(pkData);
}
} else {
pkCsvData = csvData.getCsvData(CsvData.PK_DATA);
}
return pkCsvData;
}
}, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));

Expand Down
Expand Up @@ -145,18 +145,18 @@ protected boolean isCaptureTimeNewer(Conflict conflict, AbstractDatabaseWriter w
"(event_type in ('U', 'D') and pk_data like ?)) and create_time >= ? order by create_time desc";

Object[] args = new Object[] { targetTable.getName(), pkCsv + "%", pkCsv, loadingTs };
List<Row> rows = null;
Row row = null;

if (databaseWriter.getPlatform(targetTable.getName()).supportsMultiThreadedTransactions()) {
// we may have waited for another transaction to commit, so query with a new transaction
rows = databaseWriter.getPlatform(targetTable.getName()).getSqlTemplate().query(sql, args);
row = databaseWriter.getPlatform(targetTable.getName()).getSqlTemplate().queryForRow(sql, args);
} else {
writer.getContext().findTransaction().queryForRow(sql, args);
row = writer.getContext().findTransaction().queryForRow(sql, args);
}

if (rows != null && rows.size() > 0) {
existingTs = rows.get(0).getDateTime("create_time");
existingNodeId = rows.get(0).getString("source_node_id");
if (row != null) {
existingTs = row.getDateTime("create_time");
existingNodeId = row.getString("source_node_id");
if (existingNodeId == null || existingNodeId.equals("")) {
existingNodeId = writer.getContext().getBatch().getTargetNodeId();
}
Expand Down

0 comments on commit 368ae3e

Please sign in to comment.