Skip to content

Commit

Permalink
0002684: Support Custom Jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed May 9, 2017
1 parent bd4549f commit 9b91ed6
Show file tree
Hide file tree
Showing 38 changed files with 438 additions and 258 deletions.
Expand Up @@ -31,7 +31,6 @@
import org.jumpmind.symmetric.job.IJob;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.model.JobDefinition;
import org.jumpmind.symmetric.model.JobDefinition.ScheduleType;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -178,7 +177,7 @@ public boolean invoke(boolean force) {
}
}

if (parameterService.is(ParameterConstants.START_PURGE_JOB)
if (parameterService.is(ParameterConstants.START_PURGE_INCOMING_JOB)
&& parameterService.getInt("job.purge.period.time.ms") < System
.currentTimeMillis() - lastPurgeTime) {
try {
Expand Down Expand Up @@ -328,6 +327,31 @@ public long getTimeBetweenRunsInMs() {
public JobDefinition getJobDefinition() {
return null;
}

@Override
public Date getNextExecutionTime() {
return null;
}

@Override
public boolean isCronSchedule() {
return false;
}

@Override
public boolean isPeriodicSchedule() {
return false;
}

@Override
public String getSchedule() {
return null;
}

@Override
public String getDeprecatedStartParameter() {
return null;
}
}

@Override
Expand All @@ -345,4 +369,8 @@ public void saveJob(JobDefinition jobDefinition) {
// No action on Android
}

@Override
public void removeJob(String name) {
}

}
Expand Up @@ -30,8 +30,6 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.JobDefinition;
import org.jumpmind.symmetric.model.JobDefinition.ScheduleType;
import org.jumpmind.symmetric.model.JobDefinition.StartupType;
import org.jumpmind.symmetric.model.Lock;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.RandomTimeSlot;
Expand All @@ -42,6 +40,7 @@
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

Expand Down Expand Up @@ -77,27 +76,38 @@ abstract public class AbstractJob implements Runnable, IJob {
private ScheduledFuture<?> scheduledJob;

private RandomTimeSlot randomTimeSlot;

protected AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {

private CronTrigger cronTrigger;

private Date periodicFirstRunTime;

private IParameterService parameterService;

public AbstractJob() {

}

public AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
this.engine = engine;
this.taskScheduler = taskScheduler;
this.jobName = jobName;
IParameterService parameterService = engine.getParameterService();
this.parameterService = engine.getParameterService();
this.randomTimeSlot = new RandomTimeSlot(parameterService.getExternalId(),
parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS));
}

public void start() {
if (this.scheduledJob == null && engine != null
&& !engine.getClusterService().isInfiniteLocked(getName())) {
if (jobDefinition.getScheduleType() == ScheduleType.CRON) {
String cronExpression = jobDefinition.getSchedule();
if (isCronSchedule()) {
String cronExpression = getSchedule();
log.info("Starting job '{}' with cron expression: '{}'", jobName, cronExpression);
cronTrigger = new CronTrigger(cronExpression);
try {
this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression));
this.scheduledJob = taskScheduler.schedule(this, cronTrigger);
} catch (Exception ex) {
throw new SymmetricException("Failed to schedule job '" + jobName + "' with schedule '" +
getJobDefinition().getSchedule() + "'", ex);
getSchedule() + "'", ex);
}
started = true;
} else {
Expand All @@ -106,6 +116,10 @@ public void start() {
return;
}

if (randomTimeSlot == null) {
this.randomTimeSlot = new RandomTimeSlot(parameterService.getExternalId(),
parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS));
}
int startDelay = randomTimeSlot.getRandomValueSeededByExternalId();
long currentTimeMillis = System.currentTimeMillis();
long lastRunTime = currentTimeMillis - timeBetweenRunsInMs;
Expand All @@ -116,28 +130,31 @@ public void start() {
lastRunTime = newRunTime;
}
}
Date firstRun = new Date(lastRunTime + timeBetweenRunsInMs + startDelay);
periodicFirstRunTime = new Date(lastRunTime + timeBetweenRunsInMs + startDelay);
log.info("Starting {} on periodic schedule: every {}ms with the first run at {}", new Object[] {jobName,
timeBetweenRunsInMs, firstRun});
timeBetweenRunsInMs, periodicFirstRunTime});
this.scheduledJob = taskScheduler.scheduleWithFixedDelay(this,
firstRun, timeBetweenRunsInMs);
periodicFirstRunTime, timeBetweenRunsInMs);
started = true;
}
}
}

protected long getTimeBetweenRunsInMs() {
long timeBetweenRunsInMs = -1;
String schedule = getSchedule();

try {
timeBetweenRunsInMs = Long.parseLong(jobDefinition.getSchedule());
if (StringUtils.isEmpty(schedule)) {
throw new SymmetricException("Schedule value is not defined for " + jobDefinition.getPeriodicParameter());
}
timeBetweenRunsInMs = Long.parseLong(getSchedule());
if (timeBetweenRunsInMs <= 0) {
log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule '" +
getJobDefinition().getSchedule() + "'");
return -1;
throw new SymmetricException("Schedule value must be positive, but was '" + schedule + "'");
}
} catch (NumberFormatException ex) {
log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule '" +
getJobDefinition().getSchedule() + "'", ex);
} catch (Exception ex) {
log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule: '" +
getSchedule() + "' Check the " + jobDefinition.getPeriodicParameter() + " parameter.", ex);
return -1;
}
return timeBetweenRunsInMs;
Expand Down Expand Up @@ -319,6 +336,37 @@ public long getNumberOfRuns() {
public long getTotalExecutionTimeInMs() {
return totalExecutionTimeInMs;
}

@Override
public Date getNextExecutionTime() {
if (isCronSchedule() && cronTrigger != null) {
return cronTrigger.nextExecutionTime(new TriggerContext() {

@Override
public Date lastScheduledExecutionTime() {
return null;
}

@Override
public Date lastCompletionTime() {
return getLastFinishTime();
}

@Override
public Date lastActualExecutionTime() {
return null;
}
});
} else if (isPeriodicSchedule() ) {
if (getLastFinishTime() != null) {
return new Date(getLastFinishTime().getTime() + getTimeBetweenRunsInMs());
} else if (periodicFirstRunTime != null) {
return new Date(periodicFirstRunTime.getTime() + getTimeBetweenRunsInMs());
}
}
return null;

}

@Override
@ManagedMetric(description = "The total amount of time this job has spend in execution during the lifetime of the JVM")
Expand All @@ -329,13 +377,27 @@ public long getAverageExecutionTimeInMs() {
return 0;
}
}

public abstract JobDefaults getDefaults();

public StartupType getStartupType() {
// TODO check override parameters...
return jobDefinition.getStartupType();

public boolean isCronSchedule() {
String cronSchedule = parameterService.getString(jobDefinition.getCronParameter());
return !StringUtils.isEmpty(cronSchedule);
}

public boolean isPeriodicSchedule() {
return !isCronSchedule();
}

public String getSchedule() {
String cronSchedule = parameterService.getString(jobDefinition.getCronParameter());
if (!StringUtils.isEmpty(cronSchedule)) {
return cronSchedule;
} else {
String periodicSchedule = parameterService.getString(jobDefinition.getPeriodicParameter());
return periodicSchedule;
}
}

public abstract JobDefaults getDefaults();

public ISymmetricEngine getEngine() {
return engine;
Expand All @@ -360,4 +422,17 @@ public ThreadPoolTaskScheduler getTaskScheduler() {
public void setTaskScheduler(ThreadPoolTaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

@Override
public String getDeprecatedStartParameter() {
return null;
}

public IParameterService getParameterService() {
return parameterService;
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}
}
Expand Up @@ -47,7 +47,7 @@ public void doJob(boolean force) throws Exception {
interpreter.eval(getJobDefinition().getJobExpression());
}
} catch (Exception ex) {
log.error("Exception during bsh job '" + this.getName() + "'", ex);
log.error("Exception during bsh job '" + this.getName() + "'\n" + getJobDefinition().getJobExpression(), ex);
}
}

Expand Down
Expand Up @@ -24,36 +24,21 @@
import java.util.List;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.JobDefinition;
import org.jumpmind.symmetric.model.JobDefinition.JobType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class BuiltInJobs {

// public List<IJob> loadAndSyncBuiltInJobs(ISqlTemplate sqlTemplate, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
// List<IJob> jobTemplates = getBuiltInJobs(engine, taskScheduler);
// List<IJob> jobRows = getBuiltInJobsFromDb(sqlTemplate);
// return null;
// }

public List<JobDefinition> syncBuiltInJobs(List<JobDefinition> existingJobs, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
List<IJob> builtInJobs = getBuiltInJobs(engine, taskScheduler);
List<JobDefinition> builtInJobDefs = new ArrayList<JobDefinition>(builtInJobs.size());

for (IJob job : builtInJobs) {
// TODO, should use from the DB.
existingJobs.add(job.getJobDefinition());
}

return existingJobs;
}

// public List<IJob>
//
// private List<IJob> getBuiltInJobsFromDb(ISqlTemplate sqlTemplate) {
// return null;
// }

public List<IJob> getBuiltInJobs(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
List<IJob> builtInJobs = new ArrayList<IJob>(20);
Expand Down Expand Up @@ -92,14 +77,9 @@ protected void setBuiltInDefaults(IJob argBuiltInJob) {
jobDefinition.setJobName(builtInJob.getName());
jobDefinition.setDescription(builtInJob.getDefaults().getDescription());
jobDefinition.setRequiresRegistration(builtInJob.getDefaults().isRequiresRegisteration());
jobDefinition.setExternalId(ParameterConstants.ALL);
jobDefinition.setNodeGroupId(ParameterConstants.ALL);
jobDefinition.setJobType(JobType.BUILT_IN);
jobDefinition.setStartupType(builtInJob.getDefaults().getStartupType());
jobDefinition.setJobExpression(argBuiltInJob.getClass().getName());
jobDefinition.setCreateBy("SymmetricDS");
jobDefinition.setScheduleType(builtInJob.getDefaults().getScheduleType());
jobDefinition.setSchedule(builtInJob.getDefaults().getSchedule());
builtInJob.setJobDefinition(jobDefinition);
}
}
Expand Up @@ -20,12 +20,10 @@
*/
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE;

import static org.jumpmind.symmetric.job.JobDefaults.*;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.JobDefinition.ScheduleType;
import org.jumpmind.symmetric.model.JobDefinition.StartupType;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -40,9 +38,8 @@ public JobDefaults getDefaults() {
boolean fileSyncEnabeld = engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE);

return new JobDefaults()
.scheduleType(ScheduleType.PERIODIC)
.schedule(EVERY_MINUTE)
.startupType(fileSyncEnabeld ? StartupType.AUTOMATIC : StartupType.DISABLED)
.enabled(fileSyncEnabeld)
.description("Check for files to pull down from other nodes");
}

Expand Down
Expand Up @@ -20,12 +20,10 @@
*/
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE;

import static org.jumpmind.symmetric.job.JobDefaults.*;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.JobDefinition.ScheduleType;
import org.jumpmind.symmetric.model.JobDefinition.StartupType;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

Expand All @@ -40,9 +38,8 @@ public JobDefaults getDefaults() {
boolean fileSyncEnabeld = engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE);

return new JobDefaults()
.scheduleType(ScheduleType.PERIODIC)
.schedule(EVERY_MINUTE)
.startupType(fileSyncEnabeld ? StartupType.AUTOMATIC : StartupType.DISABLED)
.enabled(fileSyncEnabeld)
.description("Push files to other nodes");
}

Expand Down

0 comments on commit 9b91ed6

Please sign in to comment.