Skip to content

Commit

Permalink
0001026: If the minor version is upgraded, then (if you aren't the re…
Browse files Browse the repository at this point in the history
…g server) request a reload of key symmetric tables
  • Loading branch information
chenson42 committed Feb 1, 2013
1 parent b61dde6 commit b45f998
Show file tree
Hide file tree
Showing 16 changed files with 280 additions and 199 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.NodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IBandwidthService;
Expand Down Expand Up @@ -503,13 +505,48 @@ public synchronized boolean start(boolean startJobs) {
"Starting registered node [group={}, id={}, externalId={}]",
new Object[] { node.getNodeGroupId(), node.getNodeId(),
node.getExternalId() });

triggerRouterService.syncTriggers();

if (Version.isOlderVersion(node.getSymmetricVersion())
&& !parameterService.isRegistrationServer()) {
log.info("Minor version of SymmetricDS has increased. Requesting a reload of key configuration tables");
String parentNodeId = node.getCreatedAtNodeId();
List<String> tableNames = new ArrayList<String>();
String tablePrefix = getTablePrefix();
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_PARAMETER));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_CHANNEL));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_ROUTER));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_TABLE));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_LOAD_FILTER));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_COLUMN));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_CONFLICT));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_GROUPLET));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_GROUPLET_LINK));
tableNames.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET));

for (String tableName : tableNames) {
TableReloadRequest request = new TableReloadRequest();
request.setSourceNodeId(parentNodeId);
request.setTargetNodeId(node.getNodeId());
request.setTriggerId(tableName);
request.setRouterId(Constants.UNKNOWN_ROUTER_ID);
request.setLastUpdateBy(node.getSymmetricVersion() + " to " + Version.version());
dataService.saveTableReloadRequest(request);
}

}

heartbeat(false);

} else {
log.info("Starting unregistered node [group={}, externalId={}]",
parameterService.getNodeGroupId(),
parameterService.getExternalId());
}
triggerRouterService.syncTriggers();
heartbeat(false);

if (startJobs && jobManager != null) {
jobManager.startJobs();
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ public static String versionWithUnderscores() {

public static int[] parseVersion(String version) {
return Version.version.parseVersion(version);
}
}

public static boolean isOlderVersion(String version) {
return isOlderThanVersion(version, version());
Expand All @@ -54,5 +54,11 @@ public static boolean isOlderVersion(String version) {
public static boolean isOlderThanVersion(String checkVersion, String targetVersion) {
return version.isOlderThanVersion(checkVersion, targetVersion);
}

public static boolean hasOlderMinorVersion(String version) {
return Version.version.isOlderMinorVersion(version);
}



}
Expand Up @@ -159,13 +159,19 @@ protected static List<String> populateAllTables(String tablePrefix) {
tables.add(getTableName(tablePrefix, SYM_INCOMING_ERROR));
tables.add(getTableName(tablePrefix, SYM_SEQUENCE));
tables.add(getTableName(tablePrefix, SYM_NODE_COMMUNICATION));
tables.add(getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST));
tables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET));
tables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET_LINK));
tables.add(getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET));
tables.add(getTableName(tablePrefix, SYM_TABLE_RELOAD_REQUEST));
tables.add(getTableName(tablePrefix, SYM_GROUPLET));
tables.add(getTableName(tablePrefix, SYM_GROUPLET_LINK));
tables.add(getTableName(tablePrefix, SYM_TRIGGER_ROUTER_GROUPLET));

return tables;
}

public static final List<String> getTablesThatSync(String tablePrefix) {
List<String> tables = new ArrayList<String>(getConfigTables(tablePrefix));
tables.removeAll(getTablesThatDoNotSync(tablePrefix));
return tables;
}

public static final List<String> getTablesThatDoNotSync(String tablePrefix) {
List<String> tables = new ArrayList<String>(2);
Expand Down
Expand Up @@ -30,10 +30,10 @@ public class TableReloadRequest {
protected String routerId;
protected String reloadSelect;
protected String reloadDeleteStmt;
protected boolean reloadEnabled;
protected boolean reloadEnabled = true;
protected Date reloadTime;
protected Date createTime;
protected Date lastUpdateTime;
protected Date createTime = new Date();
protected Date lastUpdateTime = new Date();
protected String lastUpdateBy;

public TableReloadRequest(TableReloadRequestKey key) {
Expand Down
Expand Up @@ -345,7 +345,6 @@ private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo,

@Override
public void contextCommitted(SimpleRouterContext routingContext) {

if (engine.getParameterService().is(ParameterConstants.AUTO_REFRESH_AFTER_CONFIG_CHANGED,
true)) {
if (routingContext.get(CTX_KEY_FLUSH_PARAMETERS_NEEDED) != null
Expand Down Expand Up @@ -395,20 +394,20 @@ public void contextCommitted(SimpleRouterContext routingContext) {

}

protected void insertReloadEvents(SimpleRouterContext routingContext) {
protected void insertReloadEvents(SimpleRouterContext routingContext) {
@SuppressWarnings("unchecked")
List<TableReloadRequestKey> reloadRequestKeys = (List<TableReloadRequestKey>) routingContext
.get(CTX_KEY_TABLE_RELOAD_NEEDED);
if (reloadRequestKeys != null) {
for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) {
TableReloadRequest request = engine.getConfigurationService()
TableReloadRequest request = engine.getDataService()
.getTableReloadRequest(reloadRequestKey);
log.info(
"Attempting to insert table reload request from data router for node {} and trigger {}",
reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId());
engine.getDataService().insertReloadEvent(request);
routingContext.setRequestGapDetection(true);
engine.getDataService().insertReloadEvent(request);
}
routingContext.setRequestGapDetection(true);
}
}

Expand Down
Expand Up @@ -29,8 +29,6 @@
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadRequestKey;

/**
* Provides an API to configure data synchronizations.
Expand All @@ -53,10 +51,6 @@ public interface IConfigurationService {

public NodeGroupLink getNodeGroupLinkFor(String sourceNodeGroupId, String targetNodeGroupId);

public void saveTableReloadRequest(TableReloadRequest request);

public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key);

/**
* Check to see if the channel is currently being used in the system.
*/
Expand Down
Expand Up @@ -34,13 +34,18 @@
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadRequestKey;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;

/**
* This service provides an API to access and update {@link Data}.
*/
public interface IDataService {
public interface IDataService {

public void saveTableReloadRequest(TableReloadRequest request);

public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key);

public String reloadNode(String nodeId, boolean reverseLoad);

Expand Down
Expand Up @@ -53,6 +53,8 @@ public interface ITriggerRouterService {

public Trigger getTriggerForCurrentNodeById(String triggerId);

public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache);

/**
* Returns a list of triggers that should be active for the current node.
* @param refreshCache Indicates that the cache should be refreshed
Expand Down
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.service.impl;

import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -40,8 +39,6 @@
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadRequestKey;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
Expand Down Expand Up @@ -76,49 +73,6 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec
createSqlReplacementTokens()));
}

public void saveTableReloadRequest(TableReloadRequest request) {
Date time = new Date();
request.setLastUpdateTime(time);
if (0 == sqlTemplate.update(
getSql("updateTableReloadRequest"),
new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(),
request.isReloadEnabled() ? 1 : 0, request.getReloadTime(),
request.getCreateTime(), request.getLastUpdateBy(),
request.getLastUpdateTime(), request.getSourceNodeId(),
request.getTargetNodeId(), request.getRouterId(), request.getTriggerId() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.TIMESTAMP,
Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) {
request.setCreateTime(time);
sqlTemplate.update(
getSql("insertTableReloadRequest"),
new Object[] { request.getReloadSelect(), request.getReloadDeleteStmt(),
request.isReloadEnabled() ? 1 : 0, request.getReloadTime(),
request.getCreateTime(), request.getLastUpdateBy(),
request.getLastUpdateTime(), request.getSourceNodeId(),
request.getTargetNodeId(), request.getRouterId(),
request.getTriggerId() }, new int[] { Types.VARCHAR, Types.VARCHAR,
Types.SMALLINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR });
}
}

public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) {
return sqlTemplate.queryForObject(getSql("selectTableReloadRequest"), new ISqlRowMapper<TableReloadRequest>() {
public TableReloadRequest mapRow(Row rs) {
TableReloadRequest request = new TableReloadRequest(key);
request.setReloadSelect(rs.getString("reload_select"));
request.setReloadEnabled(rs.getBoolean("reload_enabled"));
request.setReloadTime(rs.getDateTime("reload_time"));
request.setCreateTime(rs.getDateTime("create_time"));
request.setLastUpdateBy(rs.getString("last_update_by"));
request.setLastUpdateTime(rs.getDateTime("last_update_time"));
return request;
}
}, key.getSourceNodeId(), key.getTargetNodeId(), key.getTriggerId(), key.getRouterId());
}

public void saveNodeGroupLink(NodeGroupLink link) {
if (!doesNodeGroupExist(link.getSourceNodeGroupId())) {
saveNodeGroup(new NodeGroup(link.getSourceNodeGroupId()));
Expand Down
Expand Up @@ -12,12 +12,6 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,

// @formatter:off

putSql("selectTableReloadRequest", "select reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id) values (?,?,?,?,?,?,?,?,?,?,?)");

putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, reload_delete_stmt=?, reload_enabled=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

putSql("selectDataEventActionsByIdSql",
" select data_event_action from $(node_group_link) where "
+ " source_node_group_id = ? and target_node_group_id = ? ");
Expand Down
Expand Up @@ -764,9 +764,7 @@ public CsvData next() {

String triggerId = triggerHistory.getTriggerId();

// fyi, this queries the database
TriggerRouter triggerRouter = triggerRouterService.findTriggerRouterById(
triggerId, routerId);
TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(triggerId, routerId, false);
SelectFromTableEvent event = new SelectFromTableEvent(targetNode,
triggerRouter, triggerHistory);
this.reloadSource = new SelectFromTableSource(outgoingBatch, batch, event);
Expand Down

0 comments on commit b45f998

Please sign in to comment.