diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index c7edd376c9..2636a07a44 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -3268,6 +3268,7 @@ protected void checkInterrupted() throws InterruptedException { public class DataMapper implements ISqlRowMapper { private List triggerRouters; private List activeTriggerHistories; + private Collection allTriggerHistories; private HashMap mismatchedTableName; private HashSet missingConfigTriggerHist; private HashSet mismatchedTriggerHist; @@ -3294,7 +3295,7 @@ public Data mapRow(Row row) { data.putAttribute(CsvData.ATTRIBUTE_TABLE_ID, triggerHistId); TriggerHistory triggerHistory = engine.getTriggerRouterService().getTriggerHistory(triggerHistId); if (triggerHistory == null) { - triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data.getDataId(), false); + triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data, false); } else if (!triggerHistory.getSourceTableName().equals(tableName)) { if (mismatchedTableName == null) { mismatchedTableName = new HashMap(); @@ -3305,7 +3306,7 @@ public Data mapRow(Row row) { + "table name {} for data_id {}. Attempting to look up a valid trigger_hist row by table name", new Object[] { data.getTableName(), triggerHistory.getSourceTableName(), data.getDataId() }); - triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data.getDataId(), true); + triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data, true); mismatchedTableName.put(data.getTableName(), triggerHistory); } else { triggerHistory = cachedTriggerHistory; @@ -3316,10 +3317,11 @@ public Data mapRow(Row row) { return data; } - private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerHistId, long dataId, boolean isExistingTriggerHist) { + private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerHistId, Data data, boolean isExistingTriggerHist) { TriggerHistory triggerHistory = null; Trigger trigger = null; Table table = null; + long dataId = data.getDataId(); if (triggerRouters == null) { triggerRouters = engine.getTriggerRouterService().getAllTriggerRoutersForCurrentNode(engine.getNodeService().findIdentity().getNodeGroupId()); } @@ -3333,13 +3335,9 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH if (table != null && trigger != null) { if (activeTriggerHistories == null) { activeTriggerHistories = engine.getTriggerRouterService().getActiveTriggerHistories(); + allTriggerHistories = engine.getTriggerRouterService().getHistoryRecords().values(); } - for (TriggerHistory hist : activeTriggerHistories) { - if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName)) { - triggerHistory = hist; - break; - } - } + triggerHistory = findMatchingTriggerHistory(trigger, data, tableName); if (triggerHistory == null) { triggerHistory = new TriggerHistory(table, trigger, engine.getSymmetricDialect().getTriggerTemplate()); triggerHistory.setTriggerHistoryId(isExistingTriggerHist ? 0 : triggerHistId); @@ -3355,6 +3353,7 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH triggerHistId, tableName, dataId); engine.getTriggerRouterService().insert(triggerHistory); activeTriggerHistories.add(triggerHistory); + allTriggerHistories.add(triggerHistory); } else { if (mismatchedTriggerHist == null) { mismatchedTriggerHist = new HashSet(); @@ -3377,6 +3376,34 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH } return triggerHistory; } + + private TriggerHistory findMatchingTriggerHistory(Trigger trigger, Data data, String tableName) { + TriggerHistory triggerHistory = null; + int columnCount = 0, pkColumnCount = 0; + if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) { + columnCount = data.getParsedData(CsvData.ROW_DATA).length; + } + if (data.getDataEventType() == DataEventType.DELETE || data.getDataEventType() == DataEventType.UPDATE) { + pkColumnCount = data.getParsedData(CsvData.PK_DATA).length; + } + for (TriggerHistory hist : allTriggerHistories) { + if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName) + && (columnCount == 0 || columnCount == hist.getParsedColumnNames().length) + && (pkColumnCount == 0 || pkColumnCount == hist.getParsedPkColumnNames().length)) { + triggerHistory = hist; + break; + } + } + if (triggerHistory == null) { + for (TriggerHistory hist : activeTriggerHistories) { + if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName)) { + triggerHistory = hist; + break; + } + } + } + return triggerHistory; + } } public int resendBatchAsReload(long batchId, String nodeId) {