diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java index 9aa47c46f6..94f09c39c8 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java @@ -68,6 +68,7 @@ public JobManager(ISymmetricEngine engine) { this.jobs.add(new InitialLoadExtractorJob(engine,taskScheduler)); this.jobs.add(new MonitorJob(engine, taskScheduler)); this.jobs.add(new ReportStatusJob(engine, taskScheduler)); + this.jobs.add(new SyncConfigJob(engine, taskScheduler)); } @Override diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncConfigJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncConfigJob.java new file mode 100644 index 0000000000..ada2dfa5fc --- /dev/null +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncConfigJob.java @@ -0,0 +1,53 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jumpmind.symmetric.job; + +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.service.ClusterConstants; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +/* + * Background job that checks to see if config needs to be synced from registration server + */ +public class SyncConfigJob extends AbstractJob { + + public SyncConfigJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { + super("job.sync.config", engine, taskScheduler); + } + + @Override + public boolean isAutoStartConfigured() { + return engine.getParameterService().is(ParameterConstants.START_SYNC_CONFIG_JOB); + } + + @Override + public String getClusterLockName() { + return ClusterConstants.SYNC_CONFIG; + } + + @Override + public void doJob(boolean force) throws Exception { + engine.getPullService().pullConfigData(false); + } + +} \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 9eaaaab487..5f468ec2f1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -618,6 +618,10 @@ public synchronized boolean start(boolean startJobs) { parameterService.getSyncUrl())) { heartbeat(false); } + + if (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIG_AT_STARTUP, true)) { + pullService.pullConfigData(false); + } } else { log.info("Starting unregistered node [group={}, externalId={}]", diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index d223fc23b3..4008c7a3c8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -56,6 +56,7 @@ private ParameterConstants() { public final static String START_ROUTE_JOB = "start.route.job"; public final static String START_HEARTBEAT_JOB = "start.heartbeat.job"; public final static String START_SYNCTRIGGERS_JOB = "start.synctriggers.job"; + public final static String START_SYNC_CONFIG_JOB = "start.sync.config.job"; public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job"; public final static String START_STAGE_MGMT_JOB = "start.stage.management.job"; public final static String START_WATCHDOG_JOB = "start.watchdog.job"; @@ -116,6 +117,7 @@ private ParameterConstants() { public final static String AUTO_CONFIGURE_DATABASE = "auto.config.database"; public final static String AUTO_SYNC_TRIGGERS = "auto.sync.triggers"; public final static String AUTO_SYNC_TRIGGERS_AT_STARTUP = "auto.sync.triggers.at.startup"; + public final static String AUTO_SYNC_CONFIG_AT_STARTUP = "auto.sync.config.at.startup"; public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED = "auto.sync.triggers.after.config.change"; public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED = "auto.sync.triggers.after.config.loaded"; public final static String AUTO_REFRESH_AFTER_CONFIG_CHANGED = "auto.refresh.after.config.changes.detected"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/DatabaseUpgradeListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/DatabaseUpgradeListener.java index 10570830b7..a925e13e46 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/DatabaseUpgradeListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/DatabaseUpgradeListener.java @@ -118,6 +118,9 @@ public String afterUpgrade(ISymmetricDialect symmetricDialect, String tablePrefi engine.getSqlTemplate().update("update " + tablePrefix + "_" + TableConstants.SYM_CHANNEL + " set max_batch_size = 10000 where reload_flag = 1 and max_batch_size = 10000"); } + + engine.getPullService().pullConfigData(false); + return sb.toString(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java index 2700d582d3..990be9389d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java @@ -58,6 +58,8 @@ public class Node implements Serializable, Comparable { * sync software. */ private String schemaVersion; + + private String configVersion; /** * Record the type of database the node hosts. @@ -104,6 +106,7 @@ public Node(IParameterService parameterService, ISymmetricDialect symmetricDiale setDatabaseVersion(symmetricDialect.getVersion()); setSyncUrl(parameterService.getSyncUrl()); setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION)); + setConfigVersion(Version.version()); } public Node(String nodeId, String syncURL, String version) { @@ -146,6 +149,14 @@ public void setSchemaVersion(String version) { this.schemaVersion = StringUtils.abbreviate(version, MAX_VERSION_SIZE); } + public String getConfigVersion() { + return configVersion; + } + + public void setConfigVersion(String configVersion) { + this.configVersion = configVersion; + } + public boolean isSyncEnabled() { return syncEnabled; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java index 678efd7eeb..7414fd6d3d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable { private static final long serialVersionUID = 1L; public enum ProcessType { - ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB; + ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB, PULL_CONFIG_JOB; public String toString() { switch (this) { @@ -39,6 +39,8 @@ public String toString() { return "Database Push"; case PULL_JOB: return "Database Pull"; + case PULL_CONFIG_JOB: + return "Config Pull"; case PUSH_HANDLER: return "Service Database Push"; case PULL_HANDLER: diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java index 0e1194920f..ae538cc004 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java @@ -39,7 +39,8 @@ public class ClusterConstants { public static final String PURGE_DATA_GAPS = "PURGE_DATA_GAPS"; public static final String HEARTBEAT = "HEARTBEAT"; public static final String INITIAL_LOAD_EXTRACT = "INITIAL_LOAD_EXTRACT"; - public static final String SYNCTRIGGERS = "SYNCTRIGGERS"; + public static final String SYNCTRIGGERS = "SYNCTRIGGERS"; + public static final String SYNC_CONFIG = "SYNC_CONFIG"; public static final String WATCHDOG = "WATCHDOG"; public static final String STATISTICS = "STATISTICS"; public static final String FILE_SYNC_TRACKER = "FILE_SYNC_TRACKER"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java index 50d28aed8a..a8ddee50b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java @@ -51,6 +51,8 @@ public interface IDataLoaderService { public List loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException; + public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean force) throws IOException; + public List getAvailableDataLoaderFactories(); public List loadDataBatch(String batchData); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java index 7fc3db6c6c..e8457f95fe 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java @@ -22,6 +22,7 @@ import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeGroupLinkAction; +import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; /** @@ -32,4 +33,6 @@ public interface IPullService extends IOfflineDetectorService { public RemoteNodeStatuses pullData(boolean force); + public RemoteNodeStatus pullConfigData(boolean force); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java index c1b378d7c6..7884458cf1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java @@ -44,6 +44,7 @@ import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE; import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED; import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG; +import static org.jumpmind.symmetric.service.ClusterConstants.SYNC_CONFIG; import java.util.Collection; import java.util.Date; @@ -71,7 +72,8 @@ public class ClusterService extends AbstractService implements IClusterService { private static final String[] actions = new String[] { ROUTE, PULL, PUSH, HEARTBEAT, PURGE_INCOMING, PURGE_OUTGOING, PURGE_STATISTICS, SYNCTRIGGERS, PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL, - FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR }; + FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR, + SYNC_CONFIG }; private static final String[] sharedActions = new String[] { FILE_SYNC_SHARED }; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index ac7b190ef2..26192c25af 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -58,6 +58,7 @@ import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; +import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ErrorConstants; import org.jumpmind.symmetric.common.ParameterConstants; @@ -467,6 +468,51 @@ public List loadDataFromOfflineTransport(Node remote, RemoteNodeS return list; } + public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean force) throws IOException { + if (engine.getParameterService().isRegistrationServer()) { + return; + } + Node local = nodeService.findIdentity(); + try { + NodeSecurity localSecurity = nodeService.findNodeSecurity(local.getNodeId(), true); + String configVersion = force ? "" : local.getConfigVersion(); + + IIncomingTransport transport = engine.getTransportManager().getConfigTransport(remote, local, + localSecurity.getNodePassword(), Version.version(), configVersion, remote.getSyncUrl()); + + ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote + .getNodeId(), Constants.CHANNEL_CONFIG, local.getNodeId(), ProcessType.PULL_CONFIG_JOB)); + try { + log.info("Requesting current configuration {symmetricVersion={}, configVersion={}}", + Version.version(), local.getConfigVersion()); + List list = loadDataFromTransport(processInfo, remote, transport, null); + if (containsError(list)) { + processInfo.setStatus(ProcessInfo.Status.ERROR); + } else { + if (list.size() > 0) { + status.updateIncomingStatus(list); + local.setConfigVersion(Version.version()); + nodeService.save(local); + } + processInfo.setStatus(ProcessInfo.Status.OK); + } + } catch (RuntimeException e) { + processInfo.setStatus(ProcessInfo.Status.ERROR); + throw e; + } catch (IOException e) { + processInfo.setStatus(ProcessInfo.Status.ERROR); + throw e; + } + } catch (RegistrationRequiredException e) { + log.warn("Failed to pull configuration from node '{}'. It probably is missing a node security record for '{}'.", + remote.getNodeId(), local.getNodeId()); + } catch (MalformedURLException e) { + log.error("Could not connect to the {} node's transport because of a bad URL: '{}' {}", + remote.getNodeId(), remote.getSyncUrl(), e); + throw e; + } + } + /** * Load database from input stream and return a list of batch statuses. This * is used for a pull request that responds with data, and the diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index 6eafce9fe4..a3793f80d6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -299,11 +299,12 @@ public void save(Node node) { node.getSymmetricVersion(), node.getSyncUrl(), new Date(), node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(), node.getBatchToSendCount(), node.getBatchInErrorCount(), - node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() }, + node.getCreatedAtNodeId(), node.getDeploymentType(), + node.getConfigVersion(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR }); + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); flushNodeGroupCache(); } } @@ -316,11 +317,12 @@ public boolean updateNode(Node node) { node.getSymmetricVersion(), node.getSyncUrl(), new Date(), node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(), node.getBatchToSendCount(), node.getBatchInErrorCount(), - node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() }, + node.getCreatedAtNodeId(), node.getDeploymentType(), + node.getConfigVersion(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR }) == 1; + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) == 1; return updated; } @@ -881,6 +883,7 @@ public Node mapRow(Row rs) { node.setBatchToSendCount(rs.getInt("batch_to_send_count")); node.setBatchInErrorCount(rs.getInt("batch_in_error_count")); node.setDeploymentType(rs.getString("deployment_type")); + node.setConfigVersion(rs.getString("config_version")); return node; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java index f94debf5eb..0587c9285a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java @@ -52,12 +52,13 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace putSql("insertNodeSql", "insert into $(node) (node_group_id, external_id, database_type, database_version, schema_version, symmetric_version, sync_url," + "heartbeat_time, sync_enabled, timezone_offset, batch_to_send_count, batch_in_error_count, created_at_node_id, " + - "deployment_type, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + "deployment_type, config_version, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); putSql("updateNodeSql", "update $(node) set node_group_id=?, external_id=?, database_type=?, " + " database_version=?, schema_version=?, symmetric_version=?, sync_url=?, heartbeat_time=?, " - + " sync_enabled=?, timezone_offset=?, batch_to_send_count=?, batch_in_error_count=?, created_at_node_id=?, deployment_type=? where node_id = ? "); + + " sync_enabled=?, timezone_offset=?, batch_to_send_count=?, batch_in_error_count=?, " + + " created_at_node_id=?, deployment_type=?, config_version = ? where node_id = ?"); putSql("findNodeSql", "where node_id = ? "); @@ -151,7 +152,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace putSql("selectNodePrefixSql", "select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url, " - + " c.schema_version, c.database_type, c.database_version, c.symmetric_version, c.created_at_node_id, c.heartbeat_time, c.timezone_offset, c.batch_to_send_count, c.batch_in_error_count, c.deployment_type from " + + " c.schema_version, c.database_type, c.database_version, c.symmetric_version, c.created_at_node_id, c.heartbeat_time, c.timezone_offset, c.batch_to_send_count, c.batch_in_error_count, c.deployment_type, c.config_version from " + " $(node) c "); putSql("updateNodeSecuritySql", diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java index 8e06242efe..e0e2d2d813 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.Version; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.model.Node; @@ -154,5 +156,22 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } } + public RemoteNodeStatus pullConfigData(boolean force) { + Node local = nodeService.findIdentity(); + RemoteNodeStatus status = null; + + if (!parameterService.isRegistrationServer() && (force || !Version.version().equals(local.getConfigVersion()))) { + Node remote = new Node(); + remote.setSyncUrl(parameterService.getRegistrationUrl()); + status = new RemoteNodeStatus(remote.getNodeId(), Constants.CHANNEL_CONFIG, configurationService.getChannels(false)); + + try { + dataLoaderService.loadDataFromConfig(remote, status, force); + } catch (Exception e) { + fireOffline(e, remote, status); + } + } + return status; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ITransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ITransportManager.java index c87b323cfe..991f6d5d9e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ITransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ITransportManager.java @@ -51,7 +51,10 @@ public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node loc public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local, String securityToken, Map requestProperties, String registrationUrl) throws IOException; public IIncomingTransport getRegisterTransport(Node node, String registrationUrl) throws IOException; - + + public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken, + String symmetricVersion, String configVersion, String registrationUrl) throws IOException; + /** * This is the proper way to determine the URL for a node. It delegates to configured * extension points when necessary to take in to account custom load balancing and diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java index f5e29a0704..cf493a50e3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java @@ -245,6 +245,16 @@ public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node loc getBasicAuthPassword(), isOutputStreamEnabled(), getOutputStreamSize(), true); } + public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken, + String symmetricVersion, String configVersion, String registrationUrl) throws IOException { + StringBuilder builder = new StringBuilder(buildURL("config", remote, local, + securityToken, registrationUrl)); + append(builder, WebConstants.SYMMETRIC_VERSION, symmetricVersion); + append(builder, WebConstants.CONFIG_VERSION, configVersion); + HttpURLConnection conn = createGetConnectionFor(new URL(builder.toString())); + return new HttpIncomingTransport(conn, engine.getParameterService()); + } + public IIncomingTransport getRegisterTransport(Node node, String registrationUrl) throws IOException { return new HttpIncomingTransport(createGetConnectionFor(new URL(buildRegistrationUrl( diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java index 154f421840..b6889ddf49 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java @@ -246,5 +246,11 @@ interface IClientRunnable { public void run(ISymmetricEngine engine, InputStream is, OutputStream os) throws Exception; } + @Override + public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken, + String symmetricVersion, String configVersion, String registrationUrl) throws IOException { + return null; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java index 1edaa5247a..d6b0edd4d8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java @@ -114,5 +114,7 @@ public class WebConstants { public static final String BATCH_TO_SEND_COUNT = "Batch-To-Send-Count"; public static final String THREAD_CHANNEL = "threadChannel"; + + public static final String CONFIG_VERSION = "configVersion"; } \ No newline at end of file diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index bac19134b9..c12bb2b6c1 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -419,6 +419,13 @@ auto.sync.triggers.after.config.loaded=true # Type: boolean auto.sync.triggers.at.startup=true +# If this is true, then check if configuration should be pulled from registration server at startup. +# If the config version in the database does not match the software version, it will pull config. +# +# Tags: general +# Type: boolean +auto.sync.config.at.startup=true + # Capture and send SymmetricDS configuration changes to client nodes. # # Tags: general @@ -779,6 +786,12 @@ job.purge.outgoing.cron=0 0 0 * * * # Tags: jobs job.synctriggers.cron=0 0 0 * * * +# This is when the sync config job will run. +# +# DatabaseOverridable: true +# Tags: jobs +job.sync.config.cron=0 0/10 1 * * * + # This is when the refresh cache job will run. # # DatabaseOverridable: true @@ -862,6 +875,14 @@ start.heartbeat.job=true # Type: boolean start.synctriggers.job=true +# Whether the sync config job is enabled for this node. +# This job checks that the configuration version matches the software version, otherwise it +# will pull the latest configuration from the registration server. +# +# Tags: jobs +# Type: boolean +start.sync.config.job=true + # Whether the statistic flush job is enabled for this node. # # Tags: jobs diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 6616080f8f..72ea659f96 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -334,6 +334,7 @@ + diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java index 09094acb24..59b84e359b 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java @@ -123,6 +123,10 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local, return outgoingTransport; } - + @Override + public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken, + String symmetricVersion, String configVersion, String registrationUrl) throws IOException { + return null; + } } \ No newline at end of file diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ConfigurationUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ConfigurationUriHandler.java new file mode 100644 index 0000000000..9c60f1f366 --- /dev/null +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ConfigurationUriHandler.java @@ -0,0 +1,69 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.web; + +import java.io.IOException; +import java.io.OutputStream; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.Version; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.service.IDataExtractorService; +import org.jumpmind.symmetric.service.IParameterService; + +/** + * Allow a node to pull the configuration. + * The symmetricVersion parameter is the version number of the software, which is used to filter configuration that is sent. + * The configVersion parameter is the current version number of the configuration that will be upgraded. + */ +public class ConfigurationUriHandler extends AbstractUriHandler { + + private IDataExtractorService dataExtractorService; + + public ConfigurationUriHandler(IParameterService parameterService, + IDataExtractorService dataExtractorService, IInterceptor... interceptors) { + super("/config/*", parameterService, interceptors); + this.dataExtractorService = dataExtractorService; + } + + public void handle(HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException { + Node remoteNode = new Node(); + String symVersion = ServletUtils.getParameter(req, WebConstants.SYMMETRIC_VERSION); + String configVersion = ServletUtils.getParameter(req, WebConstants.CONFIG_VERSION); + remoteNode.setNodeId(ServletUtils.getParameter(req, WebConstants.NODE_ID)); + remoteNode.setSymmetricVersion(symVersion); + remoteNode.setConfigVersion(configVersion); + log.info("Configuration request from node ID " + remoteNode.getNodeId() + " {symmetricVersion={}, configVersion={}}", + symVersion, configVersion); + + // TODO + //if (StringUtils.isBlank(configVersion) || Version.isOlderThanVersion(configVersion, Version.version())) { + if (StringUtils.isBlank(configVersion) || Version.isOlderThanVersion(configVersion, "3.8.22")) { + OutputStream outputStream = res.getOutputStream(); + dataExtractorService.extractConfigurationStandalone(remoteNode, outputStream); + } + } + +} diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java index 32312d5b7b..3357f9fa90 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java @@ -85,6 +85,8 @@ protected void init() { concurrencyInterceptor, authInterceptor)); this.uriHandlers.add(new RegistrationUriHandler(parameterService, registrationService, concurrencyInterceptor)); + this.uriHandlers.add(new ConfigurationUriHandler(parameterService, dataExtractorService, + concurrencyInterceptor, authInterceptor)); this.uriHandlers.add(new FileSyncPullUriHandler(this, concurrencyInterceptor, authInterceptor)); this.uriHandlers.add(new FileSyncPushUriHandler(this, concurrencyInterceptor, authInterceptor)); this.uriHandlers.add(new CopyNodeUriHandler(this, authInterceptor));