diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index a8822dbea3..c86f334176 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -385,6 +385,7 @@ private ParameterConstants() { public final static String SYNC_TRIGGERS_THREAD_COUNT_PER_SERVER = "sync.triggers.thread.count.per.server"; public final static String SYNC_TRIGGERS_TIMEOUT_IN_SECONDS = "sync.triggers.timeout.in.seconds"; public final static String SYNC_TRIGGERS_REG_SVR_INSTALL_WITHOUT_CONFIG = "sync.triggers.reg.svr.install.without.config"; + public final static String SYNC_TRIGGERS_FIX_DUPLICATE_ACTIVE_TRIGGER_HISTORIES = "sync.triggers.fix.duplicate.active.trigger.histories"; public static final String SMTP_HOST = "smtp.host"; public static final String SMTP_TRANSPORT = "smtp.transport"; public static final String SMTP_PORT = "smtp.port"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index f436f30d75..e2e70d9d7f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -1428,6 +1428,7 @@ public boolean syncTriggers(StringBuilder sqlBuffer, boolean force) { + " is set to false, but the sync triggers process will run so that needed changes can be written to a file so they can be applied manually. Once all of the triggers have been successfully applied this process should not show triggers being created"; } log.info("Synchronizing triggers{}", additionalMessage); + fixMultipleActiveTriggerHistories(); // make sure all tables are freshly read in platform.resetCachedTableModel(); clearCache(); @@ -1755,37 +1756,42 @@ public boolean syncTriggers(Table table, boolean force) { public boolean syncTriggers(List tables, boolean force) { if (clusterService.lock(ClusterConstants.SYNC_TRIGGERS)) { - boolean ignoreCase = this.parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE); - List triggersForCurrentNode = getTriggersForCurrentNode(); - List activeTriggerHistories = getActiveTriggerHistories(); - Map> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo(triggersForCurrentNode, - activeTriggerHistories, false); - for (Table table : tables) { - IDatabasePlatform targetPlatform = symmetricDialect.getTargetPlatform(table.getName()); - for (Trigger trigger : triggersForCurrentNode) { - if (trigger.matches(table, targetPlatform.getDefaultCatalog(), targetPlatform.getDefaultSchema(), ignoreCase) && - (!trigger.isSourceTableNameWildCarded() || !trigger.isSourceTableNameExpanded() - || !containsExactMatchForSourceTableName(table, triggersForCurrentNode, ignoreCase))) { - List triggerTableSupportingInfoList = triggerToTableSupportingInfo.get(trigger.getTriggerId()); - TriggerTableSupportingInfo triggerTableSupportingInfo = null; - for (TriggerTableSupportingInfo t : triggerTableSupportingInfoList) { - if (t.getTable().getFullyQualifiedTableName().equals(table.getFullyQualifiedTableName())) { - triggerTableSupportingInfo = t; - break; + try { + fixMultipleActiveTriggerHistories(); + boolean ignoreCase = this.parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE); + List triggersForCurrentNode = getTriggersForCurrentNode(); + List activeTriggerHistories = getActiveTriggerHistories(); + Map> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo(triggersForCurrentNode, + activeTriggerHistories, false); + for (Table table : tables) { + IDatabasePlatform targetPlatform = symmetricDialect.getTargetPlatform(table.getName()); + for (Trigger trigger : triggersForCurrentNode) { + if (trigger.matches(table, targetPlatform.getDefaultCatalog(), targetPlatform.getDefaultSchema(), ignoreCase) && + (!trigger.isSourceTableNameWildCarded() || !trigger.isSourceTableNameExpanded() + || !containsExactMatchForSourceTableName(table, triggersForCurrentNode, ignoreCase))) { + List triggerTableSupportingInfoList = triggerToTableSupportingInfo.get(trigger.getTriggerId()); + TriggerTableSupportingInfo triggerTableSupportingInfo = null; + for (TriggerTableSupportingInfo t : triggerTableSupportingInfoList) { + if (t.getTable().getFullyQualifiedTableName().equals(table.getFullyQualifiedTableName())) { + triggerTableSupportingInfo = t; + break; + } + } + if (triggerTableSupportingInfo != null) { + log.info("Synchronizing triggers for {}", table.getFullyQualifiedTableName()); + updateOrCreateDatabaseTriggers(trigger, triggerTableSupportingInfo.getTable(), null, force, true, activeTriggerHistories, + triggerTableSupportingInfo); + log.info("Done synchronizing triggers for {}", table.getFullyQualifiedTableName()); + } else { + log.warn("Can't find table {} for trigger {}, make sure table exists.", table.getFullyQualifiedTableName(), trigger.getTriggerId()); } - } - if (triggerTableSupportingInfo != null) { - log.info("Synchronizing triggers for {}", table.getFullyQualifiedTableName()); - updateOrCreateDatabaseTriggers(trigger, triggerTableSupportingInfo.getTable(), null, force, true, activeTriggerHistories, - triggerTableSupportingInfo); - log.info("Done synchronizing triggers for {}", table.getFullyQualifiedTableName()); - } else { - log.warn("Can't find table {} for trigger {}, make sure table exists.", table.getFullyQualifiedTableName(), trigger.getTriggerId()); } } } + return true; + } finally { + clusterService.unlock(ClusterConstants.SYNC_TRIGGERS); } - return true; } return false; } @@ -2000,70 +2006,75 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool public boolean syncTriggers(List triggers, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase) { if (clusterService.lock(ClusterConstants.SYNC_TRIGGERS)) { - StringBuilder sqlBuffer = new StringBuilder(); - clearCache(); - List triggersForCurrentNode = null; - if (verifyInDatabase) { - triggersForCurrentNode = getTriggersForCurrentNode(); - } else { - triggersForCurrentNode = new ArrayList(); - triggersForCurrentNode.addAll(triggers); - } try { - if (listener != null) { - extensionService.addExtensionPoint(listener); - } - log.info("Synchronizing {} triggers", triggers.size()); - triggersToSync = triggers.size(); - triggersSynced = 0; - for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { - l.syncTriggersStarted(); + fixMultipleActiveTriggerHistories(); + StringBuilder sqlBuffer = new StringBuilder(); + clearCache(); + List triggersForCurrentNode = null; + if (verifyInDatabase) { + triggersForCurrentNode = getTriggersForCurrentNode(); + } else { + triggersForCurrentNode = new ArrayList(); + triggersForCurrentNode.addAll(triggers); } - List allHistories = getActiveTriggerHistories(); - Map> activeHistoryByTriggerId = new HashMap>(); - for (TriggerHistory hist : allHistories) { - List list = activeHistoryByTriggerId.get(hist.getTriggerId()); - if (list == null) { - list = new ArrayList(); - activeHistoryByTriggerId.put(hist.getTriggerId(), list); + try { + if (listener != null) { + extensionService.addExtensionPoint(listener); } - list.add(hist); - } - for (Trigger trigger : triggers) { - if (triggersForCurrentNode.contains(trigger)) { - if (!trigger.isSourceTableNameWildCarded() && !trigger.isSourceTableNameExpanded()) { + log.info("Synchronizing {} triggers", triggers.size()); + triggersToSync = triggers.size(); + triggersSynced = 0; + for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { + l.syncTriggersStarted(); + } + List allHistories = getActiveTriggerHistories(); + Map> activeHistoryByTriggerId = new HashMap>(); + for (TriggerHistory hist : allHistories) { + List list = activeHistoryByTriggerId.get(hist.getTriggerId()); + if (list == null) { + list = new ArrayList(); + activeHistoryByTriggerId.put(hist.getTriggerId(), list); + } + list.add(hist); + } + for (Trigger trigger : triggers) { + if (triggersForCurrentNode.contains(trigger)) { + if (!trigger.isSourceTableNameWildCarded() && !trigger.isSourceTableNameExpanded()) { + List activeHistories = activeHistoryByTriggerId.get(trigger.getTriggerId()); + if (activeHistories != null) { + for (TriggerHistory triggerHistory : activeHistories) { + if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) { + dropTriggers(triggerHistory, sqlBuffer); + } + } + } + } + Map> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo( + Collections.singletonList(trigger), allHistories, true); + updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer, + force, verifyInDatabase, allHistories, false, triggerToTableSupportingInfo); + } else { List activeHistories = activeHistoryByTriggerId.get(trigger.getTriggerId()); if (activeHistories != null) { for (TriggerHistory triggerHistory : activeHistories) { - if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) { - dropTriggers(triggerHistory, sqlBuffer); - } + dropTriggers(triggerHistory, sqlBuffer); } } } - Map> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo( - Collections.singletonList(trigger), allHistories, true); - updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer, - force, verifyInDatabase, allHistories, false, triggerToTableSupportingInfo); - } else { - List activeHistories = activeHistoryByTriggerId.get(trigger.getTriggerId()); - if (activeHistories != null) { - for (TriggerHistory triggerHistory : activeHistories) { - dropTriggers(triggerHistory, sqlBuffer); - } - } } + } finally { + for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { + l.syncTriggersEnded(); + } + if (listener != null) { + extensionService.removeExtensionPoint(listener); + } + log.info("Done synchronizing {} triggers", triggers.size()); } + return true; } finally { - for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { - l.syncTriggersEnded(); - } - if (listener != null) { - extensionService.removeExtensionPoint(listener); - } - log.info("Done synchronizing {} triggers", triggers.size()); + clusterService.unlock(ClusterConstants.SYNC_TRIGGERS); } - return true; } return false; } @@ -2388,26 +2399,58 @@ static class TriggerHistoryMapper implements ISqlRowMapper { public TriggerHistory mapRow(Row rs) { TriggerHistory hist = new TriggerHistory(); - hist.setTriggerHistoryId(rs.getInt("trigger_hist_id")); - hist.setTriggerId(rs.getString("trigger_id")); - hist.setSourceTableName(rs.getString("source_table_name")); - hist.setTableHash(rs.getInt("table_hash")); - hist.setCreateTime(rs.getDateTime("create_time")); - hist.setPkColumnNames(rs.getString("pk_column_names")); - hist.setColumnNames(rs.getString("column_names")); + if (rs.containsKey("trigger_hist_id")) { + hist.setTriggerHistoryId(rs.getInt("trigger_hist_id")); + } + if (rs.containsKey("trigger_id")) { + hist.setTriggerId(rs.getString("trigger_id")); + } + if (rs.containsKey("source_table_name")) { + hist.setSourceTableName(rs.getString("source_table_name")); + } + if (rs.containsKey("table_hash")) { + hist.setTableHash(rs.getInt("table_hash")); + } + if (rs.containsKey("create_time")) { + hist.setCreateTime(rs.getDateTime("create_time")); + } + if (rs.containsKey("pk_column_names")) { + hist.setPkColumnNames(rs.getString("pk_column_names")); + } + if (rs.containsKey("column_names")) { + hist.setColumnNames(rs.getString("column_names")); + } if (rs.containsKey("is_missing_pk")) { hist.setIsMissingPk(rs.getBoolean("is_missing_pk")); } - hist.setLastTriggerBuildReason(TriggerReBuildReason.fromCode(rs + if (rs.containsKey("last_trigger_build_reason")) { + hist.setLastTriggerBuildReason(TriggerReBuildReason.fromCode(rs .getString("last_trigger_build_reason"))); - hist.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger")); - hist.setNameForInsertTrigger(rs.getString("name_for_insert_trigger")); - hist.setNameForUpdateTrigger(rs.getString("name_for_update_trigger")); - hist.setSourceSchemaName(rs.getString("source_schema_name")); - hist.setSourceCatalogName(rs.getString("source_catalog_name")); - hist.setTriggerRowHash(rs.getLong("trigger_row_hash")); - hist.setTriggerTemplateHash(rs.getLong("trigger_template_hash")); - hist.setErrorMessage(rs.getString("error_message")); + } + if (rs.containsKey("name_for_delete_trigger")) { + hist.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger")); + } + if (rs.containsKey("name_for_insert_trigger")) { + hist.setNameForInsertTrigger(rs.getString("name_for_insert_trigger")); + } + if (rs.containsKey("name_for_update_trigger")) { + hist.setNameForUpdateTrigger(rs.getString("name_for_update_trigger")); + } + if (rs.containsKey("source_schema_name")) { + hist.setSourceSchemaName(rs.getString("source_schema_name")); + } + if (rs.containsKey("source_catalog_name")) { + hist.setSourceCatalogName(rs.getString("source_catalog_name")); + } + if (rs.containsKey("trigger_row_hash")) { + hist.setTriggerRowHash(rs.getLong("trigger_row_hash")); + } + if (rs.containsKey("trigger_template_hash")) { + hist.setTriggerTemplateHash(rs.getLong("trigger_template_hash")); + } + if (rs.containsKey("error_message")) { + hist.setErrorMessage(rs.getString("error_message")); + } if (this.retMap != null) { this.retMap.put((long) hist.getTriggerHistoryId(), hist); } @@ -2716,6 +2759,58 @@ protected void awaitTermination(ExecutorService executor, List> future throw new RuntimeException(e); } } + + protected void fixMultipleActiveTriggerHistories() { + if (parameterService.is(ParameterConstants.SYNC_TRIGGERS_FIX_DUPLICATE_ACTIVE_TRIGGER_HISTORIES)) { + // Get trigger_id, source_table_name, source_schema_name, and source_catalog_name of active ones that have more than one active + List multiples = getMultipleActiveTriggerHistories(); + if (multiples.size() > 0) { + log.info("Fixing " + multiples.size() + " trigger histories with multiple active entries"); + } + for (TriggerHistory triggerHistory : multiples) { + // Get trigger_history_ids in descending order + List th = getTriggerHistoryIds(triggerHistory); + boolean first = true; + for (TriggerHistory thLocal : th) { + // Leave the latest trigger history ID alone + if (first) { + first = false; + continue; + } + log.info("Marking trigger history ID " + thLocal.getTriggerHistoryId() + " inactive."); + inactivateTriggerHistory(thLocal); + } + } + } + } + + protected List getMultipleActiveTriggerHistories() { + return sqlTemplate.query(getSql("multipleActiveTriggerHistSql"), new TriggerHistoryMapper()); + } + + protected List getTriggerHistoryIds(TriggerHistory triggerHistory) { + List values = new ArrayList(); + StringBuilder sb = new StringBuilder(getSql("selectTriggerHistIdSql")).append(" where trigger_id=? and source_table_name=?"); + values.add(triggerHistory.getTriggerId()); + values.add(triggerHistory.getSourceTableName()); + sb.append(" and source_catalog_name"); + if (StringUtils.isBlank(triggerHistory.getSourceCatalogName())) { + sb.append(" is null "); + } else { + sb.append("=? "); + values.add(triggerHistory.getSourceCatalogName()); + } + sb.append(" and source_schema_name"); + if (StringUtils.isBlank(triggerHistory.getSourceSchemaName())) { + sb.append(" is null "); + } else { + sb.append("=? "); + values.add(triggerHistory.getSourceSchemaName()); + } + sb.append(" and inactive_time is null"); + sb.append(" order by trigger_hist_id desc "); + return sqlTemplate.query(sb.toString(), new TriggerHistoryMapper(), values.toArray()); + } class SyncTriggersThreadFactory implements ThreadFactory { AtomicInteger threadNumber = new AtomicInteger(1); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java index d0016cdc84..a13704a12e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java @@ -90,6 +90,8 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform, putSql("selectTriggerNameInUseSql", "" + "select count(*) from $(trigger_hist) where (name_for_update_trigger=? or name_for_insert_trigger=? or name_for_delete_trigger=?) and trigger_id != ? and inactive_time is null "); + + putSql("selectTriggerHistIdSql", "select trigger_hist_id from $(trigger_hist) "); putSql("selectGroupTriggersSql", "" + "where r.source_node_group_id = ? or r.target_node_group_id = ? order by t.channel_id "); @@ -106,6 +108,13 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform, + " tr.initial_load_order "); putSql("activeTriggerHistSql", "" + "where inactive_time is null "); + + putSql("multipleActiveTriggerHistSql", "" + + "select trigger_id, source_table_name, source_catalog_name, source_schema_name " + + "from $(trigger_hist) " + + "where inactive_time is null " + + "group by trigger_id, source_table_name, source_catalog_name, source_schema_name " + + "having count(*) > 1"); putSql("activeTriggerHistSqlByTriggerId", "" + "where trigger_id=? and inactive_time is null "); diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 2118a600eb..d71789981b 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -663,6 +663,14 @@ sync.triggers.timeout.in.seconds=3600 # Type: boolean sync.triggers.reg.svr.install.without.config=true +# Whether or not duplicate active trigger histories will be detected and fixed so that the most recent will end up +# being the only active trigger history. +# +# DatabaseOverridable: true +# Tags: general +# Type: boolean +sync.triggers.fix.duplicate.active.trigger.histories=true + # If this is true, when a configuration change is detected during routing, # symmetric will make sure all triggers in the database are up to date. #