From b45f99846f206c4ddb69c3a60d07dd08f8d2baf6 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Fri, 1 Feb 2013 04:10:48 +0000 Subject: [PATCH] 0001026: If the minor version is upgraded, then (if you aren't the reg server) request a reload of key symmetric tables --- .../symmetric/AbstractSymmetricEngine.java | 41 ++- .../java/org/jumpmind/symmetric/Version.java | 8 +- .../symmetric/common/TableConstants.java | 14 +- .../symmetric/model/TableReloadRequest.java | 6 +- .../route/ConfigurationChangedDataRouter.java | 9 +- .../service/IConfigurationService.java | 6 - .../symmetric/service/IDataService.java | 7 +- .../service/ITriggerRouterService.java | 2 + .../service/impl/ConfigurationService.java | 46 ---- .../impl/ConfigurationServiceSqlMap.java | 6 - .../service/impl/DataExtractorService.java | 4 +- .../symmetric/service/impl/DataService.java | 57 +++- .../service/impl/DataServiceSqlMap.java | 6 + .../service/impl/RegistrationService.java | 2 +- .../service/impl/TriggerRouterService.java | 20 +- .../org/jumpmind/util/AbstractVersion.java | 245 ++++++++++-------- 16 files changed, 280 insertions(+), 199 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index d35fad740a..1dee458398 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -24,6 +24,7 @@ import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -60,6 +61,7 @@ import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.NodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; +import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IBandwidthService; @@ -503,13 +505,48 @@ public synchronized boolean start(boolean startJobs) { "Starting registered node [group={}, id={}, externalId={}]", new Object[] { node.getNodeGroupId(), node.getNodeId(), node.getExternalId() }); + + triggerRouterService.syncTriggers(); + + if (Version.isOlderVersion(node.getSymmetricVersion()) + && !parameterService.isRegistrationServer()) { + log.info("Minor version of SymmetricDS has increased. Requesting a reload of key configuration tables"); + String parentNodeId = node.getCreatedAtNodeId(); + List tableNames = new ArrayList(); + String tablePrefix = getTablePrefix(); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_PARAMETER)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_CHANNEL)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_ROUTER)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_TABLE)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_LOAD_FILTER)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_COLUMN)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_CONFLICT)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_GROUPLET)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_GROUPLET_LINK)); + tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET)); + + for (String tableName : tableNames) { + TableReloadRequest request = new TableReloadRequest(); + request.setSourceNodeId(parentNodeId); + request.setTargetNodeId(node.getNodeId()); + request.setTriggerId(tableName); + request.setRouterId(Constants.UNKNOWN_ROUTER_ID); + request.setLastUpdateBy(node.getSymmetricVersion() + " to " + Version.version()); + dataService.saveTableReloadRequest(request); + } + + } + + heartbeat(false); + } else { log.info("Starting unregistered node [group={}, externalId={}]", parameterService.getNodeGroupId(), parameterService.getExternalId()); } - triggerRouterService.syncTriggers(); - heartbeat(false); + if (startJobs && jobManager != null) { jobManager.startJobs(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/Version.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/Version.java index d6620558ce..c3e94df540 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/Version.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/Version.java @@ -45,7 +45,7 @@ public static String versionWithUnderscores() { public static int[] parseVersion(String version) { return Version.version.parseVersion(version); - } + } public static boolean isOlderVersion(String version) { return isOlderThanVersion(version, version()); @@ -54,5 +54,11 @@ public static boolean isOlderVersion(String version) { public static boolean isOlderThanVersion(String checkVersion, String targetVersion) { return version.isOlderThanVersion(checkVersion, targetVersion); } + + public static boolean hasOlderMinorVersion(String version) { + return Version.version.isOlderMinorVersion(version); + } + + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java index e4912e6a6d..a5e65721ce 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java @@ -159,13 +159,19 @@ protected static List populateAllTables(String tablePrefix) { tables.add(getTableName(tablePrefix, SYM_INCOMING_ERROR)); tables.add(getTableName(tablePrefix, SYM_SEQUENCE)); tables.add(getTableName(tablePrefix, SYM_NODE_COMMUNICATION)); - tables.add(getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST)); - tables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET)); - tables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET_LINK)); - tables.add(getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET)); + tables.add(getTableName(tablePrefix, SYM_TABLE_RELOAD_REQUEST)); + tables.add(getTableName(tablePrefix, SYM_GROUPLET)); + tables.add(getTableName(tablePrefix, SYM_GROUPLET_LINK)); + tables.add(getTableName(tablePrefix, SYM_TRIGGER_ROUTER_GROUPLET)); return tables; } + + public static final List getTablesThatSync(String tablePrefix) { + List tables = new ArrayList(getConfigTables(tablePrefix)); + tables.removeAll(getTablesThatDoNotSync(tablePrefix)); + return tables; + } public static final List getTablesThatDoNotSync(String tablePrefix) { List tables = new ArrayList(2); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java index 0a1977eeb7..070094687f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java @@ -30,10 +30,10 @@ public class TableReloadRequest { protected String routerId; protected String reloadSelect; protected String reloadDeleteStmt; - protected boolean reloadEnabled; + protected boolean reloadEnabled = true; protected Date reloadTime; - protected Date createTime; - protected Date lastUpdateTime; + protected Date createTime = new Date(); + protected Date lastUpdateTime = new Date(); protected String lastUpdateBy; public TableReloadRequest(TableReloadRequestKey key) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index 505e8e4abf..015603c6b1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -345,7 +345,6 @@ private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo, @Override public void contextCommitted(SimpleRouterContext routingContext) { - if (engine.getParameterService().is(ParameterConstants.AUTO_REFRESH_AFTER_CONFIG_CHANGED, true)) { if (routingContext.get(CTX_KEY_FLUSH_PARAMETERS_NEEDED) != null @@ -395,20 +394,20 @@ public void contextCommitted(SimpleRouterContext routingContext) { } - protected void insertReloadEvents(SimpleRouterContext routingContext) { + protected void insertReloadEvents(SimpleRouterContext routingContext) { @SuppressWarnings("unchecked") List reloadRequestKeys = (List) routingContext .get(CTX_KEY_TABLE_RELOAD_NEEDED); if (reloadRequestKeys != null) { for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) { - TableReloadRequest request = engine.getConfigurationService() + TableReloadRequest request = engine.getDataService() .getTableReloadRequest(reloadRequestKey); log.info( "Attempting to insert table reload request from data router for node {} and trigger {}", reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId()); - engine.getDataService().insertReloadEvent(request); - routingContext.setRequestGapDetection(true); + engine.getDataService().insertReloadEvent(request); } + routingContext.setRequestGapDetection(true); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index 23a47f3348..0dc1b268f0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -29,8 +29,6 @@ import org.jumpmind.symmetric.model.NodeGroupChannelWindow; import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeGroupLinkAction; -import org.jumpmind.symmetric.model.TableReloadRequest; -import org.jumpmind.symmetric.model.TableReloadRequestKey; /** * Provides an API to configure data synchronizations. @@ -53,10 +51,6 @@ public interface IConfigurationService { public NodeGroupLink getNodeGroupLinkFor(String sourceNodeGroupId, String targetNodeGroupId); - public void saveTableReloadRequest(TableReloadRequest request); - - public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); - /** * Check to see if the channel is currently being used in the system. */ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index c014764c82..3eb40e77ef 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -34,13 +34,18 @@ import org.jumpmind.symmetric.model.DataGap; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.TableReloadRequest; +import org.jumpmind.symmetric.model.TableReloadRequestKey; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; /** * This service provides an API to access and update {@link Data}. */ -public interface IDataService { +public interface IDataService { + + public void saveTableReloadRequest(TableReloadRequest request); + + public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); public String reloadNode(String nodeId, boolean reverseLoad); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java index 272a3754a3..35b4314987 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java @@ -53,6 +53,8 @@ public interface ITriggerRouterService { public Trigger getTriggerForCurrentNodeById(String triggerId); + public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache); + /** * Returns a list of triggers that should be active for the current node. * @param refreshCache Indicates that the cache should be refreshed diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 62b1f20500..c4f8b61aab 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -20,7 +20,6 @@ */ package org.jumpmind.symmetric.service.impl; -import java.sql.Types; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -40,8 +39,6 @@ import org.jumpmind.symmetric.model.NodeGroupChannelWindow; import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeGroupLinkAction; -import org.jumpmind.symmetric.model.TableReloadRequest; -import org.jumpmind.symmetric.model.TableReloadRequestKey; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; @@ -76,49 +73,6 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec createSqlReplacementTokens())); } - public void saveTableReloadRequest(TableReloadRequest request) { - Date time = new Date(); - request.setLastUpdateTime(time); - if (0 == sqlTemplate.update( - getSql("updateTableReloadRequest"), - new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(), - request.isReloadEnabled() ? 1 : 0, request.getReloadTime(), - request.getCreateTime(), request.getLastUpdateBy(), - request.getLastUpdateTime(), request.getSourceNodeId(), - request.getTargetNodeId(), request.getRouterId(), request.getTriggerId() }, - new int[] { Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.TIMESTAMP, - Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) { - request.setCreateTime(time); - sqlTemplate.update( - getSql("insertTableReloadRequest"), - new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(), - request.isReloadEnabled() ? 1 : 0, request.getReloadTime(), - request.getCreateTime(), request.getLastUpdateBy(), - request.getLastUpdateTime(), request.getSourceNodeId(), - request.getTargetNodeId(), request.getRouterId(), - request.getTriggerId() }, new int[] { Types.VARCHAR, Types.VARCHAR, - Types.SMALLINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, - Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR }); - } - } - - public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) { - return sqlTemplate.queryForObject(getSql("selectTableReloadRequest"), new ISqlRowMapper() { - public TableReloadRequest mapRow(Row rs) { - TableReloadRequest request = new TableReloadRequest(key); - request.setReloadSelect(rs.getString("reload_select")); - request.setReloadEnabled(rs.getBoolean("reload_enabled")); - request.setReloadTime(rs.getDateTime("reload_time")); - request.setCreateTime(rs.getDateTime("create_time")); - request.setLastUpdateBy(rs.getString("last_update_by")); - request.setLastUpdateTime(rs.getDateTime("last_update_time")); - return request; - } - }, key.getSourceNodeId(), key.getTargetNodeId(), key.getTriggerId(), key.getRouterId()); - } - public void saveNodeGroupLink(NodeGroupLink link) { if (!doesNodeGroupExist(link.getSourceNodeGroupId())) { saveNodeGroup(new NodeGroup(link.getSourceNodeGroupId())); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java index ca07d2b7a7..45b2794f59 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java @@ -12,12 +12,6 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform, // @formatter:off - putSql("selectTableReloadRequest", "select reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); - - putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id) values (?,?,?,?,?,?,?,?,?,?,?)"); - - putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, reload_delete_stmt=?, reload_enabled=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); - putSql("selectDataEventActionsByIdSql", " select data_event_action from $(node_group_link) where " + " source_node_group_id = ? and target_node_group_id = ? "); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 701fdf485a..cdef3f4191 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -764,9 +764,7 @@ public CsvData next() { String triggerId = triggerHistory.getTriggerId(); - // fyi, this queries the database - TriggerRouter triggerRouter = triggerRouterService.findTriggerRouterById( - triggerId, routerId); + TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(triggerId, routerId, false); SelectFromTableEvent event = new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory); this.reloadSource = new SelectFromTableSource(outgoingBatch, batch, event); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 8ad902846c..989cce5874 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -61,9 +61,10 @@ import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.OutgoingBatch.Status; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.TableReloadRequest; -import org.jumpmind.symmetric.model.OutgoingBatch.Status; +import org.jumpmind.symmetric.model.TableReloadRequestKey; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; @@ -106,8 +107,8 @@ public boolean insertReloadEvent(TableReloadRequest request) { INodeService nodeService = engine.getNodeService(); Node targetNode = nodeService.findNode(request.getTargetNodeId()); if (targetNode != null) { - TriggerRouter triggerRouter = triggerRouterService.findTriggerRouterById( - request.getTriggerId(), request.getRouterId()); + TriggerRouter triggerRouter = triggerRouterService. + getTriggerRouterForCurrentNode(request.getTriggerId(), request.getRouterId(), false); if (triggerRouter != null) { Trigger trigger = triggerRouter.getTrigger(); Router router = triggerRouter.getRouter(); @@ -147,12 +148,13 @@ public boolean insertReloadEvent(TableReloadRequest request) { .getSourceNodeId(), request.getTriggerId(), request.getRouterId()), false); + transaction.commit(); + request.setReloadEnabled(false); request.setReloadTime(new Date()); request.setLastUpdateBy("symmetricds"); - engine.getConfigurationService().saveTableReloadRequest(request); + saveTableReloadRequest(request); - transaction.commit(); } finally { close(transaction); } @@ -171,7 +173,7 @@ public boolean insertReloadEvent(TableReloadRequest request) { } } else { log.error( - "Could not reload table for node {} because it the trigger router ({}, {}) could not be found", + "Could not reload table for node {} because the trigger router ({}, {}) could not be found", new Object[] { request.getTargetNodeId(), request.getTriggerId(), request.getRouterId() }); } @@ -183,6 +185,49 @@ public boolean insertReloadEvent(TableReloadRequest request) { return successful; } + + public void saveTableReloadRequest(TableReloadRequest request) { + Date time = new Date(); + request.setLastUpdateTime(time); + if (0 == sqlTemplate.update( + getSql("updateTableReloadRequest"), + new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(), + request.isReloadEnabled() ? 1 : 0, request.getReloadTime(), + request.getCreateTime(), request.getLastUpdateBy(), + request.getLastUpdateTime(), request.getSourceNodeId(), + request.getTargetNodeId(), request.getTriggerId(), request.getRouterId() }, + new int[] { Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.TIMESTAMP, + Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) { + request.setCreateTime(time); + sqlTemplate.update( + getSql("insertTableReloadRequest"), + new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(), + request.isReloadEnabled() ? 1 : 0, request.getReloadTime(), + request.getCreateTime(), request.getLastUpdateBy(), + request.getLastUpdateTime(), request.getSourceNodeId(), + request.getTargetNodeId(), + request.getTriggerId(), request.getRouterId() }, new int[] { Types.VARCHAR, Types.VARCHAR, + Types.SMALLINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, + Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR }); + } + } + + public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) { + return sqlTemplate.queryForObject(getSql("selectTableReloadRequest"), new ISqlRowMapper() { + public TableReloadRequest mapRow(Row rs) { + TableReloadRequest request = new TableReloadRequest(key); + request.setReloadSelect(rs.getString("reload_select")); + request.setReloadEnabled(rs.getBoolean("reload_enabled")); + request.setReloadTime(rs.getDateTime("reload_time")); + request.setCreateTime(rs.getDateTime("create_time")); + request.setLastUpdateBy(rs.getString("last_update_by")); + request.setLastUpdateTime(rs.getDateTime("last_update_time")); + return request; + } + }, key.getSourceNodeId(), key.getTargetNodeId(), key.getTriggerId(), key.getRouterId()); + } public void insertReloadEvent(final Node targetNode, final TriggerRouter triggerRouter) { insertReloadEvent(targetNode, triggerRouter, null); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index d950ad0c9f..76c269b9f6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -9,6 +9,12 @@ public class DataServiceSqlMap extends AbstractSqlMap { public DataServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) { super(platform, replacementTokens); + putSql("selectTableReloadRequest", "select reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); + + putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id) values (?,?,?,?,?,?,?,?,?,?,?)"); + + putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, reload_delete_stmt=?, reload_enabled=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); + // Note that the order by data_id is done appended in code putSql("selectEventDataToExtractSql", "" diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index fe2f2981fc..ae50b9b5c9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -129,7 +129,7 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost, Str RegistrationStatus.RQ, remoteHost, remoteAddress)); log.warn("Registration is not allowed unless a link exists so the registering node can receive configuration updates. Please add a group link where the source group id is {} and the target group id is {}", identity.getNodeGroupId(), nodePriorToRegistration.getNodeGroupId()); - return false; + return false; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index 8f692232ff..00d0321bce 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -322,7 +322,7 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) { || tableName.equals(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST)); Trigger trigger = new Trigger(); - trigger.setTriggerId(Integer.toString(Math.abs(tableName.hashCode()))); + trigger.setTriggerId(tableName); trigger.setSyncOnDelete(syncChanges); trigger.setSyncOnInsert(syncChanges); trigger.setSyncOnUpdate(syncChanges); @@ -480,6 +480,24 @@ protected boolean doesTriggerRouterExistInList(List triggerRouter } return false; } + + public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache) { + TriggerRouter triggerRouter = null; + List triggerRouters = getTriggerRoutersForCurrentNode(refreshCache).get(triggerId); + for (TriggerRouter testTriggerRouter : triggerRouters) { + if (testTriggerRouter.getRouter().getRouterId().equals(routerId) || + testTriggerRouter.getRouter().getRouterId().equals(Constants.UNKNOWN_ROUTER_ID)) { + triggerRouter = testTriggerRouter; + break; + } + } + + if (triggerRouter == null) { + log.warn("Could not find trigger router {} {} in list {}", new Object[] {triggerId, routerId, triggerRouters.toString()}); + } + + return triggerRouter; + } public Map> getTriggerRoutersForCurrentNode(boolean refreshCache) { return getTriggerRoutersCacheForCurrentNode(refreshCache).triggerRoutersByTriggerId; diff --git a/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java b/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java index c74922ab56..b63eaaff10 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java @@ -16,125 +16,142 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. */ -package org.jumpmind.util; - + * under the License. + */ +package org.jumpmind.util; + import java.io.IOException; import java.io.InputStream; import java.util.Properties; import org.apache.commons.lang.StringUtils; - -/** - * Follow the Apache versioning scheme documented here. - */ -abstract public class AbstractVersion { - - final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(getClass()); - - public static final int MAJOR_INDEX = 0; - - public static final int MINOR_INDEX = 1; - - public static final int PATCH_INDEX = 2; - + */ +abstract public class AbstractVersion { + + final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(getClass()); + + public static final int MAJOR_INDEX = 0; + + public static final int MINOR_INDEX = 1; + + public static final int PATCH_INDEX = 2; + private String version = null; - - abstract protected String getPropertiesFileLocation(); - - public String version() { - if (version == null) { - InputStream is = AbstractVersion.class - .getResourceAsStream(getPropertiesFileLocation()); - if (is != null) { - Properties p = new Properties(); - try { - p.load(is); - version = p.getProperty("version"); - } catch (IOException e) { - version = "unknown"; - log.warn(e, e); - } - } else { - version = "development"; - } - } - return version; - } - - public String versionWithUnderscores() { - return version().replace("[\\.\\-]", "_"); - } - - public int[] parseVersion(String version) { - version = version.replaceAll("[^0-9\\.]", ""); - int[] versions = new int[3]; - if (!StringUtils.isEmpty(version)) { - String[] splitVersion = version.split("\\."); - if (splitVersion.length >= 3) { - versions[PATCH_INDEX] = parseVersionComponent(splitVersion[2]); - } - if (splitVersion.length >= 2) { - versions[MINOR_INDEX] = parseVersionComponent(splitVersion[1]); - } - if (splitVersion.length >= 1) { - versions[MAJOR_INDEX] = parseVersionComponent(splitVersion[0]); - } - } - return versions; - } - - private int parseVersionComponent(String versionComponent) { - int version = 0; - try { - version = Integer.parseInt(versionComponent); - } catch (NumberFormatException e) { - } - return version; - } - - protected boolean isOlderMajorVersion(String version) { - return isOlderMajorVersion(parseVersion(version)); - } - - protected boolean isOlderMajorVersion(int[] versions) { - int[] softwareVersion = parseVersion(version()); - if (versions[MAJOR_INDEX] < softwareVersion[MAJOR_INDEX]) { - return true; - } - return false; - } - - public boolean isOlderVersion(String version) { - return isOlderThanVersion(version, version()); - } - - public boolean isOlderThanVersion(String checkVersion, - String targetVersion) { - - if(noVersion(targetVersion) || noVersion(checkVersion)) { - return false; - } - - int[] checkVersions = parseVersion(checkVersion); - int[] targetVersions = parseVersion(targetVersion); - - if (checkVersions[MAJOR_INDEX] < targetVersions[MAJOR_INDEX]) { - return true; - } else if (checkVersions[MAJOR_INDEX] == targetVersions[MAJOR_INDEX] - && checkVersions[MINOR_INDEX] < targetVersions[MINOR_INDEX]) { - return true; - } else if (checkVersions[MAJOR_INDEX] == targetVersions[MAJOR_INDEX] - && checkVersions[MINOR_INDEX] == targetVersions[MINOR_INDEX] - && checkVersions[PATCH_INDEX] < targetVersions[PATCH_INDEX]) { - return true; - } - return false; - } - - private boolean noVersion(String targetVersion) { - return StringUtils.isBlank(targetVersion); - } + + abstract protected String getPropertiesFileLocation(); + + public String version() { + if (version == null) { + InputStream is = AbstractVersion.class.getResourceAsStream(getPropertiesFileLocation()); + if (is != null) { + Properties p = new Properties(); + try { + p.load(is); + version = p.getProperty("version"); + } catch (IOException e) { + version = "unknown"; + log.warn(e, e); + } + } else { + version = "development"; + } + } + return version; + } + + public String versionWithUnderscores() { + return version().replace("[\\.\\-]", "_"); + } + + public int[] parseVersion(String version) { + version = version.replaceAll("[^0-9\\.]", ""); + int[] versions = new int[3]; + if (!StringUtils.isEmpty(version)) { + String[] splitVersion = version.split("\\."); + if (splitVersion.length >= 3) { + versions[PATCH_INDEX] = parseVersionComponent(splitVersion[2]); + } + if (splitVersion.length >= 2) { + versions[MINOR_INDEX] = parseVersionComponent(splitVersion[1]); + } + if (splitVersion.length >= 1) { + versions[MAJOR_INDEX] = parseVersionComponent(splitVersion[0]); + } + } + return versions; + } + + private int parseVersionComponent(String versionComponent) { + int version = 0; + try { + version = Integer.parseInt(versionComponent); + } catch (NumberFormatException e) { + } + return version; + } + + protected boolean isOlderMajorVersion(String version) { + return isOlderMajorVersion(parseVersion(version)); + } + + protected boolean isOlderMajorVersion(int[] versions) { + int[] softwareVersion = parseVersion(version()); + if (versions[MAJOR_INDEX] < softwareVersion[MAJOR_INDEX]) { + return true; + } + return false; + } + + public boolean isOlderVersion(String version) { + return isOlderThanVersion(version, version()); + } + + public boolean isOlderThanVersion(String checkVersion, String targetVersion) { + + if (noVersion(targetVersion) || noVersion(checkVersion)) { + return false; + } + + int[] checkVersions = parseVersion(checkVersion); + int[] targetVersions = parseVersion(targetVersion); + + if (checkVersions[MAJOR_INDEX] < targetVersions[MAJOR_INDEX]) { + return true; + } else if (checkVersions[MAJOR_INDEX] == targetVersions[MAJOR_INDEX] + && checkVersions[MINOR_INDEX] < targetVersions[MINOR_INDEX]) { + return true; + } else if (checkVersions[MAJOR_INDEX] == targetVersions[MAJOR_INDEX] + && checkVersions[MINOR_INDEX] == targetVersions[MINOR_INDEX] + && checkVersions[PATCH_INDEX] < targetVersions[PATCH_INDEX]) { + return true; + } + return false; + } + + protected boolean noVersion(String targetVersion) { + return StringUtils.isBlank(targetVersion); + } + + public boolean isOlderMinorVersion(String oldVersion) { + String currentVersion = version(); + if (noVersion(currentVersion) || noVersion(oldVersion)) { + return false; + } + + int[] checkVersions = parseVersion(oldVersion); + int[] targetVersions = parseVersion(currentVersion); + + if (checkVersions[MAJOR_INDEX] < targetVersions[MAJOR_INDEX]) { + return true; + } else if (checkVersions[MAJOR_INDEX] == targetVersions[MAJOR_INDEX] + && checkVersions[MINOR_INDEX] < targetVersions[MINOR_INDEX]) { + return true; + } + return false; + } }