From f30c7045e9249d10464f848b6e5ad18790c7ac50 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Sat, 11 Aug 2012 14:00:20 +0000 Subject: [PATCH] 0000757: All tables are synchronized during a CREATE batch when initial.load.create.first is true resulting in a lot of table missing --- .../symmetric/AbstractSymmetricEngine.java | 2 +- .../load/ConfigurationChangedFilter.java | 6 +- .../org/jumpmind/symmetric/model/Trigger.java | 16 +- .../symmetric/model/TriggerRouter.java | 2 +- .../service/ITriggerRouterService.java | 6 +- .../service/impl/TriggerRouterService.java | 249 +++++++++--------- 6 files changed, 151 insertions(+), 130 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index c22015e2c5..e60c604b1c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -548,7 +548,7 @@ public void syncTriggers() { public void forceTriggerRebuild() { MDC.put("engineName", getEngineName()); - triggerRouterService.syncTriggers(null, true); + triggerRouterService.syncTriggers(true); } public NodeStatus getNodeStatus() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java index cf937abb08..2930c18299 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java @@ -80,9 +80,13 @@ public void afterWrite(DataContext context, Table table, CsvData data) { } private void recordSyncNeeded(DataContext context, Table table, CsvData data) { - if (isSyncTriggersNeeded(table) || data.getDataEventType() == DataEventType.CREATE) { + if (isSyncTriggersNeeded(table)) { context.put(CTX_KEY_RESYNC_NEEDED, true); } + + if (data.getDataEventType() == DataEventType.CREATE) { + engine.getTriggerRouterService().syncTriggers(table, false); + } } private void recordJobManagerRestartNeeded(DataContext context, Table table, CsvData data) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java index a1ed469e6c..56da107ef0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java @@ -462,18 +462,20 @@ public long toHashedValue() { } return hashedValue; + } + + public boolean matches(Table table) { + return StringUtils.equals(sourceCatalogName, table.getCatalog()) + && StringUtils.equals(sourceSchemaName, table.getSchema()) + && table.getName().equalsIgnoreCase(sourceTableName); } - public boolean isSame(Trigger trigger) { - return isSame(sourceCatalogName, trigger.sourceCatalogName) - && isSame(sourceSchemaName, trigger.sourceSchemaName) + public boolean matches(Trigger trigger) { + return StringUtils.equals(sourceCatalogName, trigger.sourceCatalogName) + && StringUtils.equals(sourceSchemaName, trigger.sourceSchemaName) && trigger.sourceTableName.equalsIgnoreCase(sourceTableName); } - protected boolean isSame(String one, String two) { - return (one == null && two == null) || (one != null && two != null && one.equals(two)); - } - @Override public boolean equals(Object obj) { if (obj instanceof Trigger && triggerId != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerRouter.java index 2b25281ccf..60cf0f89b9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerRouter.java @@ -205,7 +205,7 @@ public boolean isPingBackEnabled() { public boolean isSame(TriggerRouter triggerRouter) { return (this.trigger == null && triggerRouter.trigger == null) || (this.trigger != null && triggerRouter.trigger != null && this.trigger - .isSame(triggerRouter.trigger)) + .matches(triggerRouter.trigger)) && (this.router == null && triggerRouter.router == null) || (this.router != null && triggerRouter.router != null && this.router .equals(triggerRouter.router)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java index e7d32c2d7e..5cff2057d4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java @@ -136,7 +136,11 @@ public interface ITriggerRouterService { public void saveTriggerRouter(TriggerRouter triggerRouter, boolean updateTriggerRouterTableOnly); - public void saveTriggerRouter(TriggerRouter triggerRouter); + public void saveTriggerRouter(TriggerRouter triggerRouter); + + public void syncTriggers(Table table, boolean genAlways); + + public void syncTriggers(boolean genAlways); public void syncTriggers(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index caee404411..ee23e6936c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -742,10 +742,14 @@ public void saveTrigger(Trigger trigger) { } public void syncTriggers() { - syncTriggers(null, false); + syncTriggers((StringBuilder) null, false); } - public void syncTriggers(StringBuilder sqlBuffer, boolean genAlways) { + public void syncTriggers(boolean force) { + syncTriggers((StringBuilder) null, false); + } + + public void syncTriggers(StringBuilder sqlBuffer, boolean force) { if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS) || isCalledFromSymmetricAdminTool()) { if (clusterService.lock(ClusterConstants.SYNCTRIGGERS)) { @@ -768,7 +772,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean genAlways) { List triggersForCurrentNode = getTriggersForCurrentNode(); inactivateTriggers(triggersForCurrentNode, sqlBuffer); - updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, genAlways); + updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, force); resetTriggerRouterCacheByNodeGroupId(); } finally { clusterService.unlock(ClusterConstants.SYNCTRIGGERS); @@ -794,8 +798,7 @@ protected Set getTriggerIdsFrom(List triggersThatShouldBeActive protected void inactivateTriggers(List triggersThatShouldBeActive, StringBuilder sqlBuffer) { - boolean ignoreCase = this.parameterService - .is(ParameterConstants.DB_METADATA_IGNORE_CASE); + boolean ignoreCase = this.parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE); List activeHistories = getActiveTriggerHistories(); Map> tablesByTriggerId = new HashMap>(); for (TriggerHistory history : activeHistories) { @@ -806,7 +809,7 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, tables = getTablesForTrigger(trigger, triggersThatShouldBeActive); tablesByTriggerId.put(trigger.getTriggerId(), tables); } - + if (tables == null || trigger == null) { removeTrigger = true; } else { @@ -819,13 +822,15 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, if (trigger.isSourceTableNameWildCarded()) { boolean foundMatch = false; for (Table table : tables) { - foundMatch |= ignoreCase ? StringUtils.equalsIgnoreCase(table.getName(), history.getSourceTableName()) : StringUtils.equals(table.getName(), history.getSourceTableName()); + foundMatch |= ignoreCase ? StringUtils.equalsIgnoreCase( + table.getName(), history.getSourceTableName()) : StringUtils + .equals(table.getName(), history.getSourceTableName()); } removeTrigger = !foundMatch; } } } - + if (removeTrigger) { log.info("About to remove triggers for inactivated table: {}", history.getFullyQualifiedSourceTableName()); @@ -901,9 +906,9 @@ protected Set getTablesForTrigger(Trigger trigger, List triggers ignoreCase) && !table.getName().toLowerCase().startsWith(tablePrefix)) { if (!wildcardToken.startsWith(FormatUtils.NEGATE_TOKEN)) { - tables.add(table); + tables.add(table); } else { - tables.remove(table); + tables.remove(table); } } } @@ -931,135 +936,141 @@ private boolean containsExactMatchForSourceTableName(String tableName, List triggersForCurrentNode = getTriggersForCurrentNode(); + for (Trigger trigger : triggersForCurrentNode) { + if (trigger.matches(table)) { + updateOrCreateDatabaseTriggers(trigger, table, null, force); + } + } + } + protected void updateOrCreateDatabaseTriggers(List triggers, StringBuilder sqlBuffer, - boolean genAlways) { + boolean force) { for (Trigger trigger : triggers) { - TriggerHistory newestHistory = null; - TriggerReBuildReason reason = TriggerReBuildReason.NEW_TRIGGERS; - - String errorMessage = null; - Channel channel = configurationService.getChannel(trigger.getChannelId()); - if (channel == null) { - errorMessage = String - .format("Trigger %s had an unrecognized channel_id of '%s'. Please check to make sure the channel exists. Creating trigger on the '%s' channel", - trigger.getTriggerId(), trigger.getChannelId(), - Constants.CHANNEL_DEFAULT); - log.error(errorMessage); - trigger.setChannelId(Constants.CHANNEL_DEFAULT); - } Set
tables = getTablesForTrigger(trigger, triggers); if (tables.size() > 0) { for (Table table : tables) { - try { + updateOrCreateDatabaseTriggers(trigger, table, sqlBuffer, force); + } - if (table.getPrimaryKeyColumnCount() == 0) { - table = table.copy(); - table.makeAllColumnsPrimaryKeys(); - } + } else { + log.error( + "Could not find any database tables matching '{}' in the datasource that is configured", + trigger.qualifiedSourceTableName()); - TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger( - trigger.getTriggerId(), - trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), - trigger.isSourceTableNameWildCarded() ? table.getName() : trigger - .getSourceTableName()); - - boolean forceRebuildOfTriggers = false; - if (latestHistoryBeforeRebuild == null) { - reason = TriggerReBuildReason.NEW_TRIGGERS; - forceRebuildOfTriggers = true; - - } else if (table.calculateTableHashcode() != latestHistoryBeforeRebuild - .getTableHash()) { - reason = TriggerReBuildReason.TABLE_SCHEMA_CHANGED; - forceRebuildOfTriggers = true; - - } else if (trigger - .hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild - .getCreateTime()) - || trigger.toHashedValue() != latestHistoryBeforeRebuild - .getTriggerRowHash()) { - reason = TriggerReBuildReason.TABLE_SYNC_CONFIGURATION_CHANGED; - forceRebuildOfTriggers = true; - } else if (genAlways) { - reason = TriggerReBuildReason.FORCED; - forceRebuildOfTriggers = true; - } + if (this.triggerCreationListeners != null) { + for (ITriggerCreationListener l : this.triggerCreationListeners) { + l.tableDoesNotExist(trigger); + } + } + } + } + } - boolean supportsTriggers = symmetricDialect.getPlatform().getDatabaseInfo() - .isTriggersSupported(); + protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, + StringBuilder sqlBuffer, boolean force) { + TriggerHistory newestHistory = null; + TriggerReBuildReason reason = TriggerReBuildReason.NEW_TRIGGERS; + + String errorMessage = null; + Channel channel = configurationService.getChannel(trigger.getChannelId()); + if (channel == null) { + errorMessage = String + .format("Trigger %s had an unrecognized channel_id of '%s'. Please check to make sure the channel exists. Creating trigger on the '%s' channel", + trigger.getTriggerId(), trigger.getChannelId(), + Constants.CHANNEL_DEFAULT); + log.error(errorMessage); + trigger.setChannelId(Constants.CHANNEL_DEFAULT); + } - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, - forceRebuildOfTriggers, trigger, DataEventType.INSERT, reason, - latestHistoryBeforeRebuild, null, trigger.isSyncOnInsert() - && supportsTriggers, table); + try { - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, - forceRebuildOfTriggers, trigger, DataEventType.UPDATE, reason, - latestHistoryBeforeRebuild, newestHistory, trigger.isSyncOnUpdate() - && supportsTriggers, table); + if (table.getPrimaryKeyColumnCount() == 0) { + table = table.copy(); + table.makeAllColumnsPrimaryKeys(); + } - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, - forceRebuildOfTriggers, trigger, DataEventType.DELETE, reason, - latestHistoryBeforeRebuild, newestHistory, trigger.isSyncOnDelete() - && supportsTriggers, table); + TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger( + trigger.getTriggerId(), + trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), + trigger.isSourceTableNameWildCarded() ? table.getName() : trigger + .getSourceTableName()); - if (latestHistoryBeforeRebuild != null && newestHistory != null) { - inactivateTriggerHistory(latestHistoryBeforeRebuild); - } + boolean forceRebuildOfTriggers = false; + if (latestHistoryBeforeRebuild == null) { + reason = TriggerReBuildReason.NEW_TRIGGERS; + forceRebuildOfTriggers = true; + + } else if (table.calculateTableHashcode() != latestHistoryBeforeRebuild.getTableHash()) { + reason = TriggerReBuildReason.TABLE_SCHEMA_CHANGED; + forceRebuildOfTriggers = true; + + } else if (trigger.hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild + .getCreateTime()) + || trigger.toHashedValue() != latestHistoryBeforeRebuild.getTriggerRowHash()) { + reason = TriggerReBuildReason.TABLE_SYNC_CONFIGURATION_CHANGED; + forceRebuildOfTriggers = true; + } else if (force) { + reason = TriggerReBuildReason.FORCED; + forceRebuildOfTriggers = true; + } - if (newestHistory != null) { - newestHistory.setErrorMessage(errorMessage); - if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { - if (this.triggerCreationListeners != null) { - for (ITriggerCreationListener l : this.triggerCreationListeners) { - l.triggerCreated(trigger, newestHistory); - } - } - } - } + boolean supportsTriggers = symmetricDialect.getPlatform().getDatabaseInfo() + .isTriggersSupported(); - } catch (Exception ex) { - log.error( - String.format("Failed to create triggers for %s", - trigger.qualifiedSourceTableName()), ex); - - if (newestHistory != null) { - // Make sure all the triggers are removed from the - // table - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), - newestHistory.getNameForInsertTrigger(), - trigger.getSourceTableName(), newestHistory); - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), - newestHistory.getNameForUpdateTrigger(), - trigger.getSourceTableName(), newestHistory); - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), - newestHistory.getNameForDeleteTrigger(), - trigger.getSourceTableName(), newestHistory); - } + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, + DataEventType.INSERT, reason, latestHistoryBeforeRebuild, null, + trigger.isSyncOnInsert() && supportsTriggers, table); + + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, + DataEventType.UPDATE, reason, latestHistoryBeforeRebuild, newestHistory, + trigger.isSyncOnUpdate() && supportsTriggers, table); + + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, + DataEventType.DELETE, reason, latestHistoryBeforeRebuild, newestHistory, + trigger.isSyncOnDelete() && supportsTriggers, table); + + if (latestHistoryBeforeRebuild != null && newestHistory != null) { + inactivateTriggerHistory(latestHistoryBeforeRebuild); + } - if (this.triggerCreationListeners != null) { - for (ITriggerCreationListener l : this.triggerCreationListeners) { - l.triggerFailed(trigger, ex); - } + if (newestHistory != null) { + newestHistory.setErrorMessage(errorMessage); + if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { + if (this.triggerCreationListeners != null) { + for (ITriggerCreationListener l : this.triggerCreationListeners) { + l.triggerCreated(trigger, newestHistory); } } } + } - } else { - log.error( - "Could not find any database tables matching '{}' in the datasource that is configured", - trigger.qualifiedSourceTableName()); + } catch (Exception ex) { + log.error( + String.format("Failed to create triggers for %s", + trigger.qualifiedSourceTableName()), ex); + + if (newestHistory != null) { + // Make sure all the triggers are removed from the + // table + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), newestHistory.getNameForInsertTrigger(), + trigger.getSourceTableName(), newestHistory); + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), newestHistory.getNameForUpdateTrigger(), + trigger.getSourceTableName(), newestHistory); + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), newestHistory.getNameForDeleteTrigger(), + trigger.getSourceTableName(), newestHistory); + } - if (this.triggerCreationListeners != null) { - for (ITriggerCreationListener l : this.triggerCreationListeners) { - l.tableDoesNotExist(trigger); - } + if (this.triggerCreationListeners != null) { + for (ITriggerCreationListener l : this.triggerCreationListeners) { + l.triggerFailed(trigger, ex); } } } @@ -1108,8 +1119,8 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, if ((forceRebuild || !triggerIsActive) && triggerExists) { symmetricDialect.removeTrigger(sqlBuffer, oldCatalogName, oldSourceSchema, - oldTriggerName, trigger.isSourceTableNameWildCarded() ? table.getName() : trigger - .getSourceTableName(), oldhist); + oldTriggerName, trigger.isSourceTableNameWildCarded() ? table.getName() + : trigger.getSourceTableName(), oldhist); triggerExists = false; triggerRemoved = true; }