Skip to content

Commit

Permalink
0004838: New extension point that can be used while obtaining a list of
Browse files Browse the repository at this point in the history
tables for a given trigger configuration
  • Loading branch information
joshhicks committed Feb 17, 2021
1 parent 3aff6be commit 9ccc7e9
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 10 deletions.
Expand Up @@ -130,4 +130,7 @@ This property is currently only supported on MySQL, DB2, SQL Server, and Oracle.
include::table-triggers/wildcards.ad[]
include::table-triggers/external-select.ad[]
include::table-triggers/trigger-variables.ad[]
include::table-triggers/load-only.ad[]
include::table-triggers/load-only.ad[]
ifdef::pro[]
include::table-triggers/expandable.ad[]
endif::pro[]
@@ -0,0 +1,25 @@

==== Expandable Triggers

When a set of tables is repeated for each node and the tables all contain a prefix which groups them together an expandable trigger can be used. The prefix will be resolved to the external id that the source node replicates with either through a push or pull.

.Variable
[cols=".^2,2"]
|===

|$(targetExternalId)|Resolved by looking up all the nodes the source node replicates with and will be replaced with each external id to represent a table.

|===

.Example
----
$(targetExternalId)-Item

If there are nodes present with external ids of client1, client2, client3 then this trigger configuration would match the following 3 tables.

client1-Item
client2-Item
client3-Item

This reduces configuration if there are for example 10 tables that are repeated for each client then only table trigger configurations would be needed and as new nodes are added (along with their tables) there is no need to add any more configuration.
----
@@ -0,0 +1,15 @@
package org.jumpmind.symmetric.config;

import java.util.Set;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.INodeService;

public interface ITableResolver extends IExtensionPoint {

public void resolve(String catalog, String schema, Set<Table> tables, IDatabasePlatform platform,
INodeService nodeService, Trigger trigger);
}
Expand Up @@ -61,6 +61,8 @@ public class Trigger implements Serializable {
private boolean isSourceSchemaWildCarded;

private boolean isSourceCatalogWildCarded;

private boolean isSourceTableNameExpanded;

private String sourceTableNameUnescaped;

Expand Down Expand Up @@ -344,7 +346,15 @@ public boolean isSourceTableNameWildCarded() {
return isSourceTableNameWildCarded;
}

public boolean isSourceCatalogNameWildCarded() {
public boolean isSourceTableNameExpanded() {
return isSourceTableNameExpanded;
}

public void setSourceTableNameExpanded(boolean isSourceTableNameExpanded) {
this.isSourceTableNameExpanded = isSourceTableNameExpanded;
}

public boolean isSourceCatalogNameWildCarded() {
return isSourceCatalogWildCarded;
}

Expand Down
Expand Up @@ -120,7 +120,7 @@ public TriggerHistory(Table table, Trigger trigger, AbstractTriggerTemplate trig
}

this.lastTriggerBuildReason = reason;
this.sourceTableName = trigger.isSourceTableNameWildCarded() ? table.getName() :
this.sourceTableName = (trigger.isSourceTableNameWildCarded() || trigger.isSourceTableNameExpanded()) ? table.getName() :
trigger.getSourceTableNameUnescaped();
this.columnNames = Table.getCommaDeliminatedColumns(trigger.orderColumnsForTable(table));
this.sourceSchemaName = trigger.isSourceSchemaNameWildCarded() ? table.getSchema() :
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
Expand All @@ -56,6 +57,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.config.ITableResolver;
import org.jumpmind.symmetric.config.ITriggerCreationListener;
import org.jumpmind.symmetric.config.TriggerFailureListener;
import org.jumpmind.symmetric.config.TriggerSelector;
Expand Down Expand Up @@ -1379,7 +1381,7 @@ public void run() {
: trigger.getSourceSchemaNameUnescaped(), history.getSourceSchemaName(),
ignoreCase);
boolean matchesTable = isEqual(
trigger.isSourceTableNameWildCarded() ? table.getName()
(trigger.isSourceTableNameWildCarded() || trigger.isSourceTableNameExpanded()) ? table.getName()
: trigger.getSourceTableNameUnescaped(), history.getSourceTableName(),
ignoreCase);
foundTable |= matchesCatalog && matchesSchema && matchesTable;
Expand Down Expand Up @@ -1538,14 +1540,19 @@ protected Set<Table> getTablesForTrigger(Trigger trigger, List<Trigger> triggers
tables.add(table);
}
}
} else if (!trigger.getSourceTableName().startsWith(parameterService.getTablePrefix() + "_")
&& CollectionUtils.isNotEmpty(extensionService.getExtensionPointList(ITableResolver.class))) {
for (ITableResolver resolver : extensionService.getExtensionPointList(ITableResolver.class)) {
resolver.resolve(catalogName, schemaName, tables, sourcePlatform, nodeService, trigger);
}
} else {
Table table = sourcePlatform.getTableFromCache(
catalogName, schemaName,
trigger.getSourceTableNameUnescaped(), !useTableCache);
if (table != null) {
tables.add(table);
}
}
}
}
}

Expand Down Expand Up @@ -1589,7 +1596,8 @@ public void syncTriggers(Table table, boolean force) {
List<TriggerHistory> activeTriggerHistories = getActiveTriggerHistories();
for (Trigger trigger : triggersForCurrentNode) {
if (trigger.matches(table, platform.getDefaultCatalog(), platform.getDefaultSchema(), ignoreCase) &&
(!trigger.isSourceTableNameWildCarded() || !containsExactMatchForSourceTableName(table, triggersForCurrentNode, ignoreCase))) {
(!trigger.isSourceTableNameWildCarded() || !trigger.isSourceTableNameExpanded()
|| !containsExactMatchForSourceTableName(table, triggersForCurrentNode, ignoreCase))) {
log.info("Synchronizing triggers for {}", table.getFullyQualifiedTableName());
updateOrCreateDatabaseTriggers(trigger, table, null, force, true, activeTriggerHistories);
log.info("Done synchronizing triggers for {}", table.getFullyQualifiedTableName());
Expand Down Expand Up @@ -1668,7 +1676,7 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool

List<TriggerHistory> allHistories = getActiveTriggerHistories();
if (triggersForCurrentNode.contains(trigger)) {
if (!trigger.isSourceTableNameWildCarded()) {
if (!trigger.isSourceTableNameWildCarded() && !trigger.isSourceTableNameExpanded()) {
for (TriggerHistory triggerHistory : getActiveTriggerHistories(trigger)) {
if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) {
dropTriggers(triggerHistory, sqlBuffer);
Expand Down Expand Up @@ -1721,7 +1729,7 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table,
trigger.getTriggerId(),
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogNameUnescaped(),
trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaNameUnescaped(),
trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableNameUnescaped());
(trigger.isSourceTableNameWildCarded() || trigger.isSourceTableNameExpanded()) ? table.getName() : trigger.getSourceTableNameUnescaped());

boolean forceRebuildOfTriggers = false;
if (latestHistoryBeforeRebuild == null) {
Expand Down Expand Up @@ -1827,7 +1835,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,
hist = getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogNameUnescaped(),
trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaNameUnescaped(),
trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableNameUnescaped());
(trigger.isSourceTableNameWildCarded() || trigger.isSourceTableNameExpanded()) ? table.getName() : trigger.getSourceTableNameUnescaped());
}
return hist;
}
Expand Down Expand Up @@ -1893,7 +1901,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,
hist = getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogNameUnescaped(),
trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaNameUnescaped(),
trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableNameUnescaped());
trigger.isSourceTableNameWildCarded() || trigger.isSourceTableNameExpanded() ? table.getName() : trigger.getSourceTableNameUnescaped());
}

try {
Expand Down

0 comments on commit 9ccc7e9

Please sign in to comment.