From 23b798715b2bff648078c4531374311fe01e861a Mon Sep 17 00:00:00 2001 From: chenson42 Date: Wed, 9 Jul 2014 13:30:19 +0000 Subject: [PATCH] 0001798: Add support for wildcarded catalogs and schemas --- .../symmetric/db/h2/H2SymmetricDialect.java | 8 +- .../symmetric/AbstractSymmetricEngine.java | 8 +- .../db/AbstractSymmetricDialect.java | 2 +- .../symmetric/db/AbstractTriggerTemplate.java | 15 +- .../org/jumpmind/symmetric/model/Trigger.java | 45 ++++-- .../symmetric/model/TriggerHistory.java | 6 +- .../service/impl/TriggerRouterService.java | 137 ++++++++++++------ .../impl/TriggerRouterServiceSqlMap.java | 5 +- .../src/main/resources/symmetric-schema.xml | 6 +- 9 files changed, 148 insertions(+), 84 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/h2/H2SymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/h2/H2SymmetricDialect.java index 00e3df54d2..c9b354d142 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/h2/H2SymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/h2/H2SymmetricDialect.java @@ -51,10 +51,10 @@ public H2SymmetricDialect(IParameterService parameterService, IDatabasePlatform protected boolean doesTriggerExistOnPlatform(String catalogName, String schemaName, String tableName, String triggerName) { boolean exists = (platform.getSqlTemplate() - .queryForInt("select count(*) from INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_NAME = ?", - new Object[] { triggerName }) > 0) - && (platform.getSqlTemplate().queryForInt("select count(*) from INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?", - new Object[] { String.format("%s_CONFIG", triggerName) }) > 0); + .queryForInt("select count(*) from INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_NAME = ? and (TRIGGER_CATALOG=? or ? is null) and (TRIGGER_SCHEMA=? or ? is null)", + new Object[] { triggerName, catalogName, catalogName, schemaName, schemaName }) > 0) + && (platform.getSqlTemplate().queryForInt("select count(*) from INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ? and (TABLE_CATALOG=? or ? is null) and (TABLE_SCHEMA=? or ? is null)", + new Object[] { String.format("%s_CONFIG", triggerName), catalogName, catalogName, schemaName, schemaName }) > 0); if (!exists && !StringUtils.isBlank(triggerName)) { removeTrigger(new StringBuilder(), catalogName, schemaName, triggerName, tableName); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 57b9c5b36e..b91da29033 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -722,13 +722,9 @@ public NodeStatus getNodeStatus() { } public void removeAndCleanupNode(String nodeId) { - log.warn("Removing node {}", nodeId); + log.info("Removing node {}", nodeId); nodeService.deleteNode(nodeId, false); - log.warn("Marking outgoing batch records as OK for node ID {}", nodeId); - outgoingBatchService.markAllAsSentForNode(nodeId, true); - log.warn("Removing incoming batch records for node ID {}", nodeId); - incomingBatchService.removingIncomingBatches(nodeId); - log.warn("Done removing node ID {}", nodeId); + log.info("Done removing node ID {}", nodeId); } public RemoteNodeStatuses pull() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java index d8b1205541..e8b1279668 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java @@ -292,7 +292,7 @@ public void createTrigger(final StringBuilder sqlBuffer, final DataEventType dml table.getFullyQualifiedTableName()); String previousCatalog = null; - String sourceCatalogName = trigger.getSourceCatalogName(); + String sourceCatalogName = table.getCatalog(); String defaultCatalog = platform.getDefaultCatalog(); String defaultSchema = platform.getDefaultSchema(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java index 2b68ca8bab..a17c10eb92 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java @@ -153,7 +153,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table , sql); sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql); sql = FormatUtils.replace("schemaName", - triggerHistory == null ? getSourceTablePrefix(triggerRouter.getTrigger()) + triggerHistory == null ? getSourceTablePrefix(originalTable) : getSourceTablePrefix(triggerHistory), sql); sql = FormatUtils.replace( "primaryKeyWhereString", @@ -174,11 +174,10 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table return sql; } - protected String getSourceTablePrefix(Trigger trigger) { - String schemaPlus = (trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName() + protected String getSourceTablePrefix(Table table) { + String schemaPlus = (table.getSchema() != null ? table.getSchema() + "." : ""); - String catalogPlus = (trigger.getSourceCatalogName() != null ? trigger - .getSourceCatalogName() + "." : "") + String catalogPlus = (table.getCatalog() != null ? table.getCatalog() + "." : "") + schemaPlus; return catalogPlus; } @@ -218,7 +217,7 @@ public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, T sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql); sql = FormatUtils.replace("schemaName", - triggerHistory == null ? getSourceTablePrefix(trigger) + triggerHistory == null ? getSourceTablePrefix(originalTable) : getSourceTablePrefix(triggerHistory), sql); sql = FormatUtils.replace("whereClause", whereClause, sql); @@ -246,7 +245,7 @@ public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHist trigger.isUseCaptureLobs() ? "to_clob('')||" : "", sql); sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql); sql = FormatUtils.replace("schemaName", - triggerHistory == null ? getSourceTablePrefix(trigger) + triggerHistory == null ? getSourceTablePrefix(table) : getSourceTablePrefix(triggerHistory), sql); sql = FormatUtils.replace("whereClause", whereClause, sql); sql = FormatUtils.replace( @@ -379,7 +378,7 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger, // some column templates need tableName and schemaName ddl = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), ddl); - ddl = FormatUtils.replace("schemaName", history == null ? getSourceTablePrefix(trigger) + ddl = FormatUtils.replace("schemaName", history == null ? getSourceTablePrefix(originalTable) : getSourceTablePrefix(history), ddl); Column[] primaryKeyColumns = table.getPrimaryKeyColumns(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java index 07e7bddaa4..408d07b752 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java @@ -256,7 +256,15 @@ public String getSourceTableName() { } public boolean isSourceTableNameWildCarded() { - return sourceTableName != null && sourceTableName.contains(FormatUtils.WILDCARD); + return sourceTableName != null && (sourceTableName.contains(FormatUtils.WILDCARD) || sourceTableName.contains(",")); + } + + public boolean isSourceCatalogNameWildCarded() { + return sourceCatalogName != null && (sourceCatalogName.contains(FormatUtils.WILDCARD) || sourceCatalogName.contains(",")); + } + + public boolean isSourceSchemaNameWildCarded() { + return sourceSchemaName != null && (sourceSchemaName.contains(FormatUtils.WILDCARD) || sourceSchemaName.contains(",")); } public String getChannelExpression() { @@ -595,6 +603,30 @@ public long toHashedValue() { return hashedValue; } + + public boolean matchesCatalogName(String catalogName, boolean ignoreCase) { + return matches(sourceCatalogName, catalogName, ignoreCase); + } + + public boolean matchesSchemaName(String schemaName, boolean ignoreCase) { + return matches(sourceSchemaName, schemaName, ignoreCase); + } + + protected boolean matches(String match, String target, boolean ignoreCase) { + boolean matches = false; + String[] wildcardTokens = match.split(","); + for (String wildcardToken : wildcardTokens) { + if (FormatUtils.isWildCardMatch(target, wildcardToken, ignoreCase)) { + if (!wildcardToken.startsWith(FormatUtils.NEGATE_TOKEN)) { + matches = true; + } else { + matches = false; + break; + } + } + } + return matches; + } public boolean matches(Table table, String defaultCatalog, String defaultSchema, boolean ignoreCase) { @@ -609,16 +641,7 @@ public boolean matches(Table table, String defaultCatalog, String defaultSchema, : table.getName().equals(sourceTableName); if (!tableMatches && isSourceTableNameWildCarded()) { - String[] wildcardTokens = sourceTableName.split(","); - for (String wildcardToken : wildcardTokens) { - if (FormatUtils.isWildCardMatch(table.getName(), wildcardToken, ignoreCase)) { - if (!wildcardToken.startsWith(FormatUtils.NEGATE_TOKEN)) { - tableMatches = true; - } else { - tableMatches = false; - } - } - } + tableMatches = matches(sourceTableName, table.getName(), ignoreCase); } return schemaAndCatalogMatch && tableMatches; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java index 0c8cb29c4c..4599f4b22d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java @@ -115,8 +115,10 @@ public TriggerHistory(Table table, Trigger trigger, AbstractTriggerTemplate trig this.sourceTableName = trigger.isSourceTableNameWildCarded() ? table.getName() : trigger .getSourceTableName(); this.columnNames = Table.getCommaDeliminatedColumns(trigger.orderColumnsForTable(table)); - this.sourceSchemaName = trigger.getSourceSchemaName(); - this.sourceCatalogName = trigger.getSourceCatalogName(); + this.sourceSchemaName = trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : + trigger.getSourceSchemaName(); + this.sourceCatalogName = trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : + trigger.getSourceCatalogName(); this.triggerId = trigger.getTriggerId(); this.pkColumnNames = Table.getCommaDeliminatedColumns(trigger.filterExcludedColumns(trigger .getSyncKeysColumnsForTable(table))); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index d07d4adfc6..eed2092fbf 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -1113,24 +1113,26 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, if (tables == null || tables.size() == 0 || trigger == null) { removeTrigger = true; } else { - if (!StringUtils.equals(trigger.getSourceCatalogName(), - history.getSourceCatalogName()) - || !StringUtils.equals(trigger.getSourceSchemaName(), - history.getSourceSchemaName())) { + boolean foundTable = false; + + for (Table table : tables) { + boolean matchesCatalog = isEqual( + trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() + : trigger.getSourceCatalogName(), + history.getSourceCatalogName(), ignoreCase); + boolean matchesSchema = isEqual( + trigger.isSourceSchemaNameWildCarded() ? table.getSchema() + : trigger.getSourceSchemaName(), history.getSourceSchemaName(), + ignoreCase); + boolean matchesTable = isEqual( + trigger.isSourceTableNameWildCarded() ? table.getName() + : trigger.getSourceTableName(), history.getSourceTableName(), + ignoreCase); + foundTable |= matchesCatalog && matchesSchema && matchesTable; + } + + if (!foundTable) { removeTrigger = true; - } else { - if (trigger.isSourceTableNameWildCarded()) { - boolean foundMatch = false; - for (Table table : tables) { - foundMatch |= ignoreCase ? StringUtils.equalsIgnoreCase( - table.getName(), history.getSourceTableName()) : StringUtils - .equals(table.getName(), history.getSourceTableName()); - } - removeTrigger = !foundMatch; - } else if (!StringUtils.equals(trigger.getSourceTableName(), - history.getSourceTableName())) { - removeTrigger = true; - } } } @@ -1142,6 +1144,14 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, } } + protected boolean isEqual(String one, String two, boolean ignoreCase) { + if (ignoreCase) { + return StringUtils.equalsIgnoreCase(one, two); + } else { + return StringUtils.equals(one, two); + } + } + public void dropTriggers(TriggerHistory history) { dropTriggers(history, null); } @@ -1210,32 +1220,67 @@ protected List getTriggersForCurrentNode() { protected Set getTablesForTrigger(Trigger trigger, List triggers) { Set
tables = new HashSet
(); try { - if (trigger.isSourceTableNameWildCarded()) { - boolean ignoreCase = this.parameterService - .is(ParameterConstants.DB_METADATA_IGNORE_CASE); - - Database database = symmetricDialect.getPlatform().readDatabase( - trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - new String[] { "TABLE" }); - Table[] tableArray = database.getTables(); - - for (Table table : tableArray) { - if (trigger.matches(table, platform.getDefaultCatalog(), - platform.getDefaultSchema(), ignoreCase) - && !containsExactMatchForSourceTableName(table, triggers, - ignoreCase) - && !table.getName().toLowerCase().startsWith(tablePrefix)) { - tables.add(table); + boolean ignoreCase = this.parameterService + .is(ParameterConstants.DB_METADATA_IGNORE_CASE); + + List catalogNames = new ArrayList(); + if (trigger.isSourceCatalogNameWildCarded()) { + List all = platform.getDdlReader().getCatalogNames(); + for (String catalogName : all) { + if (trigger.matchesCatalogName(catalogName, ignoreCase)) { + catalogNames.add(catalogName); } } + if (catalogNames.size() == 0) { + catalogNames.add(null); + } } else { - Table table = symmetricDialect.getPlatform().getTableFromCache( - trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), false); - if (table != null) { - tables.add(table); + catalogNames.add(trigger.getSourceCatalogName()); + } + + for (String catalogName : catalogNames) { + List schemaNames = new ArrayList(); + if (trigger.isSourceSchemaNameWildCarded()) { + List all = platform.getDdlReader().getSchemaNames(catalogName); + for (String schemaName : all) { + if (trigger.matchesSchemaName(schemaName, ignoreCase)) { + schemaNames.add(schemaName); + } + } + if (schemaNames.size() == 0) { + schemaNames.add(null); + } + } else { + schemaNames.add(trigger.getSourceSchemaName()); } - } + + for (String schemaName : schemaNames) { + if (trigger.isSourceTableNameWildCarded()) { + Database database = symmetricDialect.getPlatform().readDatabase( + catalogName, schemaName, + new String[] { "TABLE" }); + Table[] tableArray = database.getTables(); + + for (Table table : tableArray) { + if (trigger.matches(table, catalogName, + schemaName, ignoreCase) + && !containsExactMatchForSourceTableName(table, triggers, + ignoreCase) + && !table.getName().toLowerCase().startsWith(tablePrefix)) { + tables.add(table); + } + } + } else { + Table table = symmetricDialect.getPlatform().getTableFromCache( + catalogName, schemaName, + trigger.getSourceTableName(), false); + if (table != null) { + tables.add(table); + } + } + } + } + } catch (Exception ex) { log.error(String.format("Failed to retrieve tables for trigger with id of %s", trigger.getTriggerId()), ex); } @@ -1381,8 +1426,8 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger( trigger.getTriggerId(), - trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), + trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(), + trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(), trigger.isSourceTableNameWildCarded() ? table.getName() : trigger .getSourceTableName()); @@ -1505,11 +1550,11 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, // We had no trigger_hist row, lets validate that the trigger as // defined in the trigger row data does not exist as well. oldTriggerName = newTriggerHist.getTriggerNameForDmlType(dmlType); - oldSourceSchema = trigger.getSourceSchemaName(); - oldCatalogName = trigger.getSourceCatalogName(); + oldSourceSchema = table.getSchema(); + oldCatalogName = table.getCatalog(); if (StringUtils.isNotBlank(oldTriggerName)) { triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, - trigger.getSourceTableName(), oldTriggerName); + table.getName(), oldTriggerName); } } @@ -1533,8 +1578,8 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, insert(newTriggerHist); hist = getNewestTriggerHistoryForTrigger( trigger.getTriggerId(), - trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), + trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(), + trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(), trigger.isSourceTableNameWildCarded() ? table.getName() : trigger .getSourceTableName()); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java index f44176d0c9..8ef7a2eaa4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java @@ -107,10 +107,9 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform, putSql("latestTriggerHistSql", "" - + "select " + + "select " + " trigger_hist_id,trigger_id,source_table_name,table_hash,create_time,pk_column_names,column_names,last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,trigger_row_hash,trigger_template_hash,error_message " - + " from $(trigger_hist) where trigger_hist_id = (select max(trigger_hist_id) " - + " from $(trigger_hist) where trigger_id=? and source_table_name=? and inactive_time is null) "); + + " from $(trigger_hist) where trigger_id=? and source_table_name=? and inactive_time is null order by trigger_hist_id desc "); putSql("triggerHistSql", "" diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index a62f98ddf5..1ebe3d3b2d 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -670,9 +670,9 @@
- - - + + +