Skip to content

Commit

Permalink
0003075: Sync configuration on upgrade and as periodic job
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Apr 27, 2017
1 parent b5d4c05 commit b56ea77
Show file tree
Hide file tree
Showing 24 changed files with 283 additions and 12 deletions.
Expand Up @@ -68,6 +68,7 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new InitialLoadExtractorJob(engine,taskScheduler));
this.jobs.add(new MonitorJob(engine, taskScheduler));
this.jobs.add(new ReportStatusJob(engine, taskScheduler));
this.jobs.add(new SyncConfigJob(engine, taskScheduler));
}

@Override
Expand Down
@@ -0,0 +1,53 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that checks to see if config needs to be synced from registration server
*/
public class SyncConfigJob extends AbstractJob {

public SyncConfigJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.sync.config", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_SYNC_CONFIG_JOB);
}

@Override
public String getClusterLockName() {
return ClusterConstants.SYNC_CONFIG;
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPullService().pullConfigData(false);
}

}
Expand Up @@ -618,6 +618,10 @@ public synchronized boolean start(boolean startJobs) {
parameterService.getSyncUrl())) {
heartbeat(false);
}

if (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIG_AT_STARTUP, true)) {
pullService.pullConfigData(false);
}

} else {
log.info("Starting unregistered node [group={}, externalId={}]",
Expand Down
Expand Up @@ -56,6 +56,7 @@ private ParameterConstants() {
public final static String START_ROUTE_JOB = "start.route.job";
public final static String START_HEARTBEAT_JOB = "start.heartbeat.job";
public final static String START_SYNCTRIGGERS_JOB = "start.synctriggers.job";
public final static String START_SYNC_CONFIG_JOB = "start.sync.config.job";
public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job";
public final static String START_STAGE_MGMT_JOB = "start.stage.management.job";
public final static String START_WATCHDOG_JOB = "start.watchdog.job";
Expand Down Expand Up @@ -116,6 +117,7 @@ private ParameterConstants() {
public final static String AUTO_CONFIGURE_DATABASE = "auto.config.database";
public final static String AUTO_SYNC_TRIGGERS = "auto.sync.triggers";
public final static String AUTO_SYNC_TRIGGERS_AT_STARTUP = "auto.sync.triggers.at.startup";
public final static String AUTO_SYNC_CONFIG_AT_STARTUP = "auto.sync.config.at.startup";
public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED = "auto.sync.triggers.after.config.change";
public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED = "auto.sync.triggers.after.config.loaded";
public final static String AUTO_REFRESH_AFTER_CONFIG_CHANGED = "auto.refresh.after.config.changes.detected";
Expand Down
Expand Up @@ -118,6 +118,9 @@ public String afterUpgrade(ISymmetricDialect symmetricDialect, String tablePrefi
engine.getSqlTemplate().update("update " + tablePrefix + "_" + TableConstants.SYM_CHANNEL +
" set max_batch_size = 10000 where reload_flag = 1 and max_batch_size = 10000");
}

engine.getPullService().pullConfigData(false);

return sb.toString();
}

Expand Down
Expand Up @@ -58,6 +58,8 @@ public class Node implements Serializable, Comparable<Node> {
* sync software.
*/
private String schemaVersion;

private String configVersion;

/**
* Record the type of database the node hosts.
Expand Down Expand Up @@ -104,6 +106,7 @@ public Node(IParameterService parameterService, ISymmetricDialect symmetricDiale
setDatabaseVersion(symmetricDialect.getVersion());
setSyncUrl(parameterService.getSyncUrl());
setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
setConfigVersion(Version.version());
}

public Node(String nodeId, String syncURL, String version) {
Expand Down Expand Up @@ -146,6 +149,14 @@ public void setSchemaVersion(String version) {
this.schemaVersion = StringUtils.abbreviate(version, MAX_VERSION_SIZE);
}

public String getConfigVersion() {
return configVersion;
}

public void setConfigVersion(String configVersion) {
this.configVersion = configVersion;
}

public boolean isSyncEnabled() {
return syncEnabled;
}
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB;
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB, PULL_CONFIG_JOB;

public String toString() {
switch (this) {
Expand All @@ -39,6 +39,8 @@ public String toString() {
return "Database Push";
case PULL_JOB:
return "Database Pull";
case PULL_CONFIG_JOB:
return "Config Pull";
case PUSH_HANDLER:
return "Service Database Push";
case PULL_HANDLER:
Expand Down
Expand Up @@ -39,7 +39,8 @@ public class ClusterConstants {
public static final String PURGE_DATA_GAPS = "PURGE_DATA_GAPS";
public static final String HEARTBEAT = "HEARTBEAT";
public static final String INITIAL_LOAD_EXTRACT = "INITIAL_LOAD_EXTRACT";
public static final String SYNCTRIGGERS = "SYNCTRIGGERS";
public static final String SYNCTRIGGERS = "SYNCTRIGGERS";
public static final String SYNC_CONFIG = "SYNC_CONFIG";
public static final String WATCHDOG = "WATCHDOG";
public static final String STATISTICS = "STATISTICS";
public static final String FILE_SYNC_TRACKER = "FILE_SYNC_TRACKER";
Expand Down
Expand Up @@ -51,6 +51,8 @@ public interface IDataLoaderService {

public List<IncomingBatch> loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException;

public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean force) throws IOException;

public List<String> getAvailableDataLoaderFactories();

public List<IncomingBatch> loadDataBatch(String batchData);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;

/**
Expand All @@ -32,4 +33,6 @@ public interface IPullService extends IOfflineDetectorService {

public RemoteNodeStatuses pullData(boolean force);

public RemoteNodeStatus pullConfigData(boolean force);

}
Expand Up @@ -44,6 +44,7 @@
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED;
import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG;
import static org.jumpmind.symmetric.service.ClusterConstants.SYNC_CONFIG;

import java.util.Collection;
import java.util.Date;
Expand Down Expand Up @@ -71,7 +72,8 @@ public class ClusterService extends AbstractService implements IClusterService {

private static final String[] actions = new String[] { ROUTE, PULL, PUSH, HEARTBEAT, PURGE_INCOMING, PURGE_OUTGOING,
PURGE_STATISTICS, SYNCTRIGGERS, PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL,
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR };
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR,
SYNC_CONFIG };

private static final String[] sharedActions = new String[] { FILE_SYNC_SHARED };

Expand Down
Expand Up @@ -58,6 +58,7 @@
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -467,6 +468,51 @@ public List<IncomingBatch> loadDataFromOfflineTransport(Node remote, RemoteNodeS
return list;
}

public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean force) throws IOException {
if (engine.getParameterService().isRegistrationServer()) {
return;
}
Node local = nodeService.findIdentity();
try {
NodeSecurity localSecurity = nodeService.findNodeSecurity(local.getNodeId(), true);
String configVersion = force ? "" : local.getConfigVersion();

IIncomingTransport transport = engine.getTransportManager().getConfigTransport(remote, local,
localSecurity.getNodePassword(), Version.version(), configVersion, remote.getSyncUrl());

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote
.getNodeId(), Constants.CHANNEL_CONFIG, local.getNodeId(), ProcessType.PULL_CONFIG_JOB));
try {
log.info("Requesting current configuration {symmetricVersion={}, configVersion={}}",
Version.version(), local.getConfigVersion());
List<IncomingBatch> list = loadDataFromTransport(processInfo, remote, transport, null);
if (containsError(list)) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
} else {
if (list.size() > 0) {
status.updateIncomingStatus(list);
local.setConfigVersion(Version.version());
nodeService.save(local);
}
processInfo.setStatus(ProcessInfo.Status.OK);
}
} catch (RuntimeException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw e;
} catch (IOException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw e;
}
} catch (RegistrationRequiredException e) {
log.warn("Failed to pull configuration from node '{}'. It probably is missing a node security record for '{}'.",
remote.getNodeId(), local.getNodeId());
} catch (MalformedURLException e) {
log.error("Could not connect to the {} node's transport because of a bad URL: '{}' {}",
remote.getNodeId(), remote.getSyncUrl(), e);
throw e;
}
}

/**
* Load database from input stream and return a list of batch statuses. This
* is used for a pull request that responds with data, and the
Expand Down
Expand Up @@ -299,11 +299,12 @@ public void save(Node node) {
node.getSymmetricVersion(), node.getSyncUrl(), new Date(),
node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(),
node.getBatchToSendCount(), node.getBatchInErrorCount(),
node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() },
node.getCreatedAtNodeId(), node.getDeploymentType(),
node.getConfigVersion(), node.getNodeId() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP,
Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR });
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
flushNodeGroupCache();
}
}
Expand All @@ -316,11 +317,12 @@ public boolean updateNode(Node node) {
node.getSymmetricVersion(), node.getSyncUrl(), new Date(),
node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(),
node.getBatchToSendCount(), node.getBatchInErrorCount(),
node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() },
node.getCreatedAtNodeId(), node.getDeploymentType(),
node.getConfigVersion(), node.getNodeId() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP,
Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR }) == 1;
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) == 1;
return updated;
}

Expand Down Expand Up @@ -881,6 +883,7 @@ public Node mapRow(Row rs) {
node.setBatchToSendCount(rs.getInt("batch_to_send_count"));
node.setBatchInErrorCount(rs.getInt("batch_in_error_count"));
node.setDeploymentType(rs.getString("deployment_type"));
node.setConfigVersion(rs.getString("config_version"));
return node;
}
}
Expand Down
Expand Up @@ -52,12 +52,13 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
putSql("insertNodeSql",
"insert into $(node) (node_group_id, external_id, database_type, database_version, schema_version, symmetric_version, sync_url," +
"heartbeat_time, sync_enabled, timezone_offset, batch_to_send_count, batch_in_error_count, created_at_node_id, " +
"deployment_type, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
"deployment_type, config_version, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");

putSql("updateNodeSql",
"update $(node) set node_group_id=?, external_id=?, database_type=?, "
+ " database_version=?, schema_version=?, symmetric_version=?, sync_url=?, heartbeat_time=?, "
+ " sync_enabled=?, timezone_offset=?, batch_to_send_count=?, batch_in_error_count=?, created_at_node_id=?, deployment_type=? where node_id = ? ");
+ " sync_enabled=?, timezone_offset=?, batch_to_send_count=?, batch_in_error_count=?, "
+ " created_at_node_id=?, deployment_type=?, config_version = ? where node_id = ?");

putSql("findNodeSql", "where node_id = ? ");

Expand Down Expand Up @@ -151,7 +152,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectNodePrefixSql",
"select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url, "
+ " c.schema_version, c.database_type, c.database_version, c.symmetric_version, c.created_at_node_id, c.heartbeat_time, c.timezone_offset, c.batch_to_send_count, c.batch_in_error_count, c.deployment_type from "
+ " c.schema_version, c.database_type, c.database_version, c.symmetric_version, c.created_at_node_id, c.heartbeat_time, c.timezone_offset, c.batch_to_send_count, c.batch_in_error_count, c.deployment_type, c.config_version from "
+ " $(node) c ");

putSql("updateNodeSecuritySql",
Expand Down
Expand Up @@ -23,6 +23,8 @@
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Node;
Expand Down Expand Up @@ -154,5 +156,22 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}
}

public RemoteNodeStatus pullConfigData(boolean force) {
Node local = nodeService.findIdentity();
RemoteNodeStatus status = null;

if (!parameterService.isRegistrationServer() && (force || !Version.version().equals(local.getConfigVersion()))) {
Node remote = new Node();
remote.setSyncUrl(parameterService.getRegistrationUrl());
status = new RemoteNodeStatus(remote.getNodeId(), Constants.CHANNEL_CONFIG, configurationService.getChannels(false));

try {
dataLoaderService.loadDataFromConfig(remote, status, force);
} catch (Exception e) {
fireOffline(e, remote, status);
}
}
return status;
}

}
Expand Up @@ -51,7 +51,10 @@ public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node loc
public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local, String securityToken, Map<String,String> requestProperties, String registrationUrl) throws IOException;

public IIncomingTransport getRegisterTransport(Node node, String registrationUrl) throws IOException;


public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken,
String symmetricVersion, String configVersion, String registrationUrl) throws IOException;

/**
* This is the proper way to determine the URL for a node. It delegates to configured
* extension points when necessary to take in to account custom load balancing and
Expand Down

0 comments on commit b56ea77

Please sign in to comment.