From 368ae3ef77d23ae093a9ec738d9264e572101b8d Mon Sep 17 00:00:00 2001 From: Eric Long Date: Fri, 23 Oct 2020 08:50:34 -0400 Subject: [PATCH] 0004591: Multi-primary out of sync race condition with NEWER_WINS conflicts --- .../load/DefaultDataLoaderFactory.java | 41 ++++++++++++++----- ...DefaultDatabaseWriterConflictResolver.java | 12 +++--- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java index c432186e81..8dd77b881a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java @@ -21,7 +21,6 @@ package org.jumpmind.symmetric.load; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -202,17 +201,39 @@ protected void handleWinnerForNewerCaptureWins(ISqlTransaction transaction, CsvD String channelId = csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID); if (channelId != null && !channelId.equals(Constants.CHANNEL_RELOAD)) { - String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID); - String script = "if (context != void && context != null) { " + - "engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" + - tableName + "\", " + CsvUtils.escapeCsvData(csvData.getCsvData(CsvData.PK_DATA)) + ", new Date(" + - ((Date) csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME)).getTime() +"L), \"" + sourceNodeId + "\"); }"; - Data scriptData = new Data(tableName, DataEventType.BSH, - CsvUtils.escapeCsvData(script), null, hist, Constants.CHANNEL_RELOAD, null, null); - scriptData.setSourceNodeId(sourceNodeId); - engine.getDataService().insertData(transaction, scriptData); + String pkCsvData = CsvUtils.escapeCsvData(getPkCsvData(csvData, hist)); + if (pkCsvData != null) { + String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID); + long createTime = data.getCreateTime() != null ? data.getCreateTime().getTime() : 0; + String script = "if (context != void && context != null) { " + + "engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" + + tableName + "\", " + CsvUtils.escapeCsvData(getPkCsvData(csvData, hist)) + ", new Date(" + + createTime +"L), \"" + sourceNodeId + "\"); }"; + Data scriptData = new Data(tableName, DataEventType.BSH, + CsvUtils.escapeCsvData(script), null, hist, Constants.CHANNEL_RELOAD, null, null); + scriptData.setSourceNodeId(sourceNodeId); + engine.getDataService().insertData(transaction, scriptData); + } + } + } + } + + protected String getPkCsvData(CsvData csvData, TriggerHistory hist) { + String pkCsvData = null; + if (csvData.getDataEventType() == DataEventType.INSERT) { + if (hist.getParsedPkColumnNames() != null && hist.getParsedPkColumnNames().length > 0) { + String[] pkData = new String[hist.getParsedPkColumnNames().length]; + Map values = csvData.toColumnNameValuePairs(hist.getParsedPkColumnNames(), CsvData.ROW_DATA); + int i = 0; + for (String name : hist.getParsedPkColumnNames()) { + pkData[i++] = values.get(name); + } + pkCsvData = CsvUtils.escapeCsvData(pkData); } + } else { + pkCsvData = csvData.getCsvData(CsvData.PK_DATA); } + return pkCsvData; } }, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData)); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java index d3db942487..4780cdcd43 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java @@ -145,18 +145,18 @@ protected boolean isCaptureTimeNewer(Conflict conflict, AbstractDatabaseWriter w "(event_type in ('U', 'D') and pk_data like ?)) and create_time >= ? order by create_time desc"; Object[] args = new Object[] { targetTable.getName(), pkCsv + "%", pkCsv, loadingTs }; - List rows = null; + Row row = null; if (databaseWriter.getPlatform(targetTable.getName()).supportsMultiThreadedTransactions()) { // we may have waited for another transaction to commit, so query with a new transaction - rows = databaseWriter.getPlatform(targetTable.getName()).getSqlTemplate().query(sql, args); + row = databaseWriter.getPlatform(targetTable.getName()).getSqlTemplate().queryForRow(sql, args); } else { - writer.getContext().findTransaction().queryForRow(sql, args); + row = writer.getContext().findTransaction().queryForRow(sql, args); } - if (rows != null && rows.size() > 0) { - existingTs = rows.get(0).getDateTime("create_time"); - existingNodeId = rows.get(0).getString("source_node_id"); + if (row != null) { + existingTs = row.getDateTime("create_time"); + existingNodeId = row.getString("source_node_id"); if (existingNodeId == null || existingNodeId.equals("")) { existingNodeId = writer.getContext().getBatch().getTargetNodeId(); }