Skip to content

Commit

Permalink
0002796: File Channels are missing when first enabling file sync.
Browse files Browse the repository at this point in the history
- also did a little cleanup in the jobs to support properly starting up
the file sync jobs when the file.sync.enabled parameter changes (and
similar for other jobs driven by the parameters).
  • Loading branch information
mmichalek committed Sep 16, 2016
1 parent 078f52b commit 5b6446c
Show file tree
Hide file tree
Showing 25 changed files with 313 additions and 154 deletions.
Expand Up @@ -191,7 +191,7 @@ public boolean invoke(boolean force) {
}

if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)
&& parameterService.is("start.file.sync.tracker.job")
&& parameterService.is(ParameterConstants.START_FILE_SYNC_TRACKER_JOB)
&& parameterService.getLong("job.file.sync.tracker.period.time.ms", 5000) < (System
.currentTimeMillis() - lastFileSyncTrackerTime)) {
try {
Expand All @@ -205,7 +205,7 @@ public boolean invoke(boolean force) {
}

if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)
&& parameterService.is("start.file.sync.pull.job")
&& parameterService.is(ParameterConstants.START_FILE_SYNC_PULL_JOB)
&& parameterService.getLong("job.file.sync.pull.period.time.ms", 60000) < (System
.currentTimeMillis() - lastFileSyncPullTime)) {
try {
Expand All @@ -219,7 +219,7 @@ public boolean invoke(boolean force) {
}

if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)
&& parameterService.is("start.file.sync.push.job")
&& parameterService.is(ParameterConstants.START_FILE_SYNC_PUSH_JOB)
&& parameterService.getLong("job.file.sync.push.period.time.ms", 60000) < (System
.currentTimeMillis() - lastFileSyncPushTime)) {
try {
Expand Down
Expand Up @@ -47,8 +47,6 @@ abstract public class AbstractJob implements Runnable, IJob {

private String jobName;

private boolean requiresRegistration = true;

private boolean paused = false;

private Date lastFinishTime;
Expand All @@ -71,24 +69,21 @@ abstract public class AbstractJob implements Runnable, IJob {

private RandomTimeSlot randomTimeSlot;

private boolean autoStartConfigured;

protected ISymmetricEngine engine;

protected AbstractJob(String jobName, boolean requiresRegistration, boolean autoStartRequired,
ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {

protected AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
this.engine = engine;
this.taskScheduler = taskScheduler;
this.jobName = jobName;
this.requiresRegistration = requiresRegistration;
this.autoStartConfigured = autoStartRequired;
IParameterService parameterService = engine.getParameterService();
this.randomTimeSlot = new RandomTimeSlot(parameterService.getExternalId(),
parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS));
parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS));
}

public boolean isAutoStartConfigured() {
return autoStartConfigured;
@Deprecated
protected AbstractJob(String jobName, boolean requiresRegistration, boolean autoStartRequired,
ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
this(jobName, engine, taskScheduler);
}

public void start() {
Expand Down Expand Up @@ -145,7 +140,7 @@ public String getName() {
return jobName;
}

@ManagedOperation(description = "Run this job is it isn't already running")
@ManagedOperation(description = "Run this job if it isn't already running")
public boolean invoke() {
return invoke(true);
}
Expand All @@ -167,8 +162,8 @@ public boolean invoke(boolean force) {
ran = true;
long startTime = System.currentTimeMillis();
try {
if (!requiresRegistration
|| (requiresRegistration && engine
if (!isRequiresRegistration()
|| (isRequiresRegistration() && engine
.getRegistrationService()
.isRegisteredWithServer())) {
hasNotRegisteredMessageBeenLogged = false;
Expand Down Expand Up @@ -293,5 +288,8 @@ public String getCronExpression() {
public long getTimeBetweenRunsInMs() {
return engine.getParameterService().getInt(jobName + ".period.time.ms", -1);
}


public boolean isRequiresRegistration() {
return true;
}
}
Expand Up @@ -28,10 +28,16 @@
public class FileSyncPullJob extends AbstractJob {

protected FileSyncPullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.file.sync.pull", true, engine.getParameterService().is(
ParameterConstants.FILE_SYNC_ENABLE) && engine.getParameterService().is("start.file.sync.pull.job", true), engine, taskScheduler);
super("job.file.sync.pull", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)
&& engine.getParameterService().is(ParameterConstants.START_FILE_SYNC_PULL_JOB, true);
}

@Override
public String getClusterLockName() {
return ClusterConstants.FILE_SYNC_PULL;
}
Expand All @@ -40,6 +46,5 @@ public String getClusterLockName() {
void doJob(boolean force) throws Exception {
engine.getFileSyncService().pullFilesFromNodes(force);
}



}
Expand Up @@ -28,12 +28,16 @@
public class FileSyncPushJob extends AbstractJob {

protected FileSyncPushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.file.sync.push", true, engine.getParameterService().is(
ParameterConstants.FILE_SYNC_ENABLE)
&& engine.getParameterService().is("start.file.sync.push.job", true), engine,
taskScheduler);
super("job.file.sync.push", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)
&& engine.getParameterService().is(ParameterConstants.START_FILE_SYNC_PUSH_JOB, true);
}

@Override
public String getClusterLockName() {
return ClusterConstants.FILE_SYNC_PUSH;
}
Expand All @@ -42,5 +46,5 @@ public String getClusterLockName() {
void doJob(boolean force) throws Exception {
engine.getFileSyncService().pushFilesToNodes(force);
}

}
Expand Up @@ -28,11 +28,16 @@
public class FileSyncTrackerJob extends AbstractJob {

protected FileSyncTrackerJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.file.sync.tracker", true, engine.getParameterService().is(
ParameterConstants.FILE_SYNC_ENABLE)
&& engine.getParameterService().is("start.file.sync.tracker.job", true), engine, taskScheduler);
super("job.file.sync.tracker", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)
&& engine.getParameterService().is(ParameterConstants.START_FILE_SYNC_TRACKER_JOB, true);
}

@Override
public String getClusterLockName() {
return ClusterConstants.FILE_SYNC_TRACKER;
}
Expand All @@ -43,5 +48,4 @@ void doJob(boolean force) throws Exception {
engine.getFileSyncService().trackChanges(force);
}
}

}
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -30,10 +31,24 @@
public class HeartbeatJob extends AbstractJob {

public HeartbeatJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.heartbeat", false, engine.getParameterService().is("start.heartbeat.job"),
engine, taskScheduler);
super("job.heartbeat", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_HEARTBEAT_JOB);
}

@Override
public boolean isRequiresRegistration() {
return false;
}

@Override
public String getClusterLockName() {
return ClusterConstants.HEARTBEAT;
}

@Override
public void doJob(boolean force) throws Exception {
if (engine.getClusterService().lock(getClusterLockName())) {
Expand All @@ -44,9 +59,5 @@ public void doJob(boolean force) throws Exception {
}
}
}

public String getClusterLockName() {
return ClusterConstants.HEARTBEAT;
}


}
Expand Up @@ -21,7 +21,8 @@

package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -31,17 +32,22 @@
public class IncomingPurgeJob extends AbstractJob {

public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.purge.incoming", true, engine.getParameterService().is("start.purge.job"),
engine, taskScheduler);
super("job.purge.incoming", engine, taskScheduler);
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeIncoming(force);
}
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_PURGE_JOB);
}

@Override
public String getClusterLockName() {
return ClusterConstants.PURGE_INCOMING;
return ClusterConstants.PURGE_INCOMING;
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeIncoming(force);
}

}
Expand Up @@ -21,16 +21,22 @@
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class InitialLoadExtractorJob extends AbstractJob {

protected InitialLoadExtractorJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.initial.load.extract", true, engine.getParameterService().is(
"start.initial.load.extract.job", true), engine, taskScheduler);
super("job.initial.load.extract", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.INITIAL_LOAD_EXTRACT_JOB_START, true);
}

@Override
public String getClusterLockName() {
return ClusterConstants.INITIAL_LOAD_EXTRACT;
}
Expand Down
Expand Up @@ -21,13 +21,24 @@
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class MonitorJob extends AbstractJob {

public MonitorJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.monitor", true, engine.getParameterService().is("start.monitor.job"), engine, taskScheduler);
super("job.monitor", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_MONITOR_JOB);
}

@Override
public String getClusterLockName() {
return ClusterConstants.MONITOR;
}

@Override
Expand All @@ -37,8 +48,4 @@ public void doJob(boolean force) throws Exception {
}
}

public String getClusterLockName() {
return ClusterConstants.MONITOR;
}

}
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -30,17 +31,27 @@
public class OfflinePullJob extends AbstractJob {

public OfflinePullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.offline.pull", false, engine.getParameterService().is("start.offline.pull.job"),
engine, taskScheduler);
super("job.offline.pull", engine, taskScheduler);
}

@Override
public void doJob(boolean force) throws Exception {
engine.getOfflinePullService().pullData(force);
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_OFFLINE_PULL_JOB);
}

@Override
public boolean isRequiresRegistration() {
return false;
}

@Override
public String getClusterLockName() {
return ClusterConstants.OFFLINE_PULL;
}

@Override
public void doJob(boolean force) throws Exception {
engine.getOfflinePullService().pullData(force);
}

}

0 comments on commit 5b6446c

Please sign in to comment.