diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java index 4dbff56b87..5e3845c5b8 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java @@ -147,7 +147,7 @@ public static File createSnapshot(ISymmetricEngine engine) { } } - List triggers = triggerRouterService.getTriggers(); + List triggers = triggerRouterService.getTriggers(true); for (Trigger trigger : triggers) { Table table = engine.getDatabasePlatform().getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 90f735997a..2f817eece9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -608,7 +608,7 @@ public synchronized void uninstall() { groupletService.deleteGrouplet(grouplet); } - List triggerRouters = triggerRouterService.getTriggerRouters(true); + List triggerRouters = triggerRouterService.getTriggerRouters(false, true); for (TriggerRouter triggerRouter : triggerRouters) { triggerRouterService.deleteTriggerRouter(triggerRouter); } @@ -618,7 +618,7 @@ public synchronized void uninstall() { fileSyncService.deleteFileTriggerRouter(fileTriggerRouter); } - List routers = triggerRouterService.getRouters(); + List routers = triggerRouterService.getRouters(true); for (Router router : routers) { triggerRouterService.deleteRouter(router); } @@ -652,7 +652,7 @@ public synchronized void uninstall() { table = platform.readTableFromDatabase(null, null, TableConstants.getTableName( parameterService.getTablePrefix(), TableConstants.SYM_ROUTER)); if (table != null) { - List objects = triggerRouterService.getRouters(); + List objects = triggerRouterService.getRouters(true); for (Router router : objects) { triggerRouterService.deleteRouter(router); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index ff1a762e49..bba66db1b6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -330,14 +330,14 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat String triggerId = columnValues.get("TRIGGER_ID"); if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)) { String routerId = columnValues.get("ROUTER_ID"); - TriggerRouter tr = triggerRouterService.findTriggerRouterById(triggerId, + TriggerRouter tr = triggerRouterService.findTriggerRouterById(true, triggerId, routerId); if (tr != null) { trigger = tr.getTrigger(); lastUpdateTime = tr.getLastUpdateTime(); } } else { - trigger = triggerRouterService.getTriggerById(triggerId); + trigger = triggerRouterService.getTriggerById(true, triggerId); if (trigger != null) { lastUpdateTime = trigger.getLastUpdateTime(); } @@ -493,7 +493,7 @@ public void contextCommitted(SimpleRouterContext routingContext) { log.info("About to sync the " + trigger.getTriggerId() + " trigger because a change was detected by the config data router"); - engine.getTriggerRouterService().syncTrigger(trigger, null, false); + engine.getTriggerRouterService().syncTrigger(trigger.getTriggerId(), null, false); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java index 1052c0b1fc..e9ddda00bf 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java @@ -48,7 +48,7 @@ public interface ITriggerRouterService { public List getActiveTriggerHistories(String tableName); - public List getTriggerRouters(boolean refreshCache); + public List getTriggerRouters(boolean substituteParameters, boolean refreshCache); /** * Return a list of triggers used when extraction configuration data during @@ -59,21 +59,17 @@ public interface ITriggerRouterService { public List buildTriggerRoutersForSymmetricTables(String version, NodeGroupLink nodeGroupLink, String... tablesToExclude); public String buildSymmetricTableRouterId(String triggerId, String sourceNodeGroupId, String targetNodeGroupId); - - public Trigger getTriggerForCurrentNodeById(String triggerId); - + public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache); /** - * Returns a list of triggers that should be active for the current node. + * Returns a list of triggers that should be active for the current node. * @param refreshCache Indicates that the cache should be refreshed */ public List getTriggersForCurrentNode(boolean refreshCache); - - public Map> getTriggerRoutersByChannel(String nodeGroupId); - + /** - * Returns a map of trigger routers keyed by trigger id. + * Returns a map of trigger routers keyed by trigger id. * @param refreshCache Indicates that the cache should be refreshed */ public Map> getTriggerRoutersForCurrentNode(boolean refreshCache); @@ -85,11 +81,11 @@ public interface ITriggerRouterService { */ public Router getActiveRouterByIdForCurrentNode(String routerId, boolean refreshCache); - public Router getRouterById(String routerId); + public Router getRouterById(boolean substituteParameters, String routerId); - public Router getRouterById(String routerId, boolean refreshCache); + public Router getRouterById(boolean substituteParameters, String routerId, boolean refreshCache); - public List getRouters(); + public List getRouters(boolean substituteParameters); /** * Get a list of routers for a specific node group link. @@ -102,12 +98,10 @@ public interface ITriggerRouterService { public void saveRouter(Router router); - public List getAllTriggerRoutersForCurrentNode(String sourceNodeGroupId); - /** * Get a list of all the triggers that have been defined for the system. */ - public List getTriggers(); + public List getTriggers(boolean substituteParameters); public void saveTrigger(Trigger trigger); @@ -130,11 +124,11 @@ public List createTriggersOnChannelForTablesWithReturn(String channelId public List getAllTriggerRoutersForReloadForCurrentNode(String sourceNodeGroupId, String targetNodeGroupId); - public Set getTriggerRouterForTableForCurrentNode(NodeGroupLink link, String catalogName, String schemaName, String tableName, boolean refreshCache); + public Set getTriggerRouterForTableForCurrentNode(boolean substituteParameters, NodeGroupLink link, String catalogName, String schemaName, String tableName, boolean refreshCache); - public Set getTriggerRouterForTableForCurrentNode(String catalog, String schema, String tableName, boolean refreshCache); + public Set getTriggerRouterForTableForCurrentNode(boolean substituteParameters, String catalog, String schema, String tableName, boolean refreshCache); - public TriggerRouter findTriggerRouterById(String triggerId, String routerId); + public TriggerRouter findTriggerRouterById(boolean substituteParameters, String triggerId, String routerId); public void inactivateTriggerHistory(TriggerHistory history); @@ -147,9 +141,9 @@ public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String public TriggerHistory findTriggerHistory(String catalogName, String schemaName, String tableName); - public Trigger getTriggerById(String triggerId); + public Trigger getTriggerById(boolean substituteParameters, String triggerId); - public Trigger getTriggerById(String triggerId, boolean refreshCache); + public Trigger getTriggerById(boolean substituteParameters, String triggerId, boolean refreshCache); public void insert(TriggerHistory newAuditRecord); @@ -163,9 +157,9 @@ public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String public void saveTriggerRouter(TriggerRouter triggerRouter); - public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force); + public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force); - public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyTrigger); + public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force, boolean verifyTrigger); public void syncTriggers(Table table, boolean genAlways); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index b8896d7935..8ada301648 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -955,7 +955,7 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, table.setPrimaryKeys(triggerHistory.getParsedPkColumnNames()); } - Router router = triggerRouterService.getRouterById(routerId, false); + Router router = triggerRouterService.getRouterById(true, routerId, false); if (router != null && setTargetTableName) { if (router.isUseSourceCatalogSchema()) { table.setCatalog(catalogName); @@ -1175,7 +1175,7 @@ public ExtractRequest mapRow(Row row) { request.setStatus(ExtractStatus.valueOf(row.getString("status").toUpperCase())); request.setCreateTime(row.getDateTime("create_time")); request.setLastUpdateTime(row.getDateTime("last_update_time")); - request.setTriggerRouter(triggerRouterService.findTriggerRouterById( + request.setTriggerRouter(triggerRouterService.findTriggerRouterById(true, row.getString("trigger_id"), row.getString("router_id"))); return request; } @@ -1411,7 +1411,7 @@ public CsvData next() { return next(); } } else { - Trigger trigger = triggerRouterService.getTriggerById( + Trigger trigger = triggerRouterService.getTriggerById(true, triggerHistory.getTriggerId(), false); if (trigger != null) { if (lastTriggerHistory == null || lastTriggerHistory diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index f27adee5e6..698750fec7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -111,8 +111,8 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtCli INodeService nodeService = engine.getNodeService(); Node targetNode = nodeService.findNode(request.getTargetNodeId()); if (targetNode != null) { - TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode( - request.getTriggerId(), request.getRouterId(), false); + TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(request.getTriggerId(), + request.getRouterId(), false); if (triggerRouter != null) { Trigger trigger = triggerRouter.getTrigger(); Router router = triggerRouter.getRouter(); @@ -678,8 +678,8 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c fileSyncSnapshotHistory.getTriggerId(), parameterService.getNodeGroupId(), targetNode.getNodeGroupId()); TriggerRouter fileSyncSnapshotTriggerRouter = triggerRouterService - .getTriggerRouterForCurrentNode(fileSyncSnapshotHistory.getTriggerId(), - routerid, true); + .getTriggerRouterForCurrentNode(fileSyncSnapshotHistory.getTriggerId(), routerid, + true); List channels = engine.getConfigurationService().getFileSyncChannels(); for (Channel channel : channels) { @@ -748,7 +748,7 @@ public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loa String createBy) { TriggerHistory history = engine.getTriggerRouterService() .findTriggerHistoryForGenericSync(); - Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), + Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(), false); String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine .getConfigurationService().getChannels(false)); @@ -776,7 +776,7 @@ public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String protected void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history, String channelId, Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { - Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), + Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(), false); String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine .getConfigurationService().getChannels(false)); @@ -796,7 +796,7 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, Node targetNode, String script, boolean isLoad, long loadId, String createBy) { TriggerHistory history = engine.getTriggerRouterService() .findTriggerHistoryForGenericSync(); - Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), + Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(), false); String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine .getConfigurationService().getChannels(false)); @@ -853,7 +853,7 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) { - Trigger trigger = engine.getTriggerRouterService().getTriggerById( + Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, triggerHistory.getTriggerId(), false); String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine .getConfigurationService().getChannels(false)); @@ -1027,7 +1027,7 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, if (isLoad) { TriggerHistory history = data.getTriggerHistory(); if (history != null && channelId == null) { - Trigger trigger = engine.getTriggerRouterService().getTriggerById( + Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId()); channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService() .getChannels(false)); @@ -1148,7 +1148,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri return "Trigger for table " + tableName + " does not exist from node " + sourceNode.getNodeGroupId(); } else { - Trigger trigger = triggerRouterService.getTriggerById(triggerHistory.getTriggerId()); + Trigger trigger = triggerRouterService.getTriggerById(true, triggerHistory.getTriggerId()); if (trigger != null) { ISqlTransaction transaction = null; try { @@ -1256,7 +1256,7 @@ public void insertHeartbeatEvent(Node node, boolean isReload) { for (NodeGroupLink nodeGroupLink : links) { if (nodeGroupLink.getDataEventAction() == NodeGroupLinkAction.P) { Set triggerRouters = engine.getTriggerRouterService() - .getTriggerRouterForTableForCurrentNode(nodeGroupLink, null, null, + .getTriggerRouterForTableForCurrentNode(true, nodeGroupLink, null, null, tableName, false); if (triggerRouters != null && triggerRouters.size() > 0) { Data data = createData(transaction, triggerRouters.iterator().next() @@ -1323,7 +1323,7 @@ public Data createData(ISqlTransaction transaction, String catalogName, String s String tableName, String whereClause) { Data data = null; Set triggerRouters = engine.getTriggerRouterService() - .getTriggerRouterForTableForCurrentNode(catalogName, schemaName, tableName, false); + .getTriggerRouterForTableForCurrentNode(true, catalogName, schemaName, tableName, false); if (triggerRouters != null && triggerRouters.size() > 0) { data = createData(transaction, triggerRouters.iterator().next().getTrigger(), whereClause); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 6b5940eae1..d64b8b85db 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -903,7 +903,7 @@ public FileTriggerRouter mapRow(Row rs) { fileTriggerRouter.setTargetBaseDir((rs.getString("target_base_dir") == null) ? null : rs.getString("target_base_dir").replace('\\', '/')); fileTriggerRouter.setRouter(engine.getTriggerRouterService().getRouterById( - rs.getString("router_id"))); + true, rs.getString("router_id"))); return fileTriggerRouter; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 4eea89e8dc..3fb35e915e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -459,7 +459,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod long ts = System.currentTimeMillis(); int dataCount = -1; try { - List triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false); + List triggerRouters = engine.getTriggerRouterService().getTriggerRouters(true, false); boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(), triggerRouters); boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java index 3ed6f03fc5..ca46620093 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java @@ -30,6 +30,7 @@ import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; +import org.jumpmind.properties.TypedProperties; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.transform.AdditiveColumnTransform; @@ -65,6 +66,7 @@ import org.jumpmind.symmetric.service.IExtensionService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.ITransformService; +import org.jumpmind.util.FormatUtils; public class TransformService extends AbstractService implements ITransformService { @@ -204,6 +206,15 @@ private Map byTransformPoint.put(transformTable.getTransformPoint(), byTableName); } + /* Allow for parameterized source and target names */ + TypedProperties parameters = parameterService.getAllParameters(); + transformTable.setSourceCatalogName(FormatUtils.replaceTokens(transformTable.getSourceCatalogName(), parameters, true)); + transformTable.setSourceSchemaName(FormatUtils.replaceTokens(transformTable.getSourceSchemaName(), parameters, true)); + transformTable.setSourceTableName(FormatUtils.replaceTokens(transformTable.getSourceTableName(), parameters, true)); + transformTable.setTargetCatalogName(FormatUtils.replaceTokens(transformTable.getTargetCatalogName(), parameters, true)); + transformTable.setTargetSchemaName(FormatUtils.replaceTokens(transformTable.getTargetSchemaName(), parameters, true)); + transformTable.setTargetTableName(FormatUtils.replaceTokens(transformTable.getTargetTableName(), parameters, true)); + byTableName.add(transformTable); } lastCacheTimeInMs = System.currentTimeMillis(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index 24b2df47e4..44f1c63489 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -30,6 +30,7 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,6 +42,7 @@ import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; +import org.jumpmind.properties.TypedProperties; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.Constants; @@ -48,7 +50,6 @@ import org.jumpmind.symmetric.common.TableConstants; import org.jumpmind.symmetric.config.ITriggerCreationListener; import org.jumpmind.symmetric.config.TriggerFailureListener; -import org.jumpmind.symmetric.config.TriggerSelector; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.NodeGroupLink; @@ -70,6 +71,8 @@ import org.jumpmind.symmetric.service.ITriggerRouterService; import org.jumpmind.symmetric.statistic.IStatisticManager; import org.jumpmind.util.FormatUtils; +import org.jumpmind.util.KeyedCache; +import org.jumpmind.util.KeyedCache.IRefreshCache; /** * @see ITriggerRouterService @@ -79,31 +82,21 @@ public class TriggerRouterService extends AbstractService implements ITriggerRou private IClusterService clusterService; private IConfigurationService configurationService; - + private ISequenceService sequenceService; private IExtensionService extensionService; - - private Map routersCache; - - private long routersCacheTime; - private Map triggersCache; + private Map> routersCache = new HashMap>(); - private long triggersCacheTime; - - private long triggerRoutersCacheTime; + private Map> triggersCache = new HashMap>();; private Map triggerRouterCacheByNodeGroupId = new HashMap(); - private Map> triggerRouterCacheByChannel = new HashMap>(); - - private List triggerRoutersCache = new ArrayList(); + private Map> triggerRoutersCache = new HashMap>(); private long triggerRouterPerNodeCacheTime; - private long triggerRouterPerChannelCacheTime; - private TriggerFailureListener failureListener = new TriggerFailureListener(); private IStatisticManager statisticManager; @@ -117,7 +110,7 @@ public class TriggerRouterService extends AbstractService implements ITriggerRou private Date lastUpdateTime; private Object cacheLock = new Object(); - + /** * Cache the history for performance. History never changes and does not * grow big so this should be OK. @@ -134,8 +127,7 @@ public TriggerRouterService(ISymmetricEngine engine) { this.sequenceService = engine.getSequenceService(); this.extensionService = engine.getExtensionService(); engine.getExtensionService().addExtensionPoint(failureListener); - setSqlMap(new TriggerRouterServiceSqlMap(symmetricDialect.getPlatform(), - createSqlReplacementTokens())); + setSqlMap(new TriggerRouterServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } public boolean refreshFromDatabase() { @@ -147,7 +139,7 @@ public boolean refreshFromDatabase() { if (date != null) { if (lastUpdateTime == null || lastUpdateTime.before(date)) { if (lastUpdateTime != null) { - log.info("Newer trigger router settings were detected"); + log.info("Newer trigger router settings were detected"); } lastUpdateTime = date; clearCache(); @@ -157,9 +149,8 @@ public boolean refreshFromDatabase() { return false; } - public List getTriggers() { - return sqlTemplate.query("select " - + getSql("selectTriggersColumnList", "selectTriggersSql"), new TriggerMapper()); + public List getTriggers(boolean substituteParameters) { + return sqlTemplate.query("select " + getSql("selectTriggersColumnList", "selectTriggersSql"), new TriggerMapper(substituteParameters)); } public boolean isTriggerBeingUsed(String triggerId) { @@ -181,8 +172,7 @@ public void deleteTrigger(Trigger trigger) { public void dropTriggers() { List activeHistories = getActiveTriggerHistories(); for (TriggerHistory history : activeHistories) { - if (!TableConstants.getTables(symmetricDialect.getTablePrefix()).contains( - history.getSourceTableName())) { + if (!TableConstants.getTables(symmetricDialect.getTablePrefix()).contains(history.getSourceTableName())) { dropTriggers(history, (StringBuilder) null); } } @@ -204,17 +194,17 @@ protected void deleteTriggerHistory(TriggerHistory history) { sqlTemplate.update(getSql("deleteTriggerHistorySql"), history.getTriggerHistoryId()); } - public void createTriggersOnChannelForTables(String channelId, String catalogName, - String schemaName, List tables, String lastUpdateBy) { + public void createTriggersOnChannelForTables(String channelId, String catalogName, String schemaName, List tables, + String lastUpdateBy) { createTriggersOnChannelForTablesWithReturn(channelId, catalogName, schemaName, tables, lastUpdateBy); } - - public List createTriggersOnChannelForTablesWithReturn(String channelId, String catalogName, - String schemaName, List tables, String lastUpdateBy) { - + + public List createTriggersOnChannelForTablesWithReturn(String channelId, String catalogName, String schemaName, + List tables, String lastUpdateBy) { + List createdTriggers = new ArrayList(); - - List existingTriggers = getTriggers(); + + List existingTriggers = getTriggers(false); for (String table : tables) { Trigger trigger = new Trigger(); trigger.setChannelId(channelId); @@ -223,9 +213,9 @@ public List createTriggersOnChannelForTablesWithReturn(String channelId trigger.setSourceTableName(table); String triggerId = table; if (table.length() > 50) { - triggerId = table.substring(0,13) + "_" + UUID.randomUUID().toString(); + triggerId = table.substring(0, 13) + "_" + UUID.randomUUID().toString(); } - + boolean uniqueNameCreated = false; int suffix = 0; while (!uniqueNameCreated) { @@ -236,19 +226,17 @@ public List createTriggersOnChannelForTablesWithReturn(String channelId if (suffix == 0) { triggerId = triggerId + suffixString; } else { - triggerId = triggerId.substring(0, triggerId.length() - - ("_" + (suffix - 1)).length()) - + suffixString; + triggerId = triggerId.substring(0, triggerId.length() - ("_" + (suffix - 1)).length()) + suffixString; } suffix++; } } - + if (triggerId.equals(triggerIdPriorToCheck)) { uniqueNameCreated = true; } } - + trigger.setTriggerId(triggerId); trigger.setLastUpdateBy(lastUpdateBy); trigger.setLastUpdateTime(new Date()); @@ -257,27 +245,21 @@ public List createTriggersOnChannelForTablesWithReturn(String channelId createdTriggers.add(trigger); } return createdTriggers; - + } - - public Collection findMatchingTriggers(List triggers, String catalog, String schema, - String table) { + + public Collection findMatchingTriggers(List triggers, String catalog, String schema, String table) { Set matches = new HashSet(); for (Trigger trigger : triggers) { - boolean catalogMatches = trigger.isSourceCatalogNameWildCarded() + boolean catalogMatches = trigger.isSourceCatalogNameWildCarded() || (catalog == null && trigger.getSourceCatalogName() == null) - || (StringUtils.isBlank(trigger.getSourceCatalogName()) - && StringUtils.isNotBlank(catalog) && catalog.equals(platform.getDefaultCatalog())) - || (StringUtils.isNotBlank(catalog) && catalog.equals(trigger - .getSourceCatalogName())); + || (StringUtils.isBlank(trigger.getSourceCatalogName()) && StringUtils.isNotBlank(catalog) && catalog.equals(platform + .getDefaultCatalog())) || (StringUtils.isNotBlank(catalog) && catalog.equals(trigger.getSourceCatalogName())); boolean schemaMatches = trigger.isSourceSchemaNameWildCarded() || (schema == null && trigger.getSourceSchemaName() == null) - || (StringUtils.isBlank(trigger.getSourceSchemaName()) - && StringUtils.isNotBlank(schema) && schema.equals(platform.getDefaultSchema())) - || (StringUtils.isNotBlank(schema) && schema.equals(trigger - .getSourceSchemaName())); - boolean tableMatches = trigger.isSourceTableNameWildCarded() - || table.equalsIgnoreCase(trigger.getSourceTableName()); + || (StringUtils.isBlank(trigger.getSourceSchemaName()) && StringUtils.isNotBlank(schema) && schema.equals(platform + .getDefaultSchema())) || (StringUtils.isNotBlank(schema) && schema.equals(trigger.getSourceSchemaName())); + boolean tableMatches = trigger.isSourceTableNameWildCarded() || table.equalsIgnoreCase(trigger.getSourceTableName()); if (catalogMatches && schemaMatches && tableMatches) { matches.add(trigger); } @@ -286,8 +268,7 @@ public Collection findMatchingTriggers(List triggers, String c } public void inactivateTriggerHistory(TriggerHistory history) { - sqlTemplate.update(getSql("inactivateTriggerHistorySql"), - new Object[] { history.getErrorMessage(), history.getTriggerHistoryId() }, + sqlTemplate.update(getSql("inactivateTriggerHistorySql"), new Object[] { history.getErrorMessage(), history.getTriggerHistoryId() }, new int[] { Types.VARCHAR, Types.INTEGER }); } @@ -299,10 +280,11 @@ public Map getHistoryRecords() { protected boolean isTriggerNameInUse(List activeTriggerHistories, String triggerId, String triggerName) { for (TriggerHistory triggerHistory : activeTriggerHistories) { - if (!triggerHistory.getTriggerId().equals(triggerId) && ( - (triggerHistory.getNameForDeleteTrigger() != null && triggerHistory.getNameForDeleteTrigger().equals(triggerName)) || - (triggerHistory.getNameForInsertTrigger() != null && triggerHistory.getNameForInsertTrigger().equals(triggerName)) || - (triggerHistory.getNameForUpdateTrigger() != null && triggerHistory.getNameForUpdateTrigger().equals(triggerName)))) { + if (!triggerHistory.getTriggerId().equals(triggerId) + && ((triggerHistory.getNameForDeleteTrigger() != null && triggerHistory.getNameForDeleteTrigger().equals(triggerName)) + || (triggerHistory.getNameForInsertTrigger() != null && triggerHistory.getNameForInsertTrigger().equals( + triggerName)) || (triggerHistory.getNameForUpdateTrigger() != null && triggerHistory + .getNameForUpdateTrigger().equals(triggerName)))) { return true; } } @@ -314,8 +296,7 @@ public TriggerHistory findTriggerHistory(String catalogName, String schemaName, return list.size() > 0 ? list.get(0) : null; } - public List findTriggerHistories(String catalogName, String schemaName, - String tableName) { + public List findTriggerHistories(String catalogName, String schemaName, String tableName) { List listToReturn = new ArrayList(); List triggerHistories = getActiveTriggerHistories(); if (triggerHistories != null && triggerHistories.size() > 0) { @@ -330,10 +311,10 @@ public List findTriggerHistories(String catalogName, String sche } if (matches && StringUtils.isNotBlank(tableName)) { - boolean ignoreCase = parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE) && - !FormatUtils.isMixedCase(tableName); - matches = ignoreCase ? triggerHistory.getSourceTableName().equalsIgnoreCase(tableName) : - triggerHistory.getSourceTableName().equals(tableName); + boolean ignoreCase = parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE) + && !FormatUtils.isMixedCase(tableName); + matches = ignoreCase ? triggerHistory.getSourceTableName().equalsIgnoreCase(tableName) : triggerHistory + .getSourceTableName().equals(tableName); } if (matches) { @@ -348,8 +329,7 @@ public List findTriggerHistories(String catalogName, String sche public TriggerHistory getTriggerHistory(int histId) { TriggerHistory history = historyMap.get(histId); if (history == null && histId >= 0) { - history = (TriggerHistory) sqlTemplate.queryForObject(getSql("triggerHistSql"), - new TriggerHistoryMapper(), histId); + history = (TriggerHistory) sqlTemplate.queryForObject(getSql("triggerHistSql"), new TriggerHistoryMapper(), histId); historyMap.put(histId, history); } return history; @@ -366,29 +346,24 @@ public List getActiveTriggerHistories(Trigger trigger) { return list; } - public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String catalogName, - String schemaName, String tableName) { - List triggerHistories = sqlTemplate.query(getSql("latestTriggerHistSql"), - new TriggerHistoryMapper(), triggerId, tableName); + public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String catalogName, String schemaName, String tableName) { + List triggerHistories = sqlTemplate.query(getSql("latestTriggerHistSql"), new TriggerHistoryMapper(), triggerId, + tableName); for (TriggerHistory triggerHistory : triggerHistories) { - if ((StringUtils.isBlank(catalogName) && StringUtils.isBlank(triggerHistory - .getSourceCatalogName())) - || (StringUtils.isNotBlank(catalogName) && catalogName.equals(triggerHistory - .getSourceCatalogName()))) { - if ((StringUtils.isBlank(schemaName) && StringUtils.isBlank(triggerHistory - .getSourceSchemaName())) - || (StringUtils.isNotBlank(schemaName) && schemaName.equals(triggerHistory - .getSourceSchemaName()))) { + if ((StringUtils.isBlank(catalogName) && StringUtils.isBlank(triggerHistory.getSourceCatalogName())) + || (StringUtils.isNotBlank(catalogName) && catalogName.equals(triggerHistory.getSourceCatalogName()))) { + if ((StringUtils.isBlank(schemaName) && StringUtils.isBlank(triggerHistory.getSourceSchemaName())) + || (StringUtils.isNotBlank(schemaName) && schemaName.equals(triggerHistory.getSourceSchemaName()))) { return triggerHistory; } } } return null; } - + @SuppressWarnings("unchecked") @Override - public List getActiveTriggerHistoriesFromCache() { + public List getActiveTriggerHistoriesFromCache() { return new ArrayList(historyMap != null ? historyMap.values() : Collections.EMPTY_LIST); } @@ -396,37 +371,32 @@ public List getActiveTriggerHistoriesFromCache() { * Get a list of trigger histories that are currently active */ public List getActiveTriggerHistories() { - List histories = sqlTemplate.query(getSql("allTriggerHistSql", "activeTriggerHistSql"), - new TriggerHistoryMapper()); + List histories = sqlTemplate.query(getSql("allTriggerHistSql", "activeTriggerHistSql"), new TriggerHistoryMapper()); for (TriggerHistory triggerHistory : histories) { historyMap.put(triggerHistory.getTriggerHistoryId(), triggerHistory); } return histories; - } + } public List getActiveTriggerHistories(String tableName) { - return sqlTemplate.query(getSql("allTriggerHistSql", "triggerHistBySourceTableWhereSql"), - new TriggerHistoryMapper(), tableName); + return sqlTemplate.query(getSql("allTriggerHistSql", "triggerHistBySourceTableWhereSql"), new TriggerHistoryMapper(), tableName); } - protected List buildTriggersForSymmetricTables(String version, - String... tablesToExclude) { + protected List buildTriggersForSymmetricTables(String version, String... tablesToExclude) { List triggers = new ArrayList(); - List tables = new ArrayList(TableConstants.getConfigTables(symmetricDialect - .getTablePrefix())); + List tables = new ArrayList(TableConstants.getConfigTables(symmetricDialect.getTablePrefix())); if (extraConfigTables != null) { for (String extraTable : extraConfigTables) { tables.add(extraTable); } } - - List definedTriggers = getTriggers(); + + List definedTriggers = getTriggers(false); for (Trigger trigger : definedTriggers) { if (tables.remove(trigger.getSourceTableName())) { - logOnce(String - .format("Not generating virtual triggers for %s because there is a user defined trigger already defined", - trigger.getSourceTableName())); + logOnce(String.format("Not generating virtual triggers for %s because there is a user defined trigger already defined", + trigger.getSourceTableName())); } } @@ -449,13 +419,11 @@ protected List buildTriggersForSymmetricTables(String version, } protected Trigger buildTriggerForSymmetricTable(String tableName) { - boolean syncChanges = !TableConstants.getTablesThatDoNotSync(tablePrefix).contains( - tableName) + boolean syncChanges = !TableConstants.getTablesThatDoNotSync(tablePrefix).contains(tableName) && parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION); - boolean syncOnIncoming = !configurationService.isMasterToMaster() && (parameterService.is( - ParameterConstants.AUTO_SYNC_CONFIGURATION_ON_INCOMING, true) - || tableName.equals(TableConstants.getTableName(tablePrefix, - TableConstants.SYM_TABLE_RELOAD_REQUEST))); + boolean syncOnIncoming = !configurationService.isMasterToMaster() + && (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION_ON_INCOMING, true) || tableName.equals(TableConstants + .getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST))); Trigger trigger = new Trigger(); trigger.setTriggerId(tableName); trigger.setSyncOnDelete(syncChanges); @@ -464,11 +432,9 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) { trigger.setSyncOnIncomingBatch(syncOnIncoming); trigger.setSourceTableName(tableName); trigger.setUseCaptureOldData(false); - if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST) - .equals(tableName)) { + if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST).equals(tableName)) { trigger.setChannelId(Constants.CHANNEL_HEARTBEAT); - } else if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT) - .equals(tableName)) { + } else if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT).equals(tableName)) { trigger.setChannelId(Constants.CHANNEL_DYNAMIC); trigger.setChannelExpression("$(curTriggerValue).$(curColumnPrefix)" + platform.alterCaseToMatchDatabaseDefaultCase("channel_id")); trigger.setReloadChannelId(Constants.CHANNEL_FILESYNC_RELOAD); @@ -476,20 +442,18 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) { trigger.setSyncOnIncomingBatch(false); boolean syncEnabled = parameterService.is(ParameterConstants.FILE_SYNC_ENABLE); trigger.setSyncOnInsert(syncEnabled); - trigger.setSyncOnUpdate(syncEnabled); // Changed to false because of issues with the traffic file + trigger.setSyncOnUpdate(syncEnabled); // Changed to false because of + // issues with the traffic + // file trigger.setSyncOnDelete(false); } else { trigger.setChannelId(Constants.CHANNEL_CONFIG); } - if (!TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST) - .equals(tableName) && - !TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE) - .equals(tableName) && - !TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY) - .equals(tableName) && - !TableConstants.getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST) - .equals(tableName)) { + if (!TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST).equals(tableName) + && !TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE).equals(tableName) + && !TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY).equals(tableName) + && !TableConstants.getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST).equals(tableName)) { trigger.setUseCaptureLobs(true); } // little trick to force the rebuild of SymmetricDS triggers every time @@ -498,8 +462,7 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) { return trigger; } - public List buildTriggerRoutersForSymmetricTables(String version, - NodeGroupLink nodeGroupLink, String... tablesToExclude) { + public List buildTriggerRoutersForSymmetricTables(String version, NodeGroupLink nodeGroupLink, String... tablesToExclude) { int initialLoadOrder = 1; List triggers = buildTriggersForSymmetricTables(version, tablesToExclude); @@ -507,8 +470,7 @@ public List buildTriggerRoutersForSymmetricTables(String version, for (int j = 0; j < triggers.size(); j++) { Trigger trigger = triggers.get(j); - TriggerRouter triggerRouter = buildTriggerRoutersForSymmetricTables(version, trigger, - nodeGroupLink); + TriggerRouter triggerRouter = buildTriggerRoutersForSymmetricTables(version, trigger, nodeGroupLink); triggerRouter.setInitialLoadOrder(initialLoadOrder++); triggerRouters.add(triggerRouter); } @@ -519,15 +481,14 @@ public String buildSymmetricTableRouterId(String triggerId, String sourceNodeGro return replaceCharsToShortenName(String.format("%s_%s_2_%s", triggerId, sourceNodeGroupId, targetNodeGroupId)); } - protected TriggerRouter buildTriggerRoutersForSymmetricTables(String version, Trigger trigger, - NodeGroupLink nodeGroupLink) { + protected TriggerRouter buildTriggerRoutersForSymmetricTables(String version, Trigger trigger, NodeGroupLink nodeGroupLink) { TriggerRouter triggerRouter = new TriggerRouter(); triggerRouter.setTrigger(trigger); Router router = triggerRouter.getRouter(); - router.setRouterId(buildSymmetricTableRouterId(trigger.getTriggerId(), nodeGroupLink.getSourceNodeGroupId(), nodeGroupLink.getTargetNodeGroupId())); - if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT).equals( - trigger.getSourceTableName())) { + router.setRouterId(buildSymmetricTableRouterId(trigger.getTriggerId(), nodeGroupLink.getSourceNodeGroupId(), + nodeGroupLink.getTargetNodeGroupId())); + if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT).equals(trigger.getSourceTableName())) { router.setRouterType(FileSyncDataRouter.ROUTER_TYPE); } else { router.setRouterType(ConfigurationChangedDataRouter.ROUTER_TYPE); @@ -540,21 +501,20 @@ protected TriggerRouter buildTriggerRoutersForSymmetricTables(String version, Tr return triggerRouter; } - public Set getTriggerRouterForTableForCurrentNode(String catalogName, - String schemaName, String tableName, boolean refreshCache) { - return getTriggerRouterForTableForCurrentNode(null, catalogName, schemaName, tableName, - refreshCache); + @Override + public Set getTriggerRouterForTableForCurrentNode(boolean substituteParameters, String catalogName, String schemaName, + String tableName, boolean refreshCache) { + return getTriggerRouterForTableForCurrentNode(substituteParameters, null, catalogName, schemaName, tableName, refreshCache); } - public Set getTriggerRouterForTableForCurrentNode(NodeGroupLink link, - String catalogName, String schemaName, String tableName, boolean refreshCache) { + public Set getTriggerRouterForTableForCurrentNode(boolean substituteParameters, NodeGroupLink link, String catalogName, + String schemaName, String tableName, boolean refreshCache) { TriggerRoutersCache cache = getTriggerRoutersCacheForCurrentNode(refreshCache); Collection> triggerRouters = cache.triggerRoutersByTriggerId.values(); HashSet returnList = new HashSet(); for (List list : triggerRouters) { for (TriggerRouter triggerRouter : list) { - if (isMatch(link, triggerRouter) - && isMatch(catalogName, schemaName, tableName, triggerRouter.getTrigger())) { + if (isMatch(link, triggerRouter) && isMatch(catalogName, schemaName, tableName, triggerRouter.getTrigger())) { returnList.add(triggerRouter); } } @@ -564,33 +524,25 @@ && isMatch(catalogName, schemaName, tableName, triggerRouter.getTrigger())) { protected boolean isMatch(NodeGroupLink link, TriggerRouter router) { if (link != null && router != null && router.getRouter() != null) { - return link.getSourceNodeGroupId().equals( - router.getRouter().getNodeGroupLink().getSourceNodeGroupId()) - && link.getTargetNodeGroupId().equals( - router.getRouter().getNodeGroupLink().getTargetNodeGroupId()); + return link.getSourceNodeGroupId().equals(router.getRouter().getNodeGroupLink().getSourceNodeGroupId()) + && link.getTargetNodeGroupId().equals(router.getRouter().getNodeGroupLink().getTargetNodeGroupId()); } else { return true; } } - protected boolean isMatch(String catalogName, String schemaName, String tableName, - Trigger trigger) { + protected boolean isMatch(String catalogName, String schemaName, String tableName, Trigger trigger) { if (!StringUtils.isBlank(tableName) && !tableName.equals(trigger.getSourceTableName())) { return false; - } else if (StringUtils.isBlank(tableName) - && !StringUtils.isBlank(trigger.getSourceTableName())) { + } else if (StringUtils.isBlank(tableName) && !StringUtils.isBlank(trigger.getSourceTableName())) { return false; - } else if (!StringUtils.isBlank(catalogName) - && !catalogName.equals(trigger.getSourceCatalogName())) { + } else if (!StringUtils.isBlank(catalogName) && !catalogName.equals(trigger.getSourceCatalogName())) { return false; - } else if (StringUtils.isBlank(catalogName) - && !StringUtils.isBlank(trigger.getSourceCatalogName())) { + } else if (StringUtils.isBlank(catalogName) && !StringUtils.isBlank(trigger.getSourceCatalogName())) { return false; - } else if (!StringUtils.isBlank(schemaName) - && !schemaName.equals(trigger.getSourceSchemaName())) { + } else if (!StringUtils.isBlank(schemaName) && !schemaName.equals(trigger.getSourceSchemaName())) { return false; - } else if (StringUtils.isBlank(schemaName) - && !StringUtils.isBlank(trigger.getSourceSchemaName())) { + } else if (StringUtils.isBlank(schemaName) && !StringUtils.isBlank(trigger.getSourceSchemaName())) { return false; } else { return true; @@ -601,31 +553,26 @@ protected boolean isMatch(String catalogName, String schemaName, String tableNam * Create a list of {@link TriggerRouter} for the SymmetricDS tables that * should have triggers created for them on the current node. */ - protected List getConfigurationTablesTriggerRoutersForCurrentNode( - String sourceNodeGroupId) { + protected List getConfigurationTablesTriggerRoutersForCurrentNode(String sourceNodeGroupId) { List triggerRouters = new ArrayList(); List links = configurationService.getNodeGroupLinksFor(sourceNodeGroupId, false); for (NodeGroupLink nodeGroupLink : links) { - triggerRouters.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), - nodeGroupLink)); + triggerRouters.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), nodeGroupLink)); } return triggerRouters; } - protected void mergeInConfigurationTablesTriggerRoutersForCurrentNode(String sourceNodeGroupId, - List configuredInDatabase) { + protected void mergeInConfigurationTablesTriggerRoutersForCurrentNode(String sourceNodeGroupId, List configuredInDatabase) { List virtualConfigTriggers = getConfigurationTablesTriggerRoutersForCurrentNode(sourceNodeGroupId); for (TriggerRouter trigger : virtualConfigTriggers) { - if (trigger.getRouter().getNodeGroupLink().getSourceNodeGroupId() - .equalsIgnoreCase(sourceNodeGroupId) + if (trigger.getRouter().getNodeGroupLink().getSourceNodeGroupId().equalsIgnoreCase(sourceNodeGroupId) && !doesTriggerRouterExistInList(configuredInDatabase, trigger)) { configuredInDatabase.add(trigger); } } } - protected boolean doesTriggerRouterExistInList(List triggerRouters, - TriggerRouter triggerRouter) { + protected boolean doesTriggerRouterExistInList(List triggerRouters, TriggerRouter triggerRouter) { for (TriggerRouter checkMe : triggerRouters) { if (checkMe.isSame(triggerRouter)) { return true; @@ -639,9 +586,8 @@ public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String rou List triggerRouters = getTriggerRoutersForCurrentNode(refreshCache).get(triggerId); if (triggerRouters != null) { for (TriggerRouter testTriggerRouter : triggerRouters) { - if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) || - testTriggerRouter.getRouter().getRouterId().equals(routerId) - || routerId.equals(Constants.UNKNOWN_ROUTER_ID)) { + if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) + || testTriggerRouter.getRouter().getRouterId().equals(routerId) || routerId.equals(Constants.UNKNOWN_ROUTER_ID)) { triggerRouter = testTriggerRouter; break; } @@ -649,7 +595,8 @@ public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String rou } if (triggerRouter == null) { - log.warn("Could not find trigger router [{}:{}] in list {}", new Object[] {triggerId, routerId, triggerRouters == null ? 0 : triggerRouters.toString()}); + log.warn("Could not find trigger router [{}:{}] in list {}", new Object[] { triggerId, routerId, + triggerRouters == null ? 0 : triggerRouters.toString() }); } return triggerRouter; @@ -670,23 +617,16 @@ public List getTriggersForCurrentNode(boolean refreshCache) { return triggers; } - - protected TriggerRoutersCache getTriggerRoutersCacheForCurrentNode(boolean refreshCache) { String myNodeGroupId = parameterService.getNodeGroupId(); - long triggerRouterCacheTimeoutInMs = parameterService - .getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); - TriggerRoutersCache cache = triggerRouterCacheByNodeGroupId == null ? null - : triggerRouterCacheByNodeGroupId.get(myNodeGroupId); - if (cache == null - || refreshCache - || System.currentTimeMillis() - this.triggerRouterPerNodeCacheTime > triggerRouterCacheTimeoutInMs) { + long triggerRouterCacheTimeoutInMs = parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); + TriggerRoutersCache cache = triggerRouterCacheByNodeGroupId == null ? null : triggerRouterCacheByNodeGroupId.get(myNodeGroupId); + if (cache == null || refreshCache || System.currentTimeMillis() - this.triggerRouterPerNodeCacheTime > triggerRouterCacheTimeoutInMs) { synchronized (cacheLock) { this.triggerRouterPerNodeCacheTime = System.currentTimeMillis(); Map newTriggerRouterCacheByNodeGroupId = new HashMap(); List triggerRouters = getAllTriggerRoutersForCurrentNode(myNodeGroupId); - Map> triggerRoutersByTriggerId = new HashMap>( - triggerRouters.size()); + Map> triggerRoutersByTriggerId = new HashMap>(triggerRouters.size()); Map routers = new HashMap(triggerRouters.size()); for (TriggerRouter triggerRouter : triggerRouters) { if (triggerRouter.isEnabled()) { @@ -699,21 +639,19 @@ protected TriggerRoutersCache getTriggerRoutersCacheForCurrentNode(boolean refre triggerRoutersByTriggerId.put(triggerId, list); } list.add(triggerRouter); - routers.put(triggerRouter.getRouter().getRouterId(), - triggerRouter.getRouter()); + routers.put(triggerRouter.getRouter().getRouterId(), triggerRouter.getRouter()); } } } - newTriggerRouterCacheByNodeGroupId.put(myNodeGroupId, new TriggerRoutersCache( - triggerRoutersByTriggerId, routers)); + newTriggerRouterCacheByNodeGroupId.put(myNodeGroupId, new TriggerRoutersCache(triggerRoutersByTriggerId, routers)); this.triggerRouterCacheByNodeGroupId = newTriggerRouterCacheByNodeGroupId; - cache = triggerRouterCacheByNodeGroupId == null ? null - : triggerRouterCacheByNodeGroupId.get(myNodeGroupId); + cache = triggerRouterCacheByNodeGroupId == null ? null : triggerRouterCacheByNodeGroupId.get(myNodeGroupId); } } return cache; } + /** * @see ITriggerRouterService#getActiveRouterByIdForCurrentNode(String, * boolean) @@ -726,202 +664,168 @@ public Router getActiveRouterByIdForCurrentNode(String routerId, boolean refresh * @see ITriggerRouterService#getRoutersByGroupLink(NodeGroupLink) */ public List getRoutersByGroupLink(NodeGroupLink link) { - return sqlTemplate.query( - getSql("select", "selectRoutersColumnList", "selectRouterByNodeGroupLinkWhereSql"), - new RouterMapper(configurationService.getNodeGroupLinks(false)), link.getSourceNodeGroupId(), link.getTargetNodeGroupId()); - } - - public Trigger getTriggerForCurrentNodeById(String triggerId) { - List triggers = getTriggersForCurrentNode(); - for (Trigger trigger : triggers) { - if (trigger.getTriggerId().equals(triggerId)) { - return trigger; - } - } - return null; + return sqlTemplate.query(getSql("select", "selectRoutersColumnList", "selectRouterByNodeGroupLinkWhereSql"), new RouterMapper(true, + configurationService.getNodeGroupLinks(false)), link.getSourceNodeGroupId(), link.getTargetNodeGroupId()); } - public Trigger getTriggerById(String triggerId) { - return getTriggerById(triggerId, true); + public Trigger getTriggerById(boolean substituteParameters, String triggerId) { + return getTriggerById(substituteParameters, triggerId, true); } - public Trigger getTriggerById(String triggerId, boolean refreshCache) { + public Trigger getTriggerById(final boolean substituteParameters, String triggerId, boolean refreshCache) { Trigger trigger = null; - final long triggerCacheTimeoutInMs = parameterService - .getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); - Map cache = this.triggersCache; - if (cache == null || !cache.containsKey(triggerId) || refreshCache - || (System.currentTimeMillis() - this.triggersCacheTime) > triggerCacheTimeoutInMs) { + KeyedCache cache = triggersCache.get(substituteParameters); + if (cache == null) { synchronized (cacheLock) { - this.triggersCacheTime = System.currentTimeMillis(); - List triggers = new ArrayList(getTriggers()); - triggers.addAll(buildTriggersForSymmetricTables(Version.version())); - cache = new HashMap(triggers.size()); - for (Trigger t : triggers) { - cache.put(t.getTriggerId(), t); + if (cache == null) { + cache = new KeyedCache(parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS), + new IRefreshCache() { + @Override + public LinkedHashMap refresh() { + List triggers = new ArrayList(getTriggers(substituteParameters)); + triggers.addAll(buildTriggersForSymmetricTables(Version.version())); + LinkedHashMap cache = new LinkedHashMap(triggers.size()); + for (Trigger t : triggers) { + cache.put(t.getTriggerId(), t); + } + return cache; + } + }); + triggersCache.put(substituteParameters, cache); } - this.triggersCache = cache; } } - trigger = cache.get(triggerId); + trigger = cache.find(triggerId, refreshCache); if (trigger == null && !refreshCache) { - trigger = getTriggerById(triggerId, true); + trigger = cache.find(triggerId, true); } return trigger; } - public Router getRouterById(String routerId) { - return getRouterById(routerId, true); + public Router getRouterById(boolean substituteParameters, String routerId) { + return getRouterById(substituteParameters, routerId, true); } - public Router getRouterById(String routerId, boolean refreshCache) { - final long routerCacheTimeoutInMs = parameterService - .getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); - Map cache = this.routersCache; - if (cache == null || refreshCache - || System.currentTimeMillis() - this.routersCacheTime > routerCacheTimeoutInMs) { + public Router getRouterById(final boolean substituteParameters, String routerId, boolean refreshCache) { + Router router = null; + KeyedCache cache = routersCache.get(substituteParameters); + if (cache == null) { synchronized (cacheLock) { - this.routersCacheTime = System.currentTimeMillis(); - List routers = getRouters(); - cache = new HashMap(routers.size()); - for (Router router : routers) { - cache.put(router.getRouterId(), router); + if (cache == null) { + cache = new KeyedCache(parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS), + new IRefreshCache() { + @Override + public LinkedHashMap refresh() { + List routers = getRouters(substituteParameters); + LinkedHashMap cache = new LinkedHashMap(routers.size()); + for (Router router : routers) { + cache.put(router.getRouterId(), router); + } + return cache; + } + }); + routersCache.put(substituteParameters, cache); } - this.routersCache = cache; } } - return cache.get(routerId); + router = cache.find(routerId, refreshCache); + if (router == null && !refreshCache) { + router = cache.find(routerId, true); + } + return router; } - public List getRouters() { + public List getRouters(boolean substituteParameters) { return sqlTemplate.query(getSql("select ", "selectRoutersColumnList", "selectRoutersSql"), - new RouterMapper(configurationService.getNodeGroupLinks(false))); + new RouterMapper(substituteParameters, configurationService.getNodeGroupLinks(false))); } - + private String getTriggerRouterSql(String sql) { return getSql("select ", "selectTriggerRoutersColumnList", "selectTriggerRoutersSql", sql); } - public List getTriggerRouters(boolean refreshCache) { - long triggerRouterCacheTimeoutInMs = parameterService - .getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); - List testValue = triggerRoutersCache; - if (testValue == null - || refreshCache - || System.currentTimeMillis() - this.triggerRoutersCacheTime > triggerRouterCacheTimeoutInMs) { + public List getTriggerRouters(final boolean substituteParameters, boolean refreshCache) { + KeyedCache cache = triggerRoutersCache.get(substituteParameters); + if (cache == null) { synchronized (cacheLock) { - List newValue = enhanceTriggerRouters(sqlTemplate.query( - getTriggerRouterSql(null), new TriggerRouterMapper())); - triggerRoutersCache = newValue; - testValue = newValue; - triggerRoutersCacheTime = System.currentTimeMillis(); + if (cache == null) { + cache = new KeyedCache(parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS), + new IRefreshCache() { + @Override + public LinkedHashMap refresh() { + List triggerRouters = enhanceTriggerRouters(substituteParameters, + sqlTemplate.query(getTriggerRouterSql(null), new TriggerRouterMapper())); + LinkedHashMap cache = new LinkedHashMap(triggerRouters.size()); + for (TriggerRouter triggerRouter : triggerRouters) { + cache.put(triggerRouter.hashCode(), triggerRouter); + } + return cache; + } + }); + triggerRoutersCache.put(substituteParameters, cache); + } } } - return testValue; + return cache.getAll(refreshCache); } public List getAllTriggerRoutersForCurrentNode(String sourceNodeGroupId) { - List triggerRouters = enhanceTriggerRouters(sqlTemplate.query( - getTriggerRouterSql("activeTriggersForSourceNodeGroupSql"), - new TriggerRouterMapper(), sourceNodeGroupId)); + List triggerRouters = enhanceTriggerRouters(true, + sqlTemplate.query(getTriggerRouterSql("activeTriggersForSourceNodeGroupSql"), new TriggerRouterMapper(), sourceNodeGroupId)); mergeInConfigurationTablesTriggerRoutersForCurrentNode(sourceNodeGroupId, triggerRouters); return triggerRouters; } - public List getAllTriggerRoutersForReloadForCurrentNode( - String sourceNodeGroupId, String targetNodeGroupId) { - return enhanceTriggerRouters(sqlTemplate.query( - getTriggerRouterSql("activeTriggersForReloadSql"), new TriggerRouterMapper(), - sourceNodeGroupId, targetNodeGroupId, Constants.CHANNEL_CONFIG)); + public List getAllTriggerRoutersForReloadForCurrentNode(String sourceNodeGroupId, String targetNodeGroupId) { + return enhanceTriggerRouters(true, sqlTemplate.query(getTriggerRouterSql("activeTriggersForReloadSql"), + new TriggerRouterMapper(), sourceNodeGroupId, targetNodeGroupId, Constants.CHANNEL_CONFIG)); } - public TriggerRouter findTriggerRouterById(String triggerId, String routerId) { - List configs = (List) sqlTemplate.query( - getTriggerRouterSql("selectTriggerRouterSql"), new TriggerRouterMapper(), - triggerId, routerId); + public TriggerRouter findTriggerRouterById(boolean substituteParameters, String triggerId, String routerId) { + List configs = (List) sqlTemplate.query(getTriggerRouterSql("selectTriggerRouterSql"), + new TriggerRouterMapper(), triggerId, routerId); if (configs.size() > 0) { TriggerRouter triggerRouter = configs.get(0); - triggerRouter.setRouter(getRouterById(triggerRouter.getRouter().getRouterId())); - triggerRouter.setTrigger(getTriggerById(triggerRouter.getTrigger().getTriggerId())); + triggerRouter.setRouter(getRouterById(substituteParameters, triggerRouter.getRouter().getRouterId())); + triggerRouter.setTrigger(getTriggerById(substituteParameters, triggerRouter.getTrigger().getTriggerId())); return triggerRouter; } else { return null; } } - - private List enhanceTriggerRouters(List triggerRouters) { - HashMap routersById = new HashMap(); - for (Router router : getRouters()) { - routersById.put(router.getRouterId().trim().toUpperCase(), router); - } - HashMap triggersById = new HashMap(); - for (Trigger trigger : getTriggers()) { - triggersById.put(trigger.getTriggerId().trim().toUpperCase(), trigger); - } - for (TriggerRouter triggerRouter : triggerRouters) { - triggerRouter.setTrigger(triggersById.get(triggerRouter.getTrigger().getTriggerId().trim().toUpperCase())); - triggerRouter.setRouter(routersById.get(triggerRouter.getRouter().getRouterId().trim().toUpperCase())); - } - return triggerRouters; - } - - public Map> getTriggerRoutersByChannel(String nodeGroupId) { - return getTriggerRoutersByChannel(nodeGroupId, false); - } - - public Map> getTriggerRoutersByChannel(String nodeGroupId, - boolean refreshCache) { - long triggerRouterCacheTimeoutInMs = parameterService - .getLong(ParameterConstants.CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS); - Map> testValue = triggerRouterCacheByChannel; - if (testValue == null - || refreshCache - || System.currentTimeMillis() - this.triggerRouterPerChannelCacheTime > triggerRouterCacheTimeoutInMs) { - synchronized (cacheLock) { - testValue = triggerRouterCacheByChannel; - if (testValue == null || refreshCache - || System.currentTimeMillis() - this.triggerRouterPerChannelCacheTime > triggerRouterCacheTimeoutInMs) { - final Map> newValue = new HashMap>(); - this.triggerRouterPerChannelCacheTime = System.currentTimeMillis(); - List triggerRouters = enhanceTriggerRouters(sqlTemplate.query( - getTriggerRouterSql("selectGroupTriggersSql"), new TriggerRouterMapper(), nodeGroupId, nodeGroupId)); - for (TriggerRouter triggerRouter : triggerRouters) { - List list = newValue.get(triggerRouter.getTrigger().getChannelId()); - if (list == null) { - list = new ArrayList(); - newValue.put(triggerRouter.getTrigger().getChannelId(), list); - } - list.add(triggerRouter); - } - triggerRouterCacheByChannel = newValue; - testValue = newValue; - } - } + + private List enhanceTriggerRouters(boolean substituteParameters, List triggerRouters) { + HashMap routersById = new HashMap(); + for (Router router : getRouters(substituteParameters)) { + routersById.put(router.getRouterId().trim().toUpperCase(), router); + } + HashMap triggersById = new HashMap(); + for (Trigger trigger : getTriggers(substituteParameters)) { + triggersById.put(trigger.getTriggerId().trim().toUpperCase(), trigger); } - return testValue; + for (TriggerRouter triggerRouter : triggerRouters) { + triggerRouter.setTrigger(triggersById.get(triggerRouter.getTrigger().getTriggerId().trim().toUpperCase())); + triggerRouter.setRouter(routersById.get(triggerRouter.getRouter().getRouterId().trim().toUpperCase())); + } + return triggerRouters; } public void insert(TriggerHistory newHistRecord) { - newHistRecord.setTriggerHistoryId((int)sequenceService.nextVal(Constants.SEQUENCE_TRIGGER_HIST)); + newHistRecord.setTriggerHistoryId((int) sequenceService.nextVal(Constants.SEQUENCE_TRIGGER_HIST)); historyMap.put(newHistRecord.getTriggerHistoryId(), newHistRecord); sqlTemplate.update( getSql("insertTriggerHistorySql"), new Object[] { newHistRecord.getTriggerHistoryId(), newHistRecord.getTriggerId(), newHistRecord.getSourceTableName(), - newHistRecord.getTableHash(), newHistRecord.getCreateTime(), - newHistRecord.getColumnNames(), newHistRecord.getPkColumnNames(), - newHistRecord.getLastTriggerBuildReason().getCode(), - newHistRecord.getNameForDeleteTrigger(), - newHistRecord.getNameForInsertTrigger(), - newHistRecord.getNameForUpdateTrigger(), - newHistRecord.getSourceSchemaName(), newHistRecord.getSourceCatalogName(), - newHistRecord.getTriggerRowHash(), newHistRecord.getTriggerTemplateHash(), - newHistRecord.getErrorMessage() }, - new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.TIMESTAMP, - Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, + newHistRecord.getTableHash(), newHistRecord.getCreateTime(), newHistRecord.getColumnNames(), + newHistRecord.getPkColumnNames(), newHistRecord.getLastTriggerBuildReason().getCode(), + newHistRecord.getNameForDeleteTrigger(), newHistRecord.getNameForInsertTrigger(), + newHistRecord.getNameForUpdateTrigger(), newHistRecord.getSourceSchemaName(), newHistRecord.getSourceCatalogName(), + newHistRecord.getTriggerRowHash(), newHistRecord.getTriggerTemplateHash(), newHistRecord.getErrorMessage() }, + new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.CHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR }); } - + @Override public void deleteTriggerRouter(String triggerId, String routerId) { sqlTemplate.update(getSql("deleteTriggerRouterSql"), triggerId, routerId); @@ -929,8 +833,8 @@ public void deleteTriggerRouter(String triggerId, String routerId) { } public void deleteTriggerRouter(TriggerRouter triggerRouter) { - sqlTemplate.update(getSql("deleteTriggerRouterSql"), (Object) triggerRouter.getTrigger() - .getTriggerId(), triggerRouter.getRouter().getRouterId()); + sqlTemplate.update(getSql("deleteTriggerRouterSql"), (Object) triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter() + .getRouterId()); clearCache(); } @@ -944,37 +848,27 @@ public void saveTriggerRouter(TriggerRouter triggerRouter, boolean updateTrigger saveRouter(triggerRouter.getRouter()); } triggerRouter.setLastUpdateTime(new Date()); - if (0 == sqlTemplate.update( - getSql("updateTriggerRouterSql"), - new Object[] { triggerRouter.getInitialLoadOrder(), - triggerRouter.getInitialLoadSelect(), - triggerRouter.getInitialLoadDeleteStmt(), - triggerRouter.getInitialLoadBatchCount(), - triggerRouter.isPingBackEnabled() ? 1 : 0, - triggerRouter.getLastUpdateBy(), - triggerRouter.getLastUpdateTime(), - triggerRouter.isEnabled() ? 1 : 0, - triggerRouter.getTrigger().getTriggerId(), - triggerRouter.getRouter().getRouterId() }, new int[] { Types.NUMERIC, - Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.SMALLINT, Types.VARCHAR, - Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR })) { + if (0 == sqlTemplate + .update(getSql("updateTriggerRouterSql"), + new Object[] { triggerRouter.getInitialLoadOrder(), triggerRouter.getInitialLoadSelect(), + triggerRouter.getInitialLoadDeleteStmt(), triggerRouter.getInitialLoadBatchCount(), + triggerRouter.isPingBackEnabled() ? 1 : 0, triggerRouter.getLastUpdateBy(), + triggerRouter.getLastUpdateTime(), triggerRouter.isEnabled() ? 1 : 0, + triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId() }, new int[] { + Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.SMALLINT, Types.VARCHAR, Types.TIMESTAMP, + Types.SMALLINT, Types.VARCHAR, Types.VARCHAR })) { triggerRouter.setCreateTime(triggerRouter.getLastUpdateTime()); sqlTemplate.update( getSql("insertTriggerRouterSql"), - new Object[] { triggerRouter.getInitialLoadOrder(), - triggerRouter.getInitialLoadSelect(), - triggerRouter.getInitialLoadDeleteStmt(), - triggerRouter.getInitialLoadBatchCount(), - triggerRouter.isPingBackEnabled() ? 1 : 0, - triggerRouter.getCreateTime(), triggerRouter.getLastUpdateBy(), - triggerRouter.getLastUpdateTime(), - triggerRouter.isEnabled() ? 1 : 0, - triggerRouter.getTrigger().getTriggerId(), - triggerRouter.getRouter().getRouterId() }, new int[] { Types.NUMERIC, - Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.SMALLINT, Types.TIMESTAMP, - Types.VARCHAR, Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR }); + new Object[] { triggerRouter.getInitialLoadOrder(), triggerRouter.getInitialLoadSelect(), + triggerRouter.getInitialLoadDeleteStmt(), triggerRouter.getInitialLoadBatchCount(), + triggerRouter.isPingBackEnabled() ? 1 : 0, triggerRouter.getCreateTime(), triggerRouter.getLastUpdateBy(), + triggerRouter.getLastUpdateTime(), triggerRouter.isEnabled() ? 1 : 0, triggerRouter.getTrigger().getTriggerId(), + triggerRouter.getRouter().getRouterId() }, new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, + Types.INTEGER, Types.SMALLINT, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, + Types.VARCHAR }); } - + clearCache(); } @@ -987,36 +881,24 @@ public void saveRouter(Router router) { router.nullOutBlankFields(); if (0 == sqlTemplate.update( getSql("updateRouterSql"), - new Object[] { router.getTargetCatalogName(), router.getTargetSchemaName(), - router.getTargetTableName(), - router.getNodeGroupLink().getSourceNodeGroupId(), - router.getNodeGroupLink().getTargetNodeGroupId(), router.getRouterType(), - router.getRouterExpression(), router.isSyncOnUpdate() ? 1 : 0, - router.isSyncOnInsert() ? 1 : 0, router.isSyncOnDelete() ? 1 : 0, - router.isUseSourceCatalogSchema() ? 1 : 0, - router.getLastUpdateBy(), router.getLastUpdateTime(), - router.getRouterId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, - Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.TIMESTAMP, - Types.VARCHAR })) { + new Object[] { router.getTargetCatalogName(), router.getTargetSchemaName(), router.getTargetTableName(), + router.getNodeGroupLink().getSourceNodeGroupId(), router.getNodeGroupLink().getTargetNodeGroupId(), + router.getRouterType(), router.getRouterExpression(), router.isSyncOnUpdate() ? 1 : 0, + router.isSyncOnInsert() ? 1 : 0, router.isSyncOnDelete() ? 1 : 0, router.isUseSourceCatalogSchema() ? 1 : 0, + router.getLastUpdateBy(), router.getLastUpdateTime(), router.getRouterId() }, new int[] { Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, + Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR })) { router.setCreateTime(router.getLastUpdateTime()); sqlTemplate.update( getSql("insertRouterSql"), - new Object[] { router.getTargetCatalogName(), router.getTargetSchemaName(), - router.getTargetTableName(), - router.getNodeGroupLink().getSourceNodeGroupId(), - router.getNodeGroupLink().getTargetNodeGroupId(), - router.getRouterType(), router.getRouterExpression(), - router.isSyncOnUpdate() ? 1 : 0, router.isSyncOnInsert() ? 1 : 0, - router.isSyncOnDelete() ? 1 : 0, router.isUseSourceCatalogSchema() ? 1 : 0, - router.getCreateTime(), - router.getLastUpdateBy(), router.getLastUpdateTime(), router.getRouterId() }, - new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.TIMESTAMP, Types.VARCHAR, - Types.TIMESTAMP, Types.VARCHAR }); + new Object[] { router.getTargetCatalogName(), router.getTargetSchemaName(), router.getTargetTableName(), + router.getNodeGroupLink().getSourceNodeGroupId(), router.getNodeGroupLink().getTargetNodeGroupId(), + router.getRouterType(), router.getRouterExpression(), router.isSyncOnUpdate() ? 1 : 0, + router.isSyncOnInsert() ? 1 : 0, router.isSyncOnDelete() ? 1 : 0, router.isUseSourceCatalogSchema() ? 1 : 0, + router.getCreateTime(), router.getLastUpdateBy(), router.getLastUpdateTime(), router.getRouterId() }, new int[] { + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, + Types.VARCHAR }); } clearCache(); } @@ -1036,54 +918,40 @@ public void saveTrigger(Trigger trigger) { trigger.nullOutBlankFields(); if (0 == sqlTemplate.update( getSql("updateTriggerSql"), - new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(), - trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0, - trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, - trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, - trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1 : 0, - trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(), - trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(), - trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(), - trigger.getCustomOnUpdateText(), trigger.getCustomOnInsertText(), - trigger.getCustomOnDeleteText(), trigger.getTxIdExpression(), - trigger.getExcludedColumnNames(), trigger.getSyncKeyNames(), - trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), - trigger.getExternalSelect(), trigger.getChannelExpression(), trigger.getTriggerId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) { + new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), + trigger.getChannelId(), trigger.getReloadChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, + trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, + trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, trigger.isUseCaptureOldData() ? 1 : 0, + trigger.isUseHandleKeyUpdates() ? 1 : 0, trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(), + trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(), + trigger.getSyncOnDeleteCondition(), trigger.getCustomOnUpdateText(), trigger.getCustomOnInsertText(), + trigger.getCustomOnDeleteText(), trigger.getTxIdExpression(), trigger.getExcludedColumnNames(), + trigger.getSyncKeyNames(), trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), trigger.getExternalSelect(), + trigger.getChannelExpression(), trigger.getTriggerId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) { trigger.setCreateTime(trigger.getLastUpdateTime()); sqlTemplate.update( getSql("insertTriggerSql"), - new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(), - trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0, - trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, - trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, - trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1:0, - trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(), - trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(), - trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(), - trigger.getCustomOnUpdateText(), trigger.getCustomOnInsertText(), - trigger.getCustomOnDeleteText(), trigger.getTxIdExpression(), - trigger.getExcludedColumnNames(), trigger.getSyncKeyNames(), - trigger.getCreateTime(), trigger.getLastUpdateBy(), - trigger.getLastUpdateTime(), trigger.getExternalSelect(), trigger.getChannelExpression(), - trigger.getTriggerId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, + new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), + trigger.getChannelId(), trigger.getReloadChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, + trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, + trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, trigger.isUseCaptureOldData() ? 1 : 0, + trigger.isUseHandleKeyUpdates() ? 1 : 0, trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(), + trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(), + trigger.getSyncOnDeleteCondition(), trigger.getCustomOnUpdateText(), trigger.getCustomOnInsertText(), + trigger.getCustomOnDeleteText(), trigger.getTxIdExpression(), trigger.getExcludedColumnNames(), + trigger.getSyncKeyNames(), trigger.getCreateTime(), trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), + trigger.getExternalSelect(), trigger.getChannelExpression(), trigger.getTriggerId() }, new int[] { Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); } - + clearCache(); } @@ -1101,8 +969,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) { if (clusterService.lock(ClusterConstants.SYNCTRIGGERS)) { try { String additionalMessage = ""; - if (isCalledFromSymmetricAdminTool() - && !parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { + if (isCalledFromSymmetricAdminTool() && !parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { additionalMessage = " " + ParameterConstants.AUTO_SYNC_TRIGGERS + " is set to false, but the sync triggers process will run so that needed changes can be written to a file so they can be applied manually. Once all of the triggers have been successfully applied this process should not show triggers being created"; @@ -1118,20 +985,16 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) { // make sure channels are read from the database configurationService.clearCache(); - List triggersForCurrentNode = getTriggersForCurrentNode(); + List triggersForCurrentNode = getTriggersForCurrentNode(true); boolean createTriggersForTables = false; String nodeId = nodeService.findIdentityNodeId(); if (StringUtils.isNotBlank(nodeId)) { NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeId); - if (nodeSecurity != null - && (nodeSecurity.isInitialLoadEnabled() || nodeSecurity - .getInitialLoadTime() == null)) { - createTriggersForTables = parameterService - .is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); + if (nodeSecurity != null && (nodeSecurity.isInitialLoadEnabled() || nodeSecurity.getInitialLoadTime() == null)) { + createTriggersForTables = parameterService.is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); if (!createTriggersForTables) { - log.info("Trigger creation has been disabled by " - + ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD + log.info("Trigger creation has been disabled by " + ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD + " because an initial load is in progress or has not yet been requested"); } } else { @@ -1145,8 +1008,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) { List activeTriggerHistories = getActiveTriggerHistories(); inactivateTriggers(triggersForCurrentNode, sqlBuffer, activeTriggerHistories); - updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, force, - true, activeTriggerHistories, true); + updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, force, true, activeTriggerHistories, true); resetTriggerRouterCacheByNodeGroupId(); } finally { clusterService.unlock(ClusterConstants.SYNCTRIGGERS); @@ -1164,10 +1026,13 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) { public void clearCache() { synchronized (cacheLock) { this.triggerRouterPerNodeCacheTime = 0; - this.triggerRouterPerChannelCacheTime = 0; - this.triggerRoutersCacheTime = 0; - this.routersCacheTime = 0; - this.triggersCacheTime = 0; + List> caches = new ArrayList>(); + caches.addAll(triggerRoutersCache.values()); + caches.addAll(triggersCache.values()); + caches.addAll(routersCache.values()); + for (KeyedCache keyedCache : caches) { + keyedCache.clear(); + } } } @@ -1188,8 +1053,8 @@ protected Trigger getTriggerFromList(String triggerId, List triggersTha return null; } - protected void inactivateTriggers(List triggersThatShouldBeActive, - StringBuilder sqlBuffer, List activeTriggerHistories) { + protected void inactivateTriggers(List triggersThatShouldBeActive, StringBuilder sqlBuffer, + List activeTriggerHistories) { boolean ignoreCase = this.parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE); Map> tablesByTriggerId = new HashMap>(); for (TriggerHistory history : activeTriggerHistories) { @@ -1208,17 +1073,13 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, for (Table table : tables) { boolean matchesCatalog = isEqual( - trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() - : trigger.getSourceCatalogName(), + trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(), history.getSourceCatalogName(), ignoreCase); boolean matchesSchema = isEqual( - trigger.isSourceSchemaNameWildCarded() ? table.getSchema() - : trigger.getSourceSchemaName(), history.getSourceSchemaName(), - ignoreCase); - boolean matchesTable = isEqual( - trigger.isSourceTableNameWildCarded() ? table.getName() - : trigger.getSourceTableName(), history.getSourceTableName(), - ignoreCase); + trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(), + history.getSourceSchemaName(), ignoreCase); + boolean matchesTable = isEqual(trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableName(), + history.getSourceTableName(), ignoreCase); foundTable |= matchesCatalog && matchesSchema && matchesTable; } @@ -1228,8 +1089,7 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, } if (removeTrigger) { - log.info("About to remove triggers for inactivated table: {}", - history.getFullyQualifiedSourceTableName()); + log.info("About to remove triggers for inactivated table: {}", history.getFullyQualifiedSourceTableName()); dropTriggers(history, sqlBuffer); } } @@ -1250,21 +1110,18 @@ public void dropTriggers(TriggerHistory history) { protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) { if (StringUtils.isNotBlank(history.getNameForInsertTrigger())) { - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForInsertTrigger(), - history.getSourceTableName()); + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getNameForInsertTrigger(), history.getSourceTableName()); } if (StringUtils.isNotBlank(history.getNameForDeleteTrigger())) { - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForDeleteTrigger(), - history.getSourceTableName()); + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getNameForDeleteTrigger(), history.getSourceTableName()); } if (StringUtils.isNotBlank(history.getNameForUpdateTrigger())) { - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForUpdateTrigger(), - history.getSourceTableName()); + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getNameForUpdateTrigger(), history.getSourceTableName()); } if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { @@ -1273,15 +1130,12 @@ protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) { } } - boolean triggerExists = symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getSourceTableName(), - history.getNameForInsertTrigger()); - triggerExists |= symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getSourceTableName(), - history.getNameForUpdateTrigger()); - triggerExists |= symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getSourceTableName(), - history.getNameForDeleteTrigger()); + boolean triggerExists = symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getSourceTableName(), history.getNameForInsertTrigger()); + triggerExists |= symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getSourceTableName(), history.getNameForUpdateTrigger()); + triggerExists |= symmetricDialect.doesTriggerExist(history.getSourceCatalogName(), history.getSourceSchemaName(), + history.getSourceTableName(), history.getNameForDeleteTrigger()); if (triggerExists) { log.warn( "There are triggers that have been marked as inactive. Please remove triggers represented by trigger_id={} and trigger_hist_id={}", @@ -1301,16 +1155,10 @@ protected List toList(Collection> source) { return list; } - protected List getTriggersForCurrentNode() { - return new TriggerSelector(toList(getTriggerRoutersForCurrentNode(false).values())) - .select(); - } - protected Set getTablesForTrigger(Trigger trigger, List triggers, boolean useTableCache) { Set
tables = new HashSet
(); try { - boolean ignoreCase = this.parameterService - .is(ParameterConstants.DB_METADATA_IGNORE_CASE); + boolean ignoreCase = this.parameterService.is(ParameterConstants.DB_METADATA_IGNORE_CASE); List catalogNames = new ArrayList(); if (trigger.isSourceCatalogNameWildCarded()) { @@ -1325,12 +1173,12 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers } } else { if (isBlank(trigger.getSourceCatalogName())) { - catalogNames.add(platform.getDefaultCatalog()); + catalogNames.add(platform.getDefaultCatalog()); } else { - catalogNames.add(trigger.getSourceCatalogName()); + catalogNames.add(trigger.getSourceCatalogName()); } } - + for (String catalogName : catalogNames) { List schemaNames = new ArrayList(); if (trigger.isSourceSchemaNameWildCarded()) { @@ -1338,7 +1186,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers for (String schemaName : all) { if (trigger.matchesSchemaName(schemaName, ignoreCase)) { schemaNames.add(schemaName); - } + } } if (schemaNames.size() == 0) { schemaNames.add(null); @@ -1353,49 +1201,43 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers for (String schemaName : schemaNames) { if (trigger.isSourceTableNameWildCarded()) { - Database database = symmetricDialect.getPlatform().readDatabase( - catalogName, schemaName, - new String[] { "TABLE" }); + Database database = symmetricDialect.getPlatform().readDatabase(catalogName, schemaName, new String[] { "TABLE" }); Table[] tableArray = database.getTables(); for (Table table : tableArray) { - if (trigger.matches(table, catalogName, - schemaName, ignoreCase) - && !containsExactMatchForSourceTableName(table, triggers, - ignoreCase) + if (trigger.matches(table, catalogName, schemaName, ignoreCase) + && !containsExactMatchForSourceTableName(table, triggers, ignoreCase) && !table.getName().toLowerCase().startsWith(tablePrefix)) { tables.add(table); } } } else { - Table table = symmetricDialect.getPlatform().getTableFromCache( - catalogName, schemaName, - trigger.getSourceTableName(), !useTableCache); + Table table = symmetricDialect.getPlatform().getTableFromCache(catalogName, schemaName, trigger.getSourceTableName(), + !useTableCache); if (table != null) { tables.add(table); } - } - } - } - + } + } + } + } catch (Exception ex) { log.error(String.format("Failed to retrieve tables for trigger with id of %s", trigger.getTriggerId()), ex); } return tables; } - private boolean containsExactMatchForSourceTableName(Table table, List triggers, - boolean ignoreCase) { + private boolean containsExactMatchForSourceTableName(Table table, List triggers, boolean ignoreCase) { for (Trigger trigger : triggers) { - String sourceCatalogName = trigger.getSourceCatalogName() != null ? trigger.getSourceCatalogName() : platform.getDefaultCatalog(); - String sourceSchemaName = trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName() : platform.getDefaultSchema(); - if (trigger.getSourceTableName().equals(table.getName()) - && (sourceCatalogName == null || sourceCatalogName.equals(table.getCatalog())) && - (sourceSchemaName == null || sourceSchemaName.equals(table.getSchema()))) { + String sourceCatalogName = trigger.getSourceCatalogName() != null ? trigger.getSourceCatalogName() : platform.getDefaultCatalog(); + String sourceSchemaName = trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName() : platform.getDefaultSchema(); + if (trigger.getSourceTableName().equals(table.getName()) + && (sourceCatalogName == null || sourceCatalogName.equals(table.getCatalog())) + && (sourceSchemaName == null || sourceSchemaName.equals(table.getSchema()))) { return true; } else if (ignoreCase && trigger.getSourceTableName().equalsIgnoreCase(table.getName()) - && (sourceCatalogName == null || sourceCatalogName.equalsIgnoreCase(table.getCatalog())) - && (sourceSchemaName == null || sourceSchemaName.equalsIgnoreCase(table.getSchema()))) { + && (sourceCatalogName == null || sourceCatalogName.equalsIgnoreCase(table.getCatalog())) + && (sourceSchemaName == null || sourceSchemaName.equalsIgnoreCase(table.getSchema()))) { return true; } } @@ -1407,13 +1249,11 @@ public void syncTriggers(Table table, boolean force) { /* Re-lookup just in case the table was just altered */ platform.resetCachedTableModel(); - table = platform.getTableFromCache(table.getCatalog(), table.getSchema(), table.getName(), - true); - List triggersForCurrentNode = getTriggersForCurrentNode(); + table = platform.getTableFromCache(table.getCatalog(), table.getSchema(), table.getName(), true); + List triggersForCurrentNode = getTriggersForCurrentNode(true); List activeTriggerHistories = getActiveTriggerHistories(); for (Trigger trigger : triggersForCurrentNode) { - if (trigger.matches(table, platform.getDefaultCatalog(), platform.getDefaultSchema(), - ignoreCase)) { + if (trigger.matches(table, platform.getDefaultCatalog(), platform.getDefaultSchema(), ignoreCase)) { log.info("Synchronizing triggers for {}", table.getFullyQualifiedTableName()); updateOrCreateDatabaseTriggers(trigger, table, null, force, true, activeTriggerHistories); log.info("Done synchronizing triggers for {}", table.getFullyQualifiedTableName()); @@ -1421,15 +1261,15 @@ public void syncTriggers(Table table, boolean force) { } } - protected void updateOrCreateDatabaseTriggers(List triggers, StringBuilder sqlBuffer, - boolean force, boolean verifyInDatabase, List activeTriggerHistories, boolean useTableCache) { + protected void updateOrCreateDatabaseTriggers(List triggers, StringBuilder sqlBuffer, boolean force, boolean verifyInDatabase, + List activeTriggerHistories, boolean useTableCache) { for (Trigger trigger : triggers) { updateOrCreateDatabaseTrigger(trigger, triggers, sqlBuffer, force, verifyInDatabase, activeTriggerHistories, useTableCache); } } - protected void updateOrCreateDatabaseTrigger(Trigger trigger, List triggers, - StringBuilder sqlBuffer, boolean force, boolean verifyInDatabase, List activeTriggerHistories, boolean useTableCache) { + protected void updateOrCreateDatabaseTrigger(Trigger trigger, List triggers, StringBuilder sqlBuffer, boolean force, + boolean verifyInDatabase, List activeTriggerHistories, boolean useTableCache) { Set
tables = getTablesForTrigger(trigger, triggers, useTableCache); if (tables != null && tables.size() > 0) { @@ -1437,8 +1277,7 @@ protected void updateOrCreateDatabaseTrigger(Trigger trigger, List trig updateOrCreateDatabaseTriggers(trigger, table, sqlBuffer, force, verifyInDatabase, activeTriggerHistories); } } else { - log.warn( - "Could not find any database tables matching '{}' in the datasource that is configured", + log.warn("Could not find any database tables matching '{}' in the datasource that is configured", trigger.qualifiedSourceTableName()); for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { @@ -1446,17 +1285,18 @@ protected void updateOrCreateDatabaseTrigger(Trigger trigger, List trig } } } - - public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force) { - syncTrigger(trigger, listener, force, true); + + public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force) { + syncTrigger(triggerId, listener, force, true); } - public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase) { + public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase) { + Trigger trigger = getTriggerById(true, triggerId); StringBuilder sqlBuffer = new StringBuilder(); clearCache(); List triggersForCurrentNode = null; if (verifyInDatabase) { - triggersForCurrentNode = getTriggersForCurrentNode(); + triggersForCurrentNode = getTriggersForCurrentNode(true); } else { triggersForCurrentNode = new ArrayList(); triggersForCurrentNode.add(trigger); @@ -1467,18 +1307,17 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool } List allHistories = getActiveTriggerHistories(); - if (triggersForCurrentNode.contains(trigger)) { + if (triggersForCurrentNode.contains(triggerId)) { if (!trigger.isSourceTableNameWildCarded()) { - for (TriggerHistory triggerHistory : getActiveTriggerHistories(trigger)) { + for (TriggerHistory triggerHistory : getActiveTriggerHistories(triggerId)) { if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) { dropTriggers(triggerHistory, sqlBuffer); } } } - updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer, - force, verifyInDatabase, allHistories, false); - } else { - for (TriggerHistory triggerHistory : getActiveTriggerHistories(trigger)) { + updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer, force, verifyInDatabase, allHistories, false); + } else { + for (TriggerHistory triggerHistory : getActiveTriggerHistories(triggerId)) { dropTriggers(triggerHistory, sqlBuffer); } } @@ -1489,20 +1328,19 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool } } - protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, - StringBuilder sqlBuffer, boolean force, boolean verifyInDatabase, List activeTriggerHistories) { + protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, StringBuilder sqlBuffer, boolean force, + boolean verifyInDatabase, List activeTriggerHistories) { TriggerHistory newestHistory = null; TriggerReBuildReason reason = TriggerReBuildReason.NEW_TRIGGERS; String errorMessage = null; - + if (verifyInDatabase) { Channel channel = configurationService.getChannel(trigger.getChannelId()); if (channel == null) { errorMessage = String .format("Trigger %s had an unrecognized channel_id of '%s'. Please check to make sure the channel exists. Creating trigger on the '%s' channel", - trigger.getTriggerId(), trigger.getChannelId(), - Constants.CHANNEL_DEFAULT); + trigger.getTriggerId(), trigger.getChannelId(), Constants.CHANNEL_DEFAULT); log.error(errorMessage); trigger.setChannelId(Constants.CHANNEL_DEFAULT); } @@ -1519,12 +1357,10 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, table = platform.makeAllColumnsPrimaryKeys(table); } - TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger( - trigger.getTriggerId(), + TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger(trigger.getTriggerId(), trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(), trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(), - trigger.isSourceTableNameWildCarded() ? table.getName() : trigger - .getSourceTableName()); + trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableName()); boolean forceRebuildOfTriggers = false; if (latestHistoryBeforeRebuild == null) { @@ -1535,13 +1371,11 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, reason = TriggerReBuildReason.TABLE_SCHEMA_CHANGED; forceRebuildOfTriggers = true; - } else if (trigger.hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild - .getCreateTime()) + } else if (trigger.hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild.getCreateTime()) || trigger.toHashedValue() != latestHistoryBeforeRebuild.getTriggerRowHash()) { reason = TriggerReBuildReason.TABLE_SYNC_CONFIGURATION_CHANGED; forceRebuildOfTriggers = true; - } else if (symmetricDialect.getTriggerTemplate().toHashedValue() != - latestHistoryBeforeRebuild.getTriggerTemplateHash()) { + } else if (symmetricDialect.getTriggerTemplate().toHashedValue() != latestHistoryBeforeRebuild.getTriggerTemplateHash()) { reason = TriggerReBuildReason.TRIGGER_TEMPLATE_CHANGED; forceRebuildOfTriggers = true; } else if (force) { @@ -1549,20 +1383,16 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, forceRebuildOfTriggers = true; } - boolean supportsTriggers = symmetricDialect.getPlatform().getDatabaseInfo() - .isTriggersSupported(); + boolean supportsTriggers = symmetricDialect.getPlatform().getDatabaseInfo().isTriggersSupported(); - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, - DataEventType.INSERT, reason, latestHistoryBeforeRebuild, null, - trigger.isSyncOnInsert() && supportsTriggers, table, activeTriggerHistories); + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, DataEventType.INSERT, reason, + latestHistoryBeforeRebuild, null, trigger.isSyncOnInsert() && supportsTriggers, table, activeTriggerHistories); - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, - DataEventType.UPDATE, reason, latestHistoryBeforeRebuild, newestHistory, - trigger.isSyncOnUpdate() && supportsTriggers, table, activeTriggerHistories); + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, DataEventType.UPDATE, reason, + latestHistoryBeforeRebuild, newestHistory, trigger.isSyncOnUpdate() && supportsTriggers, table, activeTriggerHistories); - newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, - DataEventType.DELETE, reason, latestHistoryBeforeRebuild, newestHistory, - trigger.isSyncOnDelete() && supportsTriggers, table, activeTriggerHistories); + newestHistory = rebuildTriggerIfNecessary(sqlBuffer, forceRebuildOfTriggers, trigger, DataEventType.DELETE, reason, + latestHistoryBeforeRebuild, newestHistory, trigger.isSyncOnDelete() && supportsTriggers, table, activeTriggerHistories); if (latestHistoryBeforeRebuild != null && newestHistory != null) { inactivateTriggerHistory(latestHistoryBeforeRebuild); @@ -1579,22 +1409,17 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, } } catch (Exception ex) { - log.error( - String.format("Failed to create triggers for %s", - trigger.qualifiedSourceTableName()), ex); + log.error(String.format("Failed to create triggers for %s", trigger.qualifiedSourceTableName()), ex); if (newestHistory != null) { // Make sure all the triggers are removed from the // table - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), newestHistory.getNameForInsertTrigger(), - trigger.getSourceTableName()); - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), newestHistory.getNameForUpdateTrigger(), - trigger.getSourceTableName()); - symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), newestHistory.getNameForDeleteTrigger(), - trigger.getSourceTableName()); + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), + newestHistory.getNameForInsertTrigger(), trigger.getSourceTableName()); + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), + newestHistory.getNameForUpdateTrigger(), trigger.getSourceTableName()); + symmetricDialect.removeTrigger(null, trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), + newestHistory.getNameForDeleteTrigger(), trigger.getSourceTableName()); } for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) { @@ -1603,10 +1428,9 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table, } } - protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, - boolean forceRebuild, Trigger trigger, DataEventType dmlType, - TriggerReBuildReason reason, TriggerHistory oldhist, TriggerHistory hist, - boolean triggerIsActive, Table table, List activeTriggerHistories) { + protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, boolean forceRebuild, Trigger trigger, DataEventType dmlType, + TriggerReBuildReason reason, TriggerHistory oldhist, TriggerHistory hist, boolean triggerIsActive, Table table, + List activeTriggerHistories) { boolean triggerExists = false; boolean triggerRemoved = false; @@ -1615,18 +1439,18 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, int maxTriggerNameLength = symmetricDialect.getMaxTriggerNameLength(); if (trigger.isSyncOnInsert()) { - newTriggerHist.setNameForInsertTrigger(getTriggerName(DataEventType.INSERT, - maxTriggerNameLength, trigger, table, activeTriggerHistories).toUpperCase()); + newTriggerHist.setNameForInsertTrigger(getTriggerName(DataEventType.INSERT, maxTriggerNameLength, trigger, table, + activeTriggerHistories).toUpperCase()); } if (trigger.isSyncOnUpdate()) { - newTriggerHist.setNameForUpdateTrigger(getTriggerName(DataEventType.UPDATE, - maxTriggerNameLength, trigger, table, activeTriggerHistories).toUpperCase()); + newTriggerHist.setNameForUpdateTrigger(getTriggerName(DataEventType.UPDATE, maxTriggerNameLength, trigger, table, + activeTriggerHistories).toUpperCase()); } if (trigger.isSyncOnDelete()) { - newTriggerHist.setNameForDeleteTrigger(getTriggerName(DataEventType.DELETE, - maxTriggerNameLength, trigger, table, activeTriggerHistories).toUpperCase()); + newTriggerHist.setNameForDeleteTrigger(getTriggerName(DataEventType.DELETE, maxTriggerNameLength, trigger, table, + activeTriggerHistories).toUpperCase()); } String oldTriggerName = null; @@ -1636,8 +1460,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, oldTriggerName = oldhist.getTriggerNameForDmlType(dmlType); oldSourceSchema = oldhist.getSourceSchemaName(); oldCatalogName = oldhist.getSourceCatalogName(); - triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, - oldhist.getSourceTableName(), oldTriggerName); + triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, oldhist.getSourceTableName(), oldTriggerName); } else { // We had no trigger_hist row, lets validate that the trigger as // defined in the trigger row data does not exist as well. @@ -1645,8 +1468,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, oldSourceSchema = table.getSchema(); oldCatalogName = table.getCatalog(); if (StringUtils.isNotBlank(oldTriggerName)) { - triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, - table.getName(), oldTriggerName); + triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, table.getName(), oldTriggerName); } } @@ -1655,33 +1477,26 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, } if ((forceRebuild || !triggerIsActive) && triggerExists) { - symmetricDialect.removeTrigger(sqlBuffer, oldCatalogName, oldSourceSchema, - oldTriggerName, trigger.isSourceTableNameWildCarded() ? table.getName() - : trigger.getSourceTableName()); + symmetricDialect.removeTrigger(sqlBuffer, oldCatalogName, oldSourceSchema, oldTriggerName, + trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableName()); triggerExists = false; triggerRemoved = true; } - boolean isDeadTrigger = !trigger.isSyncOnInsert() && !trigger.isSyncOnUpdate() - && !trigger.isSyncOnDelete(); + boolean isDeadTrigger = !trigger.isSyncOnInsert() && !trigger.isSyncOnUpdate() && !trigger.isSyncOnDelete(); - if (hist == null - && (oldhist == null || (!triggerExists && triggerIsActive) || (isDeadTrigger && forceRebuild))) { + if (hist == null && (oldhist == null || (!triggerExists && triggerIsActive) || (isDeadTrigger && forceRebuild))) { insert(newTriggerHist); - hist = getNewestTriggerHistoryForTrigger( - trigger.getTriggerId(), - trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() : trigger.getSourceCatalogName(), + hist = getNewestTriggerHistoryForTrigger(trigger.getTriggerId(), trigger.isSourceCatalogNameWildCarded() ? table.getCatalog() + : trigger.getSourceCatalogName(), trigger.isSourceSchemaNameWildCarded() ? table.getSchema() : trigger.getSourceSchemaName(), - trigger.isSourceTableNameWildCarded() ? table.getName() : trigger - .getSourceTableName()); + trigger.isSourceTableNameWildCarded() ? table.getName() : trigger.getSourceTableName()); } try { if (!triggerExists && triggerIsActive) { - symmetricDialect - .createTrigger(sqlBuffer, dmlType, trigger, hist, - configurationService.getChannel(trigger.getChannelId()), - tablePrefix, table); + symmetricDialect.createTrigger(sqlBuffer, dmlType, trigger, hist, configurationService.getChannel(trigger.getChannelId()), + tablePrefix, table); if (triggerRemoved) { statisticManager.incrementTriggersRebuiltCount(1); } else { @@ -1693,9 +1508,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer, } catch (RuntimeException ex) { if (hist != null) { - log.info( - "Cleaning up trigger hist row of {} after failing to create the associated trigger", - hist.getTriggerHistoryId()); + log.info("Cleaning up trigger hist row of {} after failing to create the associated trigger", hist.getTriggerHistoryId()); hist.setErrorMessage(ex.getMessage()); inactivateTriggerHistory(hist); } @@ -1709,8 +1522,8 @@ protected static String replaceCharsToShortenName(String triggerName) { return triggerName.replaceAll("[^a-zA-Z0-9_]|[a|e|i|o|u|A|E|I|O|U]", ""); } - protected String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigger trigger, - Table table, List activeTriggerHistories) { + protected String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigger trigger, Table table, + List activeTriggerHistories) { String triggerName = null; switch (dml) { @@ -1740,8 +1553,7 @@ protected String getTriggerName(DataEventType dml, int maxTriggerNameLength, Tri if (trigger.isSourceTableNameWildCarded()) { triggerSuffix2 = replaceCharsToShortenName(table.getName()); } - String triggerSuffix3 = replaceCharsToShortenName("_" - + parameterService.getNodeGroupId()); + String triggerSuffix3 = replaceCharsToShortenName("_" + parameterService.getNodeGroupId()); triggerName = triggerPrefix1 + triggerSuffix1 + triggerSuffix2 + triggerSuffix3; // use the node group id as part of the trigger if we can because it // helps uniquely identify the trigger in embedded databases. In @@ -1761,8 +1573,7 @@ protected String getTriggerName(DataEventType dml, int maxTriggerNameLength, Tri triggerName = triggerName.substring(0, maxTriggerNameLength - 1); log.debug( "We just truncated the trigger name for the {} trigger id={}. You might want to consider manually providing a name for the trigger that is less than {} characters long", - new Object[] { dml.name().toLowerCase(), trigger.getTriggerId(), - maxTriggerNameLength }); + new Object[] { dml.name().toLowerCase(), trigger.getTriggerId(), maxTriggerNameLength }); } int duplicateCount = 0; @@ -1770,9 +1581,7 @@ protected String getTriggerName(DataEventType dml, int maxTriggerNameLength, Tri duplicateCount++; String duplicateSuffix = Integer.toString(duplicateCount); if (triggerName.length() + duplicateSuffix.length() > maxTriggerNameLength) { - triggerName = triggerName.substring(0, - triggerName.length() - duplicateSuffix.length()) - + duplicateSuffix; + triggerName = triggerName.substring(0, triggerName.length() - duplicateSuffix.length()) + duplicateSuffix; } else { triggerName = triggerName + duplicateSuffix; } @@ -1800,8 +1609,7 @@ public TriggerHistory mapRow(Row rs) { hist.setCreateTime(rs.getDateTime("create_time")); hist.setPkColumnNames(rs.getString("pk_column_names")); hist.setColumnNames(rs.getString("column_names")); - hist.setLastTriggerBuildReason(TriggerReBuildReason.fromCode(rs - .getString("last_trigger_build_reason"))); + hist.setLastTriggerBuildReason(TriggerReBuildReason.fromCode(rs.getString("last_trigger_build_reason"))); hist.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger")); hist.setNameForInsertTrigger(rs.getString("name_for_insert_trigger")); hist.setNameForUpdateTrigger(rs.getString("name_for_update_trigger")); @@ -1818,31 +1626,33 @@ public TriggerHistory mapRow(Row rs) { } class RouterMapper implements ISqlRowMapper { - + List nodeGroupLinks; - public RouterMapper(List nodeGroupLinks) { + boolean substituteParameters; + + public RouterMapper(boolean substituteParameters, List nodeGroupLinks) { this.nodeGroupLinks = nodeGroupLinks; + this.substituteParameters = substituteParameters; } - + private NodeGroupLink getNodeGroupLink(String sourceNodeGroupId, String targetNodeGroupId) { for (NodeGroupLink nodeGroupLink : nodeGroupLinks) { - if (nodeGroupLink.getSourceNodeGroupId().equals(sourceNodeGroupId) && - nodeGroupLink.getTargetNodeGroupId().equals(targetNodeGroupId)) { + if (nodeGroupLink.getSourceNodeGroupId().equals(sourceNodeGroupId) + && nodeGroupLink.getTargetNodeGroupId().equals(targetNodeGroupId)) { return nodeGroupLink; - } + } } return null; } - + public Router mapRow(Row rs) { Router router = new Router(); router.setSyncOnInsert(rs.getBoolean("r_sync_on_insert")); router.setSyncOnUpdate(rs.getBoolean("r_sync_on_update")); router.setSyncOnDelete(rs.getBoolean("r_sync_on_delete")); router.setTargetCatalogName(rs.getString("target_catalog_name")); - router.setNodeGroupLink(getNodeGroupLink( - rs.getString("source_node_group_id"), rs.getString("target_node_group_id"))); + router.setNodeGroupLink(getNodeGroupLink(rs.getString("source_node_group_id"), rs.getString("target_node_group_id"))); router.setTargetSchemaName(rs.getString("target_schema_name")); router.setTargetTableName(rs.getString("target_table_name")); @@ -1856,11 +1666,26 @@ public Router mapRow(Row rs) { router.setCreateTime(rs.getDateTime("r_create_time")); router.setLastUpdateTime(rs.getDateTime("r_last_update_time")); router.setLastUpdateBy(rs.getString("r_last_update_by")); + + if (substituteParameters) { + TypedProperties parameters = parameterService.getAllParameters(); + router.setTargetCatalogName(FormatUtils.replaceTokens(router.getTargetCatalogName(), parameters, true)); + router.setTargetSchemaName(FormatUtils.replaceTokens(router.getTargetSchemaName(), parameters, true)); + router.setTargetTableName(FormatUtils.replaceTokens(router.getTargetTableName(), parameters, true)); + } + return router; } } class TriggerMapper implements ISqlRowMapper { + + boolean substituteParameters; + + public TriggerMapper(boolean substituteParameters) { + this.substituteParameters = substituteParameters; + } + public Trigger mapRow(Row rs) { Trigger trigger = new Trigger(); trigger.setTriggerId(rs.getString("trigger_id")); @@ -1922,6 +1747,13 @@ public Trigger mapRow(Row rs) { trigger.setLastUpdateBy(rs.getString("t_last_update_by")); trigger.setExcludedColumnNames(rs.getString("excluded_column_names")); trigger.setSyncKeyNames(rs.getString("sync_key_names")); + + if (substituteParameters) { + TypedProperties parameters = parameterService.getAllParameters(); + trigger.setSourceCatalogName(FormatUtils.replaceTokens(trigger.getSourceCatalogName(), parameters, true)); + trigger.setSourceSchemaName(FormatUtils.replaceTokens(trigger.getSourceSchemaName(), parameters, true)); + trigger.setSourceTableName(FormatUtils.replaceTokens(trigger.getSourceTableName(), parameters, true)); + } return trigger; } @@ -1966,32 +1798,27 @@ public void addExtraConfigTable(String table) { public Map getFailedTriggers() { return this.failureListener.getFailures(); } - + public TriggerHistory findTriggerHistoryForGenericSync() { - String triggerTableName = TableConstants.getTableName(tablePrefix, - TableConstants.SYM_NODE); - TriggerHistory history = findTriggerHistory(null, null, triggerTableName - .toUpperCase()); + String triggerTableName = TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE); + TriggerHistory history = findTriggerHistory(null, null, triggerTableName.toUpperCase()); if (history == null) { history = findTriggerHistory(null, null, triggerTableName); } return history; } - public Map> fillTriggerRoutersByHistIdAndSortHist( - String sourceNodeGroupId, String targetNodeGroupId, List triggerHistories) { + public Map> fillTriggerRoutersByHistIdAndSortHist(String sourceNodeGroupId, String targetNodeGroupId, + List triggerHistories) { - List triggerRouters = new ArrayList( - getAllTriggerRoutersForReloadForCurrentNode( - sourceNodeGroupId, targetNodeGroupId)); + List triggerRouters = new ArrayList(getAllTriggerRoutersForReloadForCurrentNode(sourceNodeGroupId, + targetNodeGroupId)); - final Map> triggerRoutersByHistoryId = new HashMap>( - triggerHistories.size()); + final Map> triggerRoutersByHistoryId = new HashMap>(triggerHistories.size()); for (TriggerHistory triggerHistory : triggerHistories) { List triggerRoutersForTriggerHistory = new ArrayList(); - triggerRoutersByHistoryId.put(triggerHistory.getTriggerHistoryId(), - triggerRoutersForTriggerHistory); + triggerRoutersByHistoryId.put(triggerHistory.getTriggerHistoryId(), triggerRoutersForTriggerHistory); String triggerId = triggerHistory.getTriggerId(); for (TriggerRouter triggerRouter : triggerRouters) { @@ -2005,8 +1832,7 @@ public Map> fillTriggerRoutersByHistIdAndSortHist( Comparator comparator = new Comparator() { public int compare(TriggerHistory o1, TriggerHistory o2) { - List triggerRoutersForTriggerHist1 = triggerRoutersByHistoryId - .get(o1.getTriggerHistoryId()); + List triggerRoutersForTriggerHist1 = triggerRoutersByHistoryId.get(o1.getTriggerHistoryId()); int intialLoadOrder1 = 0; for (TriggerRouter triggerRouter1 : triggerRoutersForTriggerHist1) { if (triggerRouter1.getInitialLoadOrder() > intialLoadOrder1) { @@ -2014,8 +1840,7 @@ public int compare(TriggerHistory o1, TriggerHistory o2) { } } - List triggerRoutersForTriggerHist2 = triggerRoutersByHistoryId - .get(o2.getTriggerHistoryId()); + List triggerRoutersForTriggerHist2 = triggerRoutersByHistoryId.get(o2.getTriggerHistoryId()); int intialLoadOrder2 = 0; for (TriggerRouter triggerRouter2 : triggerRoutersForTriggerHist2) { if (triggerRouter2.getInitialLoadOrder() > intialLoadOrder2) { @@ -2029,13 +1854,12 @@ public int compare(TriggerHistory o1, TriggerHistory o2) { return 1; } - Table table1 = platform.getTableFromCache(o1.getSourceCatalogName(), - o1.getSourceSchemaName(), o1.getSourceTableName(), false); - Table table2 = platform.getTableFromCache(o2.getSourceCatalogName(), - o2.getSourceSchemaName(), o2.getSourceTableName(), false); + Table table1 = platform + .getTableFromCache(o1.getSourceCatalogName(), o1.getSourceSchemaName(), o1.getSourceTableName(), false); + Table table2 = platform + .getTableFromCache(o2.getSourceCatalogName(), o2.getSourceSchemaName(), o2.getSourceTableName(), false); - return new Integer(sortedTables.indexOf(table1)).compareTo(new Integer(sortedTables - .indexOf(table2))); + return new Integer(sortedTables.indexOf(table1)).compareTo(new Integer(sortedTables.indexOf(table2))); }; }; @@ -2048,9 +1872,8 @@ public int compare(TriggerHistory o1, TriggerHistory o2) { protected List
getSortedTablesFor(List histories) { List
tables = new ArrayList
(histories.size()); for (TriggerHistory triggerHistory : histories) { - Table table = platform.getTableFromCache(triggerHistory.getSourceCatalogName(), - triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName(), - false); + Table table = platform.getTableFromCache(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), + triggerHistory.getSourceTableName(), false); if (table != null) { tables.add(table); } @@ -2058,11 +1881,9 @@ protected List
getSortedTablesFor(List histories) { return Database.sortByForeignKeys(tables); } - class TriggerRoutersCache { - public TriggerRoutersCache(Map> triggerRoutersByTriggerId, - Map routersByRouterId) { + public TriggerRoutersCache(Map> triggerRoutersByTriggerId, Map routersByRouterId) { this.triggerRoutersByTriggerId = triggerRoutersByTriggerId; this.routersByRouterId = routersByRouterId; } diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataExtractorServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataExtractorServiceTest.java index 72d6d96663..cea3322441 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataExtractorServiceTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataExtractorServiceTest.java @@ -50,7 +50,7 @@ public abstract class AbstractDataExtractorServiceTest extends AbstractServiceTe @Before public void setupForTest() { ITriggerRouterService triggerRouterService = getTriggerRouterService(); - TriggerRouter triggerRouter = triggerRouterService.findTriggerRouterById(TEST_TABLE, + TriggerRouter triggerRouter = triggerRouterService.findTriggerRouterById(true, TEST_TABLE, TestConstants.ROUTER_ID_ROOT_2_TEST); if (triggerRouter == null) { triggerRouter = new TriggerRouter( diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java index 5b5034bacd..b6382fcf15 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java @@ -1178,7 +1178,7 @@ protected int countUnroutedBatches() { protected TriggerRouter getTestRoutingTableTrigger(String tableName) { TriggerRouter trigger = null; Set triggerRouters = getTriggerRouterService() - .getTriggerRouterForTableForCurrentNode(null, null, tableName, true); + .getTriggerRouterForTableForCurrentNode(true, null, null, tableName, true); if (triggerRouters == null || triggerRouters.size() == 0) { trigger = new TriggerRouter(); trigger.getTrigger().setSourceTableName(tableName); diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractTriggerRouterServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractTriggerRouterServiceTest.java index dcf4a5cbec..06d69de19f 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractTriggerRouterServiceTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractTriggerRouterServiceTest.java @@ -145,13 +145,13 @@ private int getTriggerHistTableRowCount() { @Test public void test04GetRouterById() throws Exception { - Router router = getTriggerRouterService().getRouterById("3000"); + Router router = getTriggerRouterService().getRouterById(true, "3000"); Assert.assertNotNull(router); Assert.assertEquals("3000", router.getRouterId()); Assert.assertEquals("test-root-group", router.getNodeGroupLink().getSourceNodeGroupId()); Assert.assertEquals("test-node-group2", router.getNodeGroupLink().getTargetNodeGroupId()); - router = getTriggerRouterService().getRouterById("666"); + router = getTriggerRouterService().getRouterById(true, "666"); Assert.assertNull(router); } @@ -179,7 +179,7 @@ public void test06InitialLoadSql() throws Exception { IParameterService parameterService = getParameterService(); parameterService.saveParameter(ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED, true, "unittest"); TriggerRouter triggerRouter = triggerRouterService - .getTriggerRouterForTableForCurrentNode(null, null, TEST_TRIGGERS_TABLE, true) + .getTriggerRouterForTableForCurrentNode(true, null, null, TEST_TRIGGERS_TABLE, true) .iterator().next(); Table table = getDbDialect().getPlatform().getTableFromCache(triggerRouter.getTrigger().getSourceTableName(), true); @@ -216,7 +216,7 @@ public void test07CaptureOnlyChangedData() throws Exception { getParameterService().saveParameter( ParameterConstants.TRIGGER_UPDATE_CAPTURE_CHANGED_DATA_ONLY, true, "test"); if (!Constants.ALWAYS_TRUE_CONDITION.equals(getDbDialect().getDataHasChangedCondition( - getTriggerRouterService().getTriggers().get(0)))) { + getTriggerRouterService().getTriggers(true).get(0)))) { forceRebuildOfTrigers(); insert(INSERT2_VALUES, template, getDbDialect()); Assert.assertTrue(template.queryForInt( @@ -245,7 +245,7 @@ public void test09ExcludedColumnsFunctionality() throws Exception { service.syncTriggers(); TriggerRouter triggerRouter = service - .getTriggerRouterForTableForCurrentNode(null, null, TEST_TRIGGERS_TABLE, true) + .getTriggerRouterForTableForCurrentNode(true, null, null, TEST_TRIGGERS_TABLE, true) .iterator().next(); assertEquals( jdbcTemplate diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java index 0235d89c6d..678fb33dec 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java @@ -1379,7 +1379,7 @@ private SendSchemaResponse sendSchemaImpl(ISymmetricEngine engine, SendSchemaReq if (nodesToSendTo.size() > 0) { List tablesToSend = request.getTablesToSend(); - List triggerRouters = triggerRouterService.getTriggerRouters(false); + List triggerRouters = triggerRouterService.getTriggerRouters(true, false); for (TriggerRouter triggerRouter : triggerRouters) { Trigger trigger = triggerRouter.getTrigger(); NodeGroupLink link = triggerRouter.getRouter().getNodeGroupLink(); diff --git a/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java b/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java index 2cf8954ce8..21a2bce5bf 100644 --- a/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java +++ b/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java @@ -520,10 +520,12 @@ public void test13SuspendIgnorePullRemoteBatches() throws Exception { logTestRunning(); - // Should not sync when status = null + clientPull(); + Date date = DateUtils.parseDate("2009-09-30", new String[] { "yyyy-MM-dd" }); Order order = new Order("42", 100, "C", date); serverTestService.insertOrder(order); + clientPull(); IOutgoingBatchService rootOutgoingBatchService = getServer().getOutgoingBatchService(); @@ -981,7 +983,7 @@ public void test25TestPurge() throws Exception { @Test(timeout = 120000) public void test26Heartbeat() throws Exception { logTestRunning(); - Level previous = setLoggingLevelForTest(Level.DEBUG); + Level previous = setLoggingLevelForTest(Level.INFO); try { final String checkHeartbeatSql = "select max(heartbeat_time) from sym_node_host where node_id='" + TestConstants.TEST_CLIENT_EXTERNAL_ID + "'"; diff --git a/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java b/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java index 153c57b2e7..00c3898ece 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java @@ -85,7 +85,7 @@ public static String replaceToken(String text, String tokenToReplace, String rep * just replace the key outright. * @return The text with the token keys replaced */ - public static String replaceTokens(String text, Map replacements, + public static String replaceTokens(String text, Map replacements, boolean matchUsingPrefixSuffix) { if (text != null && replacements != null && replacements.size() > 0) { if (matchUsingPrefixSuffix) { @@ -93,7 +93,7 @@ public static String replaceTokens(String text, Map replacements StringBuffer buffer = new StringBuffer(); while (matcher.find()) { String[] match = matcher.group(1).split("\\|"); - String replacement = replacements.get(match[0]); + String replacement = (String)replacements.get(match[0]); if (replacement != null) { matcher.appendReplacement(buffer, ""); if (match.length == 2) { @@ -106,7 +106,7 @@ public static String replaceTokens(String text, Map replacements text = buffer.toString(); } else { for (Object key : replacements.keySet()) { - text = text.replaceAll(key.toString(), replacements.get(key)); + text = text.replaceAll(key.toString(), (String)replacements.get(key)); } } } diff --git a/symmetric-util/src/main/java/org/jumpmind/util/KeyedCache.java b/symmetric-util/src/main/java/org/jumpmind/util/KeyedCache.java new file mode 100644 index 0000000000..a0c48dfc27 --- /dev/null +++ b/symmetric-util/src/main/java/org/jumpmind/util/KeyedCache.java @@ -0,0 +1,65 @@ +package org.jumpmind.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class KeyedCache implements Serializable { + + private static final long serialVersionUID = 1L; + + long timeoutTimeInMs; + + long lastRefreshTimeMs = 0; + + protected LinkedHashMap keyedCache = new LinkedHashMap(); + + protected IRefreshCache refresher; + + public KeyedCache(long timeoutTimeInMs, IRefreshCache refresher) { + this.timeoutTimeInMs = timeoutTimeInMs; + this.refresher = refresher; + } + + public boolean containsKey(String key) { + refreshCacheIfNeeded(false); + return keyedCache.containsKey(key); + } + + public T find(K key, boolean refreshCache) { + refreshCacheIfNeeded(refreshCache); + return keyedCache.get(key); + } + + public List getAll(boolean refreshCache) { + refreshCacheIfNeeded(refreshCache); + return new ArrayList(keyedCache.values()); + } + + public void clear() { + lastRefreshTimeMs = Long.MIN_VALUE; + } + + protected void refreshCacheIfNeeded(boolean refreshCache) { + Map copy = keyedCache; + if (copy == null || refreshCache || (System.currentTimeMillis() - lastRefreshTimeMs) > timeoutTimeInMs) { + synchronized (this) { + if (copy == null || refreshCache || (System.currentTimeMillis() - lastRefreshTimeMs) > timeoutTimeInMs) { + refreshCache(); + } + } + } + } + + protected void refreshCache() { + keyedCache = refresher.refresh(); + lastRefreshTimeMs = System.currentTimeMillis(); + } + + public interface IRefreshCache { + public LinkedHashMap refresh(); + } + +}