Skip to content

Commit

Permalink
0001798: Add support for wildcarded catalogs and schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 9, 2014
1 parent 2ceed70 commit 23b7987
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 84 deletions.
Expand Up @@ -51,10 +51,10 @@ public H2SymmetricDialect(IParameterService parameterService, IDatabasePlatform
protected boolean doesTriggerExistOnPlatform(String catalogName, String schemaName, String tableName,
String triggerName) {
boolean exists = (platform.getSqlTemplate()
.queryForInt("select count(*) from INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_NAME = ?",
new Object[] { triggerName }) > 0)
&& (platform.getSqlTemplate().queryForInt("select count(*) from INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?",
new Object[] { String.format("%s_CONFIG", triggerName) }) > 0);
.queryForInt("select count(*) from INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_NAME = ? and (TRIGGER_CATALOG=? or ? is null) and (TRIGGER_SCHEMA=? or ? is null)",
new Object[] { triggerName, catalogName, catalogName, schemaName, schemaName }) > 0)
&& (platform.getSqlTemplate().queryForInt("select count(*) from INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ? and (TABLE_CATALOG=? or ? is null) and (TABLE_SCHEMA=? or ? is null)",
new Object[] { String.format("%s_CONFIG", triggerName), catalogName, catalogName, schemaName, schemaName }) > 0);

if (!exists && !StringUtils.isBlank(triggerName)) {
removeTrigger(new StringBuilder(), catalogName, schemaName, triggerName, tableName);
Expand Down
Expand Up @@ -722,13 +722,9 @@ public NodeStatus getNodeStatus() {
}

public void removeAndCleanupNode(String nodeId) {
log.warn("Removing node {}", nodeId);
log.info("Removing node {}", nodeId);
nodeService.deleteNode(nodeId, false);
log.warn("Marking outgoing batch records as OK for node ID {}", nodeId);
outgoingBatchService.markAllAsSentForNode(nodeId, true);
log.warn("Removing incoming batch records for node ID {}", nodeId);
incomingBatchService.removingIncomingBatches(nodeId);
log.warn("Done removing node ID {}", nodeId);
log.info("Done removing node ID {}", nodeId);
}

public RemoteNodeStatuses pull() {
Expand Down
Expand Up @@ -292,7 +292,7 @@ public void createTrigger(final StringBuilder sqlBuffer, final DataEventType dml
table.getFullyQualifiedTableName());

String previousCatalog = null;
String sourceCatalogName = trigger.getSourceCatalogName();
String sourceCatalogName = table.getCatalog();
String defaultCatalog = platform.getDefaultCatalog();
String defaultSchema = platform.getDefaultSchema();

Expand Down
Expand Up @@ -153,7 +153,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
, sql);
sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
triggerHistory == null ? getSourceTablePrefix(triggerRouter.getTrigger())
triggerHistory == null ? getSourceTablePrefix(originalTable)
: getSourceTablePrefix(triggerHistory), sql);
sql = FormatUtils.replace(
"primaryKeyWhereString",
Expand All @@ -174,11 +174,10 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
return sql;
}

protected String getSourceTablePrefix(Trigger trigger) {
String schemaPlus = (trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName()
protected String getSourceTablePrefix(Table table) {
String schemaPlus = (table.getSchema() != null ? table.getSchema()
+ "." : "");
String catalogPlus = (trigger.getSourceCatalogName() != null ? trigger
.getSourceCatalogName() + "." : "")
String catalogPlus = (table.getCatalog() != null ? table.getCatalog() + "." : "")
+ schemaPlus;
return catalogPlus;
}
Expand Down Expand Up @@ -218,7 +217,7 @@ public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, T

sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
triggerHistory == null ? getSourceTablePrefix(trigger)
triggerHistory == null ? getSourceTablePrefix(originalTable)
: getSourceTablePrefix(triggerHistory), sql);

sql = FormatUtils.replace("whereClause", whereClause, sql);
Expand Down Expand Up @@ -246,7 +245,7 @@ public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHist
trigger.isUseCaptureLobs() ? "to_clob('')||" : "", sql);
sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
triggerHistory == null ? getSourceTablePrefix(trigger)
triggerHistory == null ? getSourceTablePrefix(table)
: getSourceTablePrefix(triggerHistory), sql);
sql = FormatUtils.replace("whereClause", whereClause, sql);
sql = FormatUtils.replace(
Expand Down Expand Up @@ -379,7 +378,7 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,

// some column templates need tableName and schemaName
ddl = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), ddl);
ddl = FormatUtils.replace("schemaName", history == null ? getSourceTablePrefix(trigger)
ddl = FormatUtils.replace("schemaName", history == null ? getSourceTablePrefix(originalTable)
: getSourceTablePrefix(history), ddl);

Column[] primaryKeyColumns = table.getPrimaryKeyColumns();
Expand Down
Expand Up @@ -256,7 +256,15 @@ public String getSourceTableName() {
}

public boolean isSourceTableNameWildCarded() {
return sourceTableName != null && sourceTableName.contains(FormatUtils.WILDCARD);
return sourceTableName != null && (sourceTableName.contains(FormatUtils.WILDCARD) || sourceTableName.contains(","));
}

public boolean isSourceCatalogNameWildCarded() {
return sourceCatalogName != null && (sourceCatalogName.contains(FormatUtils.WILDCARD) || sourceCatalogName.contains(","));
}

public boolean isSourceSchemaNameWildCarded() {
return sourceSchemaName != null && (sourceSchemaName.contains(FormatUtils.WILDCARD) || sourceSchemaName.contains(","));
}

public String getChannelExpression() {
Expand Down Expand Up @@ -595,6 +603,30 @@ public long toHashedValue() {

return hashedValue;
}

public boolean matchesCatalogName(String catalogName, boolean ignoreCase) {
return matches(sourceCatalogName, catalogName, ignoreCase);
}

public boolean matchesSchemaName(String schemaName, boolean ignoreCase) {
return matches(sourceSchemaName, schemaName, ignoreCase);
}

protected boolean matches(String match, String target, boolean ignoreCase) {
boolean matches = false;
String[] wildcardTokens = match.split(",");
for (String wildcardToken : wildcardTokens) {
if (FormatUtils.isWildCardMatch(target, wildcardToken, ignoreCase)) {
if (!wildcardToken.startsWith(FormatUtils.NEGATE_TOKEN)) {
matches = true;
} else {
matches = false;
break;
}
}
}
return matches;
}

public boolean matches(Table table, String defaultCatalog, String defaultSchema,
boolean ignoreCase) {
Expand All @@ -609,16 +641,7 @@ public boolean matches(Table table, String defaultCatalog, String defaultSchema,
: table.getName().equals(sourceTableName);

if (!tableMatches && isSourceTableNameWildCarded()) {
String[] wildcardTokens = sourceTableName.split(",");
for (String wildcardToken : wildcardTokens) {
if (FormatUtils.isWildCardMatch(table.getName(), wildcardToken, ignoreCase)) {
if (!wildcardToken.startsWith(FormatUtils.NEGATE_TOKEN)) {
tableMatches = true;
} else {
tableMatches = false;
}
}
}
tableMatches = matches(sourceTableName, table.getName(), ignoreCase);
}
return schemaAndCatalogMatch && tableMatches;
}
Expand Down
Expand Up @@ -115,8 +115,10 @@ public TriggerHistory(Table table, Trigger trigger, AbstractTriggerTemplate trig
this.sourceTableName = trigger.isSourceTableNameWildCarded() ? table.getName() : trigger
.getSourceTableName();
this.columnNames = Table.getCommaDeliminatedColumns(trigger.orderColumnsForTable(table));
this.sourceSchemaName = trigger.getSourceSchemaName();
this.sourceCatalogName = trigger.getSourceCatalogName();
this.sourceSchemaName = trigger.isSourceSchemaNameWildCarded() ? table.getSchema() :
trigger.getSourceSchemaName();
this.sourceCatalogName = trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() :
trigger.getSourceCatalogName();
this.triggerId = trigger.getTriggerId();
this.pkColumnNames = Table.getCommaDeliminatedColumns(trigger.filterExcludedColumns(trigger
.getSyncKeysColumnsForTable(table)));
Expand Down
Expand Up @@ -1113,24 +1113,26 @@ protected void inactivateTriggers(List<Trigger> triggersThatShouldBeActive,
if (tables == null || tables.size() == 0 || trigger == null) {
removeTrigger = true;
} else {
if (!StringUtils.equals(trigger.getSourceCatalogName(),
history.getSourceCatalogName())
|| !StringUtils.equals(trigger.getSourceSchemaName(),
history.getSourceSchemaName())) {
boolean foundTable = false;

for (Table table : tables) {
boolean matchesCatalog = isEqual(
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog()
: trigger.getSourceCatalogName(),
history.getSourceCatalogName(), ignoreCase);
boolean matchesSchema = isEqual(
trigger.isSourceSchemaNameWildCarded() ? table.getSchema()
: trigger.getSourceSchemaName(), history.getSourceSchemaName(),
ignoreCase);
boolean matchesTable = isEqual(
trigger.isSourceTableNameWildCarded() ? table.getName()
: trigger.getSourceTableName(), history.getSourceTableName(),
ignoreCase);
foundTable |= matchesCatalog && matchesSchema && matchesTable;
}

if (!foundTable) {
removeTrigger = true;
} else {
if (trigger.isSourceTableNameWildCarded()) {
boolean foundMatch = false;
for (Table table : tables) {
foundMatch |= ignoreCase ? StringUtils.equalsIgnoreCase(
table.getName(), history.getSourceTableName()) : StringUtils
.equals(table.getName(), history.getSourceTableName());
}
removeTrigger = !foundMatch;
} else if (!StringUtils.equals(trigger.getSourceTableName(),
history.getSourceTableName())) {
removeTrigger = true;
}
}
}

Expand All @@ -1142,6 +1144,14 @@ protected void inactivateTriggers(List<Trigger> triggersThatShouldBeActive,
}
}

protected boolean isEqual(String one, String two, boolean ignoreCase) {
if (ignoreCase) {
return StringUtils.equalsIgnoreCase(one, two);
} else {
return StringUtils.equals(one, two);
}
}

public void dropTriggers(TriggerHistory history) {
dropTriggers(history, null);
}
Expand Down Expand Up @@ -1210,32 +1220,67 @@ protected List<Trigger> getTriggersForCurrentNode() {
protected Set<Table> getTablesForTrigger(Trigger trigger, List<Trigger> triggers) {
Set<Table> tables = new HashSet<Table>();
try {
if (trigger.isSourceTableNameWildCarded()) {
boolean ignoreCase = this.parameterService
.is(ParameterConstants.DB_METADATA_IGNORE_CASE);

Database database = symmetricDialect.getPlatform().readDatabase(
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
new String[] { "TABLE" });
Table[] tableArray = database.getTables();

for (Table table : tableArray) {
if (trigger.matches(table, platform.getDefaultCatalog(),
platform.getDefaultSchema(), ignoreCase)
&& !containsExactMatchForSourceTableName(table, triggers,
ignoreCase)
&& !table.getName().toLowerCase().startsWith(tablePrefix)) {
tables.add(table);
boolean ignoreCase = this.parameterService
.is(ParameterConstants.DB_METADATA_IGNORE_CASE);

List<String> catalogNames = new ArrayList<String>();
if (trigger.isSourceCatalogNameWildCarded()) {
List<String> all = platform.getDdlReader().getCatalogNames();
for (String catalogName : all) {
if (trigger.matchesCatalogName(catalogName, ignoreCase)) {
catalogNames.add(catalogName);
}
}
if (catalogNames.size() == 0) {
catalogNames.add(null);
}
} else {
Table table = symmetricDialect.getPlatform().getTableFromCache(
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false);
if (table != null) {
tables.add(table);
catalogNames.add(trigger.getSourceCatalogName());
}

for (String catalogName : catalogNames) {
List<String> schemaNames = new ArrayList<String>();
if (trigger.isSourceSchemaNameWildCarded()) {
List<String> all = platform.getDdlReader().getSchemaNames(catalogName);
for (String schemaName : all) {
if (trigger.matchesSchemaName(schemaName, ignoreCase)) {
schemaNames.add(schemaName);
}
}
if (schemaNames.size() == 0) {
schemaNames.add(null);
}
} else {
schemaNames.add(trigger.getSourceSchemaName());
}
}

for (String schemaName : schemaNames) {
if (trigger.isSourceTableNameWildCarded()) {
Database database = symmetricDialect.getPlatform().readDatabase(
catalogName, schemaName,
new String[] { "TABLE" });
Table[] tableArray = database.getTables();

for (Table table : tableArray) {
if (trigger.matches(table, catalogName,
schemaName, ignoreCase)
&& !containsExactMatchForSourceTableName(table, triggers,
ignoreCase)
&& !table.getName().toLowerCase().startsWith(tablePrefix)) {
tables.add(table);
}
}
} else {
Table table = symmetricDialect.getPlatform().getTableFromCache(
catalogName, schemaName,
trigger.getSourceTableName(), false);
if (table != null) {
tables.add(table);
}
}
}
}

} catch (Exception ex) {
log.error(String.format("Failed to retrieve tables for trigger with id of %s", trigger.getTriggerId()), ex);
}
Expand Down Expand Up @@ -1381,8 +1426,8 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table,

TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger(
trigger.getTriggerId(),
trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(),
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(),
trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(),
trigger.isSourceTableNameWildCarded() ? table.getName() : trigger
.getSourceTableName());

Expand Down Expand Up @@ -1505,11 +1550,11 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,
// We had no trigger_hist row, lets validate that the trigger as
// defined in the trigger row data does not exist as well.
oldTriggerName = newTriggerHist.getTriggerNameForDmlType(dmlType);
oldSourceSchema = trigger.getSourceSchemaName();
oldCatalogName = trigger.getSourceCatalogName();
oldSourceSchema = table.getSchema();
oldCatalogName = table.getCatalog();
if (StringUtils.isNotBlank(oldTriggerName)) {
triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema,
trigger.getSourceTableName(), oldTriggerName);
table.getName(), oldTriggerName);
}
}

Expand All @@ -1533,8 +1578,8 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,
insert(newTriggerHist);
hist = getNewestTriggerHistoryForTrigger(
trigger.getTriggerId(),
trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(),
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(),
trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(),
trigger.isSourceTableNameWildCarded() ? table.getName() : trigger
.getSourceTableName());
}
Expand Down

0 comments on commit 23b7987

Please sign in to comment.