diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 6b4dc966f0..a0a164c489 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -332,7 +332,7 @@ protected void init() { this.bandwidthService = new BandwidthService(parameterService); this.sequenceService = new SequenceService(parameterService, symmetricDialect); this.stagingManager = createStagingManager(); - this.nodeService = new NodeService(parameterService, symmetricDialect, securityService, extensionService); + this.nodeService = new NodeService(this); this.configurationService = new ConfigurationService(parameterService, symmetricDialect, nodeService); this.clusterService = new ClusterService(parameterService, symmetricDialect); @@ -657,7 +657,9 @@ public synchronized boolean start(boolean startJobs) { isInitialized = true; } - jobManager.init(); + if (jobManager != null) { + jobManager.init(); + } nodeService.captureTableMetaInfo(false, parameterService.getTablePrefix()); if (startJobs && jobManager != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index d478c0eafd..767476a6e9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -300,6 +300,7 @@ private ParameterConstants() { public final static String NODE_OFFLINE_ARCHIVE_DIR = "node.offline.archive.dir"; public final static String OFFLINE_NODE_DETECTION_PERIOD_MINUTES = "offline.node.detection.period.minutes"; + public final static String OFFLINE_NODE_DETECTION_RESTART_MINUTES = "offline.node.detection.restart.minutes"; public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC = "heartbeat.sync.on.push.period.sec"; public final static String HEARTBEAT_JOB_PERIOD_MS = "job.heartbeat.period.time.ms"; public final static String HEARTBEAT_SYNC_ON_STARTUP = "heartbeat.sync.on.startup"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index eda0a4a3bb..5ef641f2b2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -39,11 +39,10 @@ import org.jumpmind.db.sql.SqlException; import org.jumpmind.db.sql.UniqueKeyException; import org.jumpmind.db.sql.mapper.StringMapper; -import org.jumpmind.security.ISecurityService; +import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; import org.jumpmind.symmetric.config.INodeIdCreator; -import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IOfflineServerListener; import org.jumpmind.symmetric.model.NetworkedNode; import org.jumpmind.symmetric.model.Node; @@ -54,7 +53,6 @@ import org.jumpmind.symmetric.security.INodePasswordFilter; import org.jumpmind.symmetric.service.IExtensionService; import org.jumpmind.symmetric.service.INodeService; -import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.util.DefaultNodeIdCreator; import org.jumpmind.util.AppUtils; @@ -63,6 +61,8 @@ */ public class NodeService extends AbstractService implements INodeService { + private ISymmetricEngine engine; + private IExtensionService extensionService; private Node cachedNodeIdentity; @@ -85,11 +85,11 @@ public class NodeService extends AbstractService implements INodeService { private NodeHost nodeHostForCurrentNode = null; - public NodeService(IParameterService parameterService, ISymmetricDialect dialect, ISecurityService securityService, - IExtensionService extensionService) { - super(parameterService, dialect); - this.extensionService = extensionService; - extensionService.addExtensionPoint(new DefaultNodeIdCreator(parameterService, this, securityService)); + public NodeService(ISymmetricEngine engine) { + super(engine.getParameterService(), engine.getSymmetricDialect()); + this.engine = engine; + extensionService = engine.getExtensionService(); + extensionService.addExtensionPoint(new DefaultNodeIdCreator(parameterService, this, engine.getSecurityService())); setSqlMap(new NodeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } @@ -749,8 +749,9 @@ public List findOfflineNodes() { public List findOfflineNodes(long minutesOffline) { List offlineNodeList = new ArrayList(); Node myNode = findIdentity(); + long restartDelayMinutes = parameterService.getLong(ParameterConstants.OFFLINE_NODE_DETECTION_RESTART_MINUTES); - if (myNode != null) { + if (myNode != null && System.currentTimeMillis() - engine.getLastRestartTime().getTime() > restartDelayMinutes * 60000) { long offlineNodeDetectionMillis = minutesOffline * 60 * 1000; List list = sqlTemplate.query(getSql("findNodeHeartbeatsSql")); diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 4ee0b40671..7ccc71f08e 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -1464,6 +1464,13 @@ heartbeat.update.node.with.batch.status=false # Tags: other offline.node.detection.period.minutes=-1 +# This is the number of minutes after a node has been restarted that it will begin +# considering remote nodes as offline. This gives remote nodes a chance to send +# their heartbeats. +# DatabaseOverridable: true +# Tags: other +offline.node.detection.restart.minutes=5 + # Enable this property to force a compare of old and new data in triggers. If # old=new, then don't record the change in the data capture table. #