diff --git a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java index f39a45bef0..b6d6040734 100644 --- a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java +++ b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java @@ -31,7 +31,6 @@ import org.jumpmind.symmetric.job.IJob; import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.model.JobDefinition; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; import org.jumpmind.symmetric.service.IParameterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,7 +177,7 @@ public boolean invoke(boolean force) { } } - if (parameterService.is(ParameterConstants.START_PURGE_JOB) + if (parameterService.is(ParameterConstants.START_PURGE_INCOMING_JOB) && parameterService.getInt("job.purge.period.time.ms") < System .currentTimeMillis() - lastPurgeTime) { try { @@ -328,6 +327,31 @@ public long getTimeBetweenRunsInMs() { public JobDefinition getJobDefinition() { return null; } + + @Override + public Date getNextExecutionTime() { + return null; + } + + @Override + public boolean isCronSchedule() { + return false; + } + + @Override + public boolean isPeriodicSchedule() { + return false; + } + + @Override + public String getSchedule() { + return null; + } + + @Override + public String getDeprecatedStartParameter() { + return null; + } } @Override @@ -345,4 +369,8 @@ public void saveJob(JobDefinition jobDefinition) { // No action on Android } + @Override + public void removeJob(String name) { + } + } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index 2502bea21c..623e9314a0 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -30,8 +30,6 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.model.JobDefinition; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.model.Lock; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.util.RandomTimeSlot; @@ -42,6 +40,7 @@ import org.springframework.jmx.export.annotation.ManagedMetric; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; +import org.springframework.scheduling.TriggerContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; @@ -77,12 +76,22 @@ abstract public class AbstractJob implements Runnable, IJob { private ScheduledFuture scheduledJob; private RandomTimeSlot randomTimeSlot; - - protected AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { + + private CronTrigger cronTrigger; + + private Date periodicFirstRunTime; + + private IParameterService parameterService; + + public AbstractJob() { + + } + + public AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { this.engine = engine; this.taskScheduler = taskScheduler; this.jobName = jobName; - IParameterService parameterService = engine.getParameterService(); + this.parameterService = engine.getParameterService(); this.randomTimeSlot = new RandomTimeSlot(parameterService.getExternalId(), parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS)); } @@ -90,14 +99,15 @@ protected AbstractJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskSch public void start() { if (this.scheduledJob == null && engine != null && !engine.getClusterService().isInfiniteLocked(getName())) { - if (jobDefinition.getScheduleType() == ScheduleType.CRON) { - String cronExpression = jobDefinition.getSchedule(); + if (isCronSchedule()) { + String cronExpression = getSchedule(); log.info("Starting job '{}' with cron expression: '{}'", jobName, cronExpression); + cronTrigger = new CronTrigger(cronExpression); try { - this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression)); + this.scheduledJob = taskScheduler.schedule(this, cronTrigger); } catch (Exception ex) { throw new SymmetricException("Failed to schedule job '" + jobName + "' with schedule '" + - getJobDefinition().getSchedule() + "'", ex); + getSchedule() + "'", ex); } started = true; } else { @@ -106,6 +116,10 @@ public void start() { return; } + if (randomTimeSlot == null) { + this.randomTimeSlot = new RandomTimeSlot(parameterService.getExternalId(), + parameterService.getInt(ParameterConstants.JOB_RANDOM_MAX_START_TIME_MS)); + } int startDelay = randomTimeSlot.getRandomValueSeededByExternalId(); long currentTimeMillis = System.currentTimeMillis(); long lastRunTime = currentTimeMillis - timeBetweenRunsInMs; @@ -116,11 +130,11 @@ public void start() { lastRunTime = newRunTime; } } - Date firstRun = new Date(lastRunTime + timeBetweenRunsInMs + startDelay); + periodicFirstRunTime = new Date(lastRunTime + timeBetweenRunsInMs + startDelay); log.info("Starting {} on periodic schedule: every {}ms with the first run at {}", new Object[] {jobName, - timeBetweenRunsInMs, firstRun}); + timeBetweenRunsInMs, periodicFirstRunTime}); this.scheduledJob = taskScheduler.scheduleWithFixedDelay(this, - firstRun, timeBetweenRunsInMs); + periodicFirstRunTime, timeBetweenRunsInMs); started = true; } } @@ -128,16 +142,19 @@ public void start() { protected long getTimeBetweenRunsInMs() { long timeBetweenRunsInMs = -1; + String schedule = getSchedule(); + try { - timeBetweenRunsInMs = Long.parseLong(jobDefinition.getSchedule()); + if (StringUtils.isEmpty(schedule)) { + throw new SymmetricException("Schedule value is not defined for " + jobDefinition.getPeriodicParameter()); + } + timeBetweenRunsInMs = Long.parseLong(getSchedule()); if (timeBetweenRunsInMs <= 0) { - log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule '" + - getJobDefinition().getSchedule() + "'"); - return -1; + throw new SymmetricException("Schedule value must be positive, but was '" + schedule + "'"); } - } catch (NumberFormatException ex) { - log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule '" + - getJobDefinition().getSchedule() + "'", ex); + } catch (Exception ex) { + log.error("Failed to schedule job '" + jobName + "' because of an invalid schedule: '" + + getSchedule() + "' Check the " + jobDefinition.getPeriodicParameter() + " parameter.", ex); return -1; } return timeBetweenRunsInMs; @@ -319,6 +336,37 @@ public long getNumberOfRuns() { public long getTotalExecutionTimeInMs() { return totalExecutionTimeInMs; } + + @Override + public Date getNextExecutionTime() { + if (isCronSchedule() && cronTrigger != null) { + return cronTrigger.nextExecutionTime(new TriggerContext() { + + @Override + public Date lastScheduledExecutionTime() { + return null; + } + + @Override + public Date lastCompletionTime() { + return getLastFinishTime(); + } + + @Override + public Date lastActualExecutionTime() { + return null; + } + }); + } else if (isPeriodicSchedule() ) { + if (getLastFinishTime() != null) { + return new Date(getLastFinishTime().getTime() + getTimeBetweenRunsInMs()); + } else if (periodicFirstRunTime != null) { + return new Date(periodicFirstRunTime.getTime() + getTimeBetweenRunsInMs()); + } + } + return null; + + } @Override @ManagedMetric(description = "The total amount of time this job has spend in execution during the lifetime of the JVM") @@ -329,13 +377,27 @@ public long getAverageExecutionTimeInMs() { return 0; } } - - public abstract JobDefaults getDefaults(); - - public StartupType getStartupType() { - // TODO check override parameters... - return jobDefinition.getStartupType(); + + public boolean isCronSchedule() { + String cronSchedule = parameterService.getString(jobDefinition.getCronParameter()); + return !StringUtils.isEmpty(cronSchedule); + } + + public boolean isPeriodicSchedule() { + return !isCronSchedule(); + } + + public String getSchedule() { + String cronSchedule = parameterService.getString(jobDefinition.getCronParameter()); + if (!StringUtils.isEmpty(cronSchedule)) { + return cronSchedule; + } else { + String periodicSchedule = parameterService.getString(jobDefinition.getPeriodicParameter()); + return periodicSchedule; + } } + + public abstract JobDefaults getDefaults(); public ISymmetricEngine getEngine() { return engine; @@ -360,4 +422,17 @@ public ThreadPoolTaskScheduler getTaskScheduler() { public void setTaskScheduler(ThreadPoolTaskScheduler taskScheduler) { this.taskScheduler = taskScheduler; } + + @Override + public String getDeprecatedStartParameter() { + return null; + } + + public IParameterService getParameterService() { + return parameterService; + } + + public void setParameterService(IParameterService parameterService) { + this.parameterService = parameterService; + } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BshJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BshJob.java index e9d450089f..34aff2fc50 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BshJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BshJob.java @@ -47,7 +47,7 @@ public void doJob(boolean force) throws Exception { interpreter.eval(getJobDefinition().getJobExpression()); } } catch (Exception ex) { - log.error("Exception during bsh job '" + this.getName() + "'", ex); + log.error("Exception during bsh job '" + this.getName() + "'\n" + getJobDefinition().getJobExpression(), ex); } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BuiltInJobs.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BuiltInJobs.java index 93b2ba1521..cfca64d9da 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BuiltInJobs.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/BuiltInJobs.java @@ -24,36 +24,21 @@ import java.util.List; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.model.JobDefinition; import org.jumpmind.symmetric.model.JobDefinition.JobType; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; public class BuiltInJobs { - -// public List loadAndSyncBuiltInJobs(ISqlTemplate sqlTemplate, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { -// List jobTemplates = getBuiltInJobs(engine, taskScheduler); -// List jobRows = getBuiltInJobsFromDb(sqlTemplate); -// return null; -// } public List syncBuiltInJobs(List existingJobs, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { List builtInJobs = getBuiltInJobs(engine, taskScheduler); - List builtInJobDefs = new ArrayList(builtInJobs.size()); for (IJob job : builtInJobs) { - // TODO, should use from the DB. existingJobs.add(job.getJobDefinition()); } return existingJobs; } - -// public List -// -// private List getBuiltInJobsFromDb(ISqlTemplate sqlTemplate) { -// return null; -// } public List getBuiltInJobs(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { List builtInJobs = new ArrayList(20); @@ -92,14 +77,9 @@ protected void setBuiltInDefaults(IJob argBuiltInJob) { jobDefinition.setJobName(builtInJob.getName()); jobDefinition.setDescription(builtInJob.getDefaults().getDescription()); jobDefinition.setRequiresRegistration(builtInJob.getDefaults().isRequiresRegisteration()); - jobDefinition.setExternalId(ParameterConstants.ALL); - jobDefinition.setNodeGroupId(ParameterConstants.ALL); jobDefinition.setJobType(JobType.BUILT_IN); - jobDefinition.setStartupType(builtInJob.getDefaults().getStartupType()); jobDefinition.setJobExpression(argBuiltInJob.getClass().getName()); jobDefinition.setCreateBy("SymmetricDS"); - jobDefinition.setScheduleType(builtInJob.getDefaults().getScheduleType()); - jobDefinition.setSchedule(builtInJob.getDefaults().getSchedule()); builtInJob.setJobDefinition(jobDefinition); } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPullJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPullJob.java index 92fbb88a6a..e7914848dc 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPullJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPullJob.java @@ -20,12 +20,10 @@ */ package org.jumpmind.symmetric.job; -import org.jumpmind.symmetric.ISymmetricEngine; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +38,8 @@ public JobDefaults getDefaults() { boolean fileSyncEnabeld = engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE); return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_MINUTE) - .startupType(fileSyncEnabeld ? StartupType.AUTOMATIC : StartupType.DISABLED) + .enabled(fileSyncEnabeld) .description("Check for files to pull down from other nodes"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPushJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPushJob.java index dfd3800ad4..39b81e116a 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPushJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncPushJob.java @@ -20,12 +20,10 @@ */ package org.jumpmind.symmetric.job; -import org.jumpmind.symmetric.ISymmetricEngine; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +38,8 @@ public JobDefaults getDefaults() { boolean fileSyncEnabeld = engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE); return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_MINUTE) - .startupType(fileSyncEnabeld ? StartupType.AUTOMATIC : StartupType.DISABLED) + .enabled(fileSyncEnabeld) .description("Push files to other nodes"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncTrackerJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncTrackerJob.java index be8d6bd75c..015a97a2cf 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncTrackerJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/FileSyncTrackerJob.java @@ -20,11 +20,10 @@ */ package org.jumpmind.symmetric.job; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_5_MINUTES; + import org.jumpmind.symmetric.ISymmetricEngine; -import static org.jumpmind.symmetric.job.JobDefaults.*; import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -39,9 +38,8 @@ public JobDefaults getDefaults() { boolean fileSyncEnabeld = engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE); return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_5_MINUTES) - .startupType(fileSyncEnabeld ? StartupType.AUTOMATIC : StartupType.DISABLED) + .enabled(fileSyncEnabeld) .description("Check for changes in sync'd files"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java index 4a6947784d..554791cee5 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_FIFTEEN_MINUTES; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,9 +40,7 @@ public JobDefaults getDefaults() { return new JobDefaults() .description("Record a heartbeat") .requiresRegisteration(false) - .scheduleType(ScheduleType.PERIODIC) - .schedule(EVERY_FIFTEEN_MINUTES) - .startupType(StartupType.AUTOMATIC); + .schedule(EVERY_FIFTEEN_MINUTES); } @Override diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java index d3bfe7c58d..ef5649d3ca 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java @@ -21,10 +21,10 @@ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_NIGHT_AT_MIDNIGHT; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +40,7 @@ public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_NIGHT_AT_MIDNIGHT) - .startupType(StartupType.AUTOMATIC) .description("Purge incoming data"); } @@ -51,4 +49,9 @@ public void doJob(boolean force) throws Exception { engine.getPurgeService().purgeIncoming(force); } + @Override + public String getDeprecatedStartParameter() { + return ParameterConstants.START_PURGE_JOB_38; + } + } \ No newline at end of file diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java index ceb6fb2028..8997ef34d3 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_10_SECONDS; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -37,9 +36,7 @@ public InitialLoadExtractorJob(ISymmetricEngine engine, ThreadPoolTaskScheduler public JobDefaults getDefaults() { return new JobDefaults() .description("Extract data for initial loads") - .scheduleType(ScheduleType.PERIODIC) - .schedule(EVERY_10_SECONDS) - .startupType(StartupType.AUTOMATIC); + .schedule(EVERY_10_SECONDS); } @Override diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JavaJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JavaJob.java index 1dd99b5104..eb2d9fdf7c 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JavaJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JavaJob.java @@ -20,12 +20,21 @@ */ package org.jumpmind.symmetric.job; +import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.model.JobDefinition.JobType; +import org.jumpmind.util.SimpleClassCompiler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; public class JavaJob extends AbstractJob { + private JavaJob configuredJob; + + public JavaJob() { + super(); + } + public JavaJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { super(jobName, engine, taskScheduler); } @@ -41,7 +50,48 @@ public JobDefaults getDefaults() { @Override public void doJob(boolean force) throws Exception { - // TODO parse expression into class. + if (configuredJob == null) { + configuredJob = compileJob(); + } + + if (configuredJob != null) { + configuredJob.doJob(force); + } } + protected JavaJob compileJob() { + String jobExression = getJobDefinition().getJobExpression(); + if (StringUtils.isEmpty(jobExression)) { + return null; + } + final String code = CODE_START + jobExression + CODE_END; + + SimpleClassCompiler compiler = new SimpleClassCompiler(); + try { + JavaJob job = (JavaJob)compiler.getCompiledClass(code); + job.setEngine(engine); + job.setJobName(getJobName()); + job.setJobDefinition(getJobDefinition()); + job.setParameterService(engine.getParameterService()); + job.setTaskScheduler(getTaskScheduler()); + return job; + } catch (Exception ex) { + throw new SymmetricException("Failed to compile Java code for job " + + getJobDefinition().getJobName() + " code: \n" + code, ex); + } + } + + final static String CODE_START = + "import org.jumpmind.symmetric.job.JavaJob;\n" + + "import org.apache.commons.lang.StringUtils;\n" + + "import org.jumpmind.symmetric.ISymmetricEngine;\n" + + "import org.jumpmind.symmetric.model.JobDefinition.JobType;\n" + + "import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;\n" + + "\n" + + "public class CustomJavaJob extends JavaJob {\n" + + "\n " + + " public void doJob(boolean force) throws Exception {\n"; + final static String CODE_END = + "\n}\n}"; + } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobCreator.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobCreator.java index 036d1cb5fb..a5483554f8 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobCreator.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobCreator.java @@ -35,9 +35,12 @@ public IJob createJob(JobDefinition jobDefinition, ISymmetricEngine engine, Thre AbstractJob job = null; if (jobDefinition.getJobType() == JobType.BSH) { job = new BshJob(jobDefinition.getJobName(), engine, taskScheduler); - } else if (jobDefinition.getJobType() == JobType.JAVA - || jobDefinition.getJobType() == JobType.BUILT_IN) { + } else if (jobDefinition.getJobType() == JobType.SQL) { + job = new SqlJob(jobDefinition.getJobName(), engine, taskScheduler); + } else if (jobDefinition.getJobType() == JobType.BUILT_IN) { job = instantiateJavaJob(jobDefinition, engine, taskScheduler); + } else if (jobDefinition.getJobType() == JobType.JAVA) { + job = new JavaJob(jobDefinition.getJobName(), engine, taskScheduler); } else { throw new SymmetricException("Unknown job type " + jobDefinition.getJobType()); } @@ -45,7 +48,7 @@ public IJob createJob(JobDefinition jobDefinition, ISymmetricEngine engine, Thre job.setJobDefinition(jobDefinition); return job; } - + protected AbstractJob instantiateJavaJob(JobDefinition jobDefinition, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { String className = jobDefinition.getJobExpression(); diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobDefaults.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobDefaults.java index 87f6291659..d07ae15a08 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobDefaults.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobDefaults.java @@ -20,9 +20,6 @@ */ package org.jumpmind.symmetric.job; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; - public class JobDefaults { public static final String EVERY_5_MINUTES = "0 0/5 * * * *"; @@ -33,27 +30,21 @@ public class JobDefaults { public static final String EVERY_HOUR = "3600000"; public static final String EVERY_NIGHT_AT_MIDNIGHT = "0 0 0 * * *"; - private ScheduleType scheduleType; private String schedule; - private StartupType startupType; private boolean requiresRegisteration = true; private String description; + private boolean enabled = true; public JobDefaults() {} - public JobDefaults scheduleType(ScheduleType scheduleType) { - this.scheduleType = scheduleType; - return this; - } - public JobDefaults schedule(String schedule) { this.schedule = schedule; return this; } - - public JobDefaults startupType(StartupType startupType) { - this.startupType = startupType; - return this; + + public JobDefaults enabled(boolean enabled) { + this.enabled = enabled; + return this; } public JobDefaults requiresRegisteration(boolean requiresRegisteration) { @@ -66,18 +57,10 @@ public JobDefaults description(String description) { return this; } - public ScheduleType getScheduleType() { - return scheduleType; - } - public String getSchedule() { return schedule; } - public StartupType getStartupType() { - return startupType; - } - public boolean isRequiresRegisteration() { return requiresRegisteration; } @@ -85,5 +68,28 @@ public boolean isRequiresRegisteration() { public String getDescription() { return description; } + + public boolean isEnabled() { + return enabled; + } + + public static String getJobNameParameter(String name) { + if (name != null) { + return name.toLowerCase().replace(' ', '.'); + } else { + return null; + } + } + public static String getStartParameter(String name) { + return String.format("start.%s.job", getJobNameParameter(name)); + } + + public static String getPeriodicParameter(String name) { + return String.format("job.%s.period.time.ms", getJobNameParameter(name)); + } + + public static String getCronParameter(String name) { + return String.format("job.%s.cron", getJobNameParameter(name)); + } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java index 6263e55e63..013a3c8e60 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java @@ -24,10 +24,11 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.model.JobDefinition; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.impl.AbstractService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,12 +79,15 @@ public void init() { List jobDefitions = loadJobs(engine); BuiltInJobs builtInJobs = new BuiltInJobs(); - jobDefitions = builtInJobs.syncBuiltInJobs(jobDefitions, engine, taskScheduler); // TODO save built in hobs + jobDefitions = builtInJobs.syncBuiltInJobs(jobDefitions, engine, taskScheduler); // TODO save built in jobs this.jobs = new ArrayList(); for (JobDefinition jobDefinition : jobDefitions) { - jobs.add(jobCreator.createJob(jobDefinition, engine, taskScheduler)); + IJob job = jobCreator.createJob(jobDefinition, engine, taskScheduler); + if (job != null) { + jobs.add(job); + } } } @@ -111,8 +115,7 @@ public IJob getJob(String name) { @Override public synchronized void startJobs() { for (IJob job : jobs) { - if (isAutoStartConfigured(job) - && StartupType.AUTOMATIC == job.getJobDefinition().getStartupType()) { + if (isAutoStartConfigured(job)) { job.start(); } else { log.info("Job {} not configured for auto start", job.getName()); @@ -125,7 +128,6 @@ public synchronized void startJobs() { public synchronized void startJobsAfterConfigChange() { for (IJob job : jobs) { if (isAutoStartConfigured(job) - && StartupType.AUTOMATIC == job.getJobDefinition().getStartupType() && !job.isStarted()) { job.start(); } @@ -133,8 +135,13 @@ public synchronized void startJobsAfterConfigChange() { } protected boolean isAutoStartConfigured(IJob job) { - String key = "start." + job.getName(); - return engine.getParameterService().is(key, true); + String startParameter = job.getJobDefinition().getStartParameter(); + String autoStartValue = engine.getParameterService().getString(startParameter); + if (StringUtils.isEmpty(autoStartValue) && job.getDeprecatedStartParameter() != null) { + startParameter = job.getDeprecatedStartParameter(); + } + + return engine.getParameterService().is(startParameter, true); } @Override @@ -160,9 +167,8 @@ public List getJobs() { @Override public void saveJob(JobDefinition job) { - Object[] args = { job.getDescription(), job.getJobType().toString(), job.getSchedule(), - job.getStartupType().toString(), job.getScheduleType().toString(), job.getJobExpression(), - job.getCreateBy(), job.getLastUpdateBy(), job.getJobName() }; + Object[] args = { job.getDescription(), job.getJobType().toString(), + job.getJobExpression(), job.getCreateBy(), job.getLastUpdateBy(), job.getJobName() }; if (sqlTemplate.update(getSql("updateJobSql"), args) == 0) { sqlTemplate.update(getSql("insertJobSql"), args); @@ -170,4 +176,15 @@ public void saveJob(JobDefinition job) { init(); startJobsAfterConfigChange(); } + + @Override + public void removeJob(String name) { + Object[] args = { name }; + + if (sqlTemplate.update(getSql("deleteJobSql"), args) == 0) { + throw new SymmetricException("Failed to remove job " + name + ". Note that BUILT_IN jobs cannot be removed."); + } + init(); + startJobsAfterConfigChange(); + } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManagerSqlMap.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManagerSqlMap.java index 569f90a62c..ce1ea57cef 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManagerSqlMap.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManagerSqlMap.java @@ -24,7 +24,6 @@ import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.symmetric.service.impl.AbstractSqlMap; - public class JobManagerSqlMap extends AbstractSqlMap { public JobManagerSqlMap(IDatabasePlatform platform, Map replacementTokens) { @@ -33,13 +32,14 @@ public JobManagerSqlMap(IDatabasePlatform platform, Map replacem putSql("loadCustomJobs", "select * from $(job) order by job_name"); - putSql("insertJobSql", "insert into $(job) (external_id, node_group_id, description, job_type, schedule, startup_type, schedule_type, job_expression, create_by, create_time, " + putSql("insertJobSql", "insert into $(job) (description, job_type, job_expression, create_by, create_time, " + "last_update_by, last_update_time, job_name) " + - "values ('ALL', 'ALL', ?, ?, ?, ?, ?, ?, ?, current_timestamp, ?, current_timestamp, ?)"); + "values (?, ?, ?, ?, current_timestamp, ?, current_timestamp, ?)"); + + putSql("updateJobSql", "update $(job) set description = ?, job_type = ?, job_expression = ?, " + + "create_by = ?, last_update_by = ?, last_update_time = current_timestamp where job_name = ?"); - putSql("updateJobSql", "update $(job) set description = ?, job_type = ?, " + - "schedule = ?, startup_type = ?, schedule_type = ?, job_expression = ?, create_by = ?, last_update_by = ?, " + - "last_update_time = current_timestamp where job_name = ?"); + putSql("deleteJobSql", "delete from $(job) where job_name = ? and job_type <> 'BUILT_IN'"); } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobMapper.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobMapper.java index a9bc52936d..24c49ae84a 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobMapper.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobMapper.java @@ -24,8 +24,6 @@ import org.jumpmind.db.sql.Row; import org.jumpmind.symmetric.model.JobDefinition; import org.jumpmind.symmetric.model.JobDefinition.JobType; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; public class JobMapper implements ISqlRowMapper { @@ -33,12 +31,7 @@ public class JobMapper implements ISqlRowMapper { public JobDefinition mapRow(Row row) { JobDefinition jobDefinition = new JobDefinition(); jobDefinition.setJobName(row.getString("job_name")); - jobDefinition.setExternalId(row.getString("external_id")); - jobDefinition.setNodeGroupId(row.getString("node_group_id")); jobDefinition.setJobType(JobType.valueOf(row.getString("job_type"))); - jobDefinition.setStartupType(StartupType.valueOf(row.getString("startup_type"))); - jobDefinition.setScheduleType(ScheduleType.valueOf(row.getString("schedule_type"))); - jobDefinition.setSchedule(row.getString("schedule")); jobDefinition.setRequiresRegistration(row.getBoolean("requires_registration")); jobDefinition.setJobExpression(row.getString("job_expression")); jobDefinition.setDescription(row.getString("description")); diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/MonitorJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/MonitorJob.java index b4c9a53627..e8fd19872d 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/MonitorJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/MonitorJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_10_SECONDS; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -36,9 +35,7 @@ public MonitorJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_10_SECONDS) - .startupType(StartupType.AUTOMATIC) .description("Run monitors and generate notifications"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePullJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePullJob.java index eecde9f94b..3fc3035237 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePullJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePullJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +39,8 @@ public OfflinePullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSched public JobDefaults getDefaults() { return new JobDefaults() .requiresRegisteration(false) - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_MINUTE) - .startupType(StartupType.MANUAL) + .enabled(false) .description("Loads in offline batch files"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePushJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePushJob.java index bbaf805db5..84eecbc14d 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePushJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OfflinePushJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_MINUTE; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -39,9 +38,8 @@ public OfflinePushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSched @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_MINUTE) - .startupType(StartupType.MANUAL) + .enabled(false) .description("Creates offline batch files"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java index 265eafd9ef..38c83b670d 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java @@ -24,8 +24,7 @@ import static org.jumpmind.symmetric.job.JobDefaults.EVERY_NIGHT_AT_MIDNIGHT; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,9 +40,7 @@ public OutgoingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_NIGHT_AT_MIDNIGHT) - .startupType(StartupType.AUTOMATIC) .description("Purge sync'd outgoing data"); } @@ -52,4 +49,10 @@ public void doJob(boolean force) throws Exception { engine.getPurgeService().purgeOutgoing(force); } + @Override + public String getDeprecatedStartParameter() { + return ParameterConstants.START_PURGE_JOB_38; + } + + } \ No newline at end of file diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java index 8e441e9800..5e30d7c234 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java @@ -23,8 +23,6 @@ import static org.jumpmind.symmetric.job.JobDefaults.EVERY_30_SECONDS; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,9 +39,7 @@ public PullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { public JobDefaults getDefaults() { return new JobDefaults() .requiresRegisteration(false) - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_30_SECONDS) - .startupType(StartupType.AUTOMATIC) .description("Pull data from other nodes"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java index 5f7a1f668d..f7483d0f59 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java @@ -23,8 +23,6 @@ import static org.jumpmind.symmetric.job.JobDefaults.EVERY_30_SECONDS; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +38,7 @@ public PushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_30_SECONDS) - .startupType(StartupType.AUTOMATIC) .description("Push batches to other nodes"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RefreshCacheJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RefreshCacheJob.java index d936fd5c02..58f5d21a38 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RefreshCacheJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RefreshCacheJob.java @@ -21,8 +21,6 @@ package org.jumpmind.symmetric.job; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -38,9 +36,7 @@ public RefreshCacheJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule("0/30 * * * * *") - .startupType(StartupType.AUTOMATIC) .description("Refresh configuration cache"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/ReportStatusJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/ReportStatusJob.java index aa7416a0e7..fc2d5de372 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/ReportStatusJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/ReportStatusJob.java @@ -20,15 +20,14 @@ */ package org.jumpmind.symmetric.job; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_5_MINUTES; + import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static org.jumpmind.symmetric.job.JobDefaults.*; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.model.NetworkedNode; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.service.ClusterConstants; @@ -52,9 +51,8 @@ public ReportStatusJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_5_MINUTES) - .startupType(StartupType.DISABLED) + .enabled(false) .description("Related to hybrid-pull"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java index 88276030b2..94690457ba 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java @@ -23,8 +23,7 @@ import static org.jumpmind.symmetric.job.JobDefaults.EVERY_10_SECONDS; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,9 +40,7 @@ public RouterJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_10_SECONDS) - .startupType(StartupType.AUTOMATIC) .description("Create outgoing batches"); } @@ -51,5 +48,10 @@ public JobDefaults getDefaults() { public void doJob(boolean force) throws Exception { engine.getRouterService().routeData(force); } + + @Override + public String getDeprecatedStartParameter() { + return ParameterConstants.START_ROUTE_JOB_38; + } } \ No newline at end of file diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SqlJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SqlJob.java new file mode 100644 index 0000000000..e47c6c5db3 --- /dev/null +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SqlJob.java @@ -0,0 +1,60 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.job; + +import org.jumpmind.db.sql.ISqlTemplate; +import org.jumpmind.db.sql.SqlScript; +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.model.JobDefinition.JobType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +public class SqlJob extends AbstractJob { + + static final boolean AUTO_COMMIT = true; + + public SqlJob(String jobName, ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { + super(jobName, engine, taskScheduler); + } + + public JobType getJobType() { + return JobType.SQL; + } + + @Override + protected void doJob(boolean force) throws Exception { + try { + if (getJobDefinition().getJobExpression() != null) { + ISqlTemplate sqlTemplate = engine.getDatabasePlatform().getSqlTemplate(); + SqlScript script = new SqlScript(getJobDefinition().getJobExpression(), sqlTemplate, true, null); + script.execute(AUTO_COMMIT); + } + } catch (Exception ex) { + log.error("Exception during sql job '" + this.getName() + "'\n" + getJobDefinition().getJobExpression(), ex); + } + } + + + @Override + public JobDefaults getDefaults() { + return new JobDefaults(); + } + +} diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java index 1f4cc2ada4..19b2dba793 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java @@ -25,8 +25,6 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.stage.IStagingManager; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -39,9 +37,7 @@ public StageManagementJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskS @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_FIFTEEN_MINUTES) - .startupType(StartupType.AUTOMATIC) .description("Purges the staging area"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java index 3f490aa459..278861a8a8 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java @@ -25,8 +25,6 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.util.LogSummaryAppenderUtils; import org.jumpmind.util.LogSummaryAppender; @@ -45,9 +43,7 @@ public StatisticFlushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSc @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_5_MINUTES) - .startupType(StartupType.AUTOMATIC) .description("Write statistics out to the database"); } @@ -65,6 +61,5 @@ protected void purgeLogSummaryAppender() { - engine.getParameterService().getLong(ParameterConstants.PURGE_LOG_SUMMARY_MINUTES, 60) * 60000); } - } - + } } \ No newline at end of file diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java index ae46192047..ed9daee844 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java @@ -21,10 +21,9 @@ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_NIGHT_AT_MIDNIGHT; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -40,9 +39,7 @@ public SyncTriggersJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche @Override public JobDefaults getDefaults() { return new JobDefaults() - .scheduleType(ScheduleType.CRON) .schedule(EVERY_NIGHT_AT_MIDNIGHT) - .startupType(StartupType.AUTOMATIC) .description("Sync trigger config with physical database triggers"); } @@ -50,5 +47,5 @@ public JobDefaults getDefaults() { public void doJob(boolean force) throws Exception { engine.getTriggerRouterService().syncTriggers(); } - + } \ No newline at end of file diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java index b160704098..e03a66ad25 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java @@ -20,10 +20,9 @@ */ package org.jumpmind.symmetric.job; -import static org.jumpmind.symmetric.job.JobDefaults.*; +import static org.jumpmind.symmetric.job.JobDefaults.EVERY_HOUR; + import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.model.JobDefinition.ScheduleType; -import org.jumpmind.symmetric.model.JobDefinition.StartupType; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,9 +40,7 @@ public WatchdogJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSchedule public JobDefaults getDefaults() { return new JobDefaults() .requiresRegisteration(false) - .scheduleType(ScheduleType.PERIODIC) .schedule(EVERY_HOUR) - .startupType(StartupType.AUTOMATIC) .description("Disable nodes that have been offline for a while"); } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java index 10d03d3275..d123329b24 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java @@ -559,7 +559,7 @@ protected static void writeJobsStats(ISymmetricEngine engine, File tmpDir) { if (lock != null) { lastServerId = lock.getLastLockingServerId(); } - String schedule = job.getJobDefinition().getSchedule(); + String schedule = job.getSchedule(); String lastFinishTime = getLastFinishTime(job, lock); writer.write(StringUtils.rightPad(job.getName().replace("_", " "), 30)+ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 0ffad9495b..b72647ce3c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -51,9 +51,12 @@ private ParameterConstants() { public final static String JDBC_ISOLATION_LEVEL = "db.jdbc.isolation.level"; public final static String START_PULL_JOB = "start.pull.job"; - public final static String START_PUSH_JOB = "start.push.job"; - public final static String START_PURGE_JOB = "start.purge.job"; - public final static String START_ROUTE_JOB = "start.route.job"; + public final static String START_PUSH_JOB = "start.push.job"; + public final static String START_PURGE_OUTGOING_JOB = "start.purge.incoming.job"; // In <= 3.8m was start.purge.outgoing.job + public final static String START_PURGE_INCOMING_JOB = "start.purge.outgoing.job"; // In <= 3.8, was start.purge.outgoing.job + public final static String START_PURGE_JOB_38 = "start.purge.incoming.job"; + public final static String START_ROUTE_JOB = "start.routing.job"; // In <= 3.8, was start.route.job + public final static String START_ROUTE_JOB_38 = "start.route.job"; public final static String START_HEARTBEAT_JOB = "start.heartbeat.job"; public final static String START_SYNCTRIGGERS_JOB = "start.synctriggers.job"; public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job"; @@ -66,7 +69,7 @@ private ParameterConstants() { public final static String START_REFRESH_CACHE_JOB = "start.refresh.cache.job"; public final static String START_FILE_SYNC_TRACKER_JOB = "start.file.sync.tracker.job"; public final static String START_FILE_SYNC_PUSH_JOB = "start.file.sync.push.job"; - public final static String START_FILE_SYNC_PULL_JOB = "start.file.sync.pull.job"; + public final static String START_FILE_SYNC_PULL_JOB = "start.file.sync.pull.job"; public final static String PULL_THREAD_COUNT_PER_SERVER = "pull.thread.per.server.count"; public final static String PULL_MINIMUM_PERIOD_MS = "pull.period.minimum.ms"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJob.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJob.java index 235e3de8c6..618b4774fc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJob.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJob.java @@ -29,6 +29,12 @@ public interface IJob { public String getName(); public JobDefinition getJobDefinition(); + + public boolean isCronSchedule(); + + public boolean isPeriodicSchedule(); + + public String getSchedule(); public void start(); @@ -45,6 +51,8 @@ public interface IJob { public long getLastExecutionTimeInMs(); public Date getLastFinishTime(); + + public Date getNextExecutionTime(); public boolean isRunning(); @@ -56,4 +64,6 @@ public interface IJob { public boolean invoke(boolean force); + public String getDeprecatedStartParameter(); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java index 3cdd90380f..59d611397a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java @@ -46,4 +46,6 @@ public interface IJobManager { public void saveJob(JobDefinition jobDefinition); + public void removeJob(String name); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/JobDefinition.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/JobDefinition.java index 15ed9aab21..6bcdb4c730 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/JobDefinition.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/JobDefinition.java @@ -26,32 +26,26 @@ public class JobDefinition { - public enum JobType {BUILT_IN, BSH, JAVA} - public enum ScheduleType {PERIODIC, CRON} - public enum StartupType {AUTOMATIC, MANUAL, DISABLED} + public enum JobType {BUILT_IN, BSH, JAVA, SQL} private String jobName; - private String externalId; - private String nodeGroupId; private JobType jobType; - private ScheduleType scheduleType; - private String schedule; private boolean requiresRegistration; private String jobExpression; private String description; - private StartupType startupType; private String createBy; private Date createTime; private String lastUpdateBy; private Date lastUpdateTime; + private boolean automaticStartup; + private transient String schedule; - public void commitChanges() { - if (NumberUtils.isNumber(getSchedule())) { - setScheduleType(ScheduleType.PERIODIC); - } else { -// CronTrigger cronTrigger = new CronTrigger(getSchedule()); // for validation - setScheduleType(ScheduleType.CRON); - } + public boolean isCronSchedule() { + return !isPeriodicSchedule(); + } + + public boolean isPeriodicSchedule() { + return NumberUtils.isDigits(schedule); } public String getJobName() { @@ -60,36 +54,12 @@ public String getJobName() { public void setJobName(String jobName) { this.jobName = jobName; } - public String getExternalId() { - return externalId; - } - public void setExternalId(String externalId) { - this.externalId = externalId; - } - public String getNodeGroupId() { - return nodeGroupId; - } - public void setNodeGroupId(String nodeGroupId) { - this.nodeGroupId = nodeGroupId; - } public JobType getJobType() { return jobType; } public void setJobType(JobType jobType) { this.jobType = jobType; } - public ScheduleType getScheduleType() { - return scheduleType; - } - public void setScheduleType(ScheduleType scheduleType) { - this.scheduleType = scheduleType; - } - public String getSchedule() { - return schedule; - } - public void setSchedule(String schedule) { - this.schedule = schedule; - } public boolean isRequiresRegistration() { return requiresRegistration; } @@ -132,11 +102,39 @@ public Date getLastUpdateTime() { public void setLastUpdateTime(Date lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; } - public StartupType getStartupType() { - return startupType; + + public String getSchedule() { + return schedule; + } + + public void setSchedule(String schedule) { + this.schedule = schedule; + } + + public boolean isAutomaticStartup() { + return automaticStartup; + } + + public void setAutomaticStartup(boolean automaticStartup) { + this.automaticStartup = automaticStartup; + } + + public static String getJobNameParameter(String name) { + if (name != null) { + return name.toLowerCase().replace(' ', '.'); + } else { + return null; + } + } + public String getStartParameter() { + return String.format("start.%s.job", getJobNameParameter(jobName)); } - public void setStartupType(StartupType startupType) { - this.startupType = startupType; + + public String getPeriodicParameter() { + return String.format("job.%s.period.time.ms", getJobNameParameter(jobName)); } - + + public String getCronParameter() { + return String.format("job.%s.cron", getJobNameParameter(jobName)); + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java index e7d6ebd384..4c742d6490 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java @@ -26,7 +26,7 @@ public class ClusterConstants { public static final String STAGE_MANAGEMENT = "Stage Management"; - public static final String ROUTE = "Route"; + public static final String ROUTE = "Routing"; public static final String PUSH = "Push"; public static final String PULL = "Pull"; public static final String OFFLINE_PUSH = "Offline Push"; @@ -39,12 +39,12 @@ public class ClusterConstants { public static final String PURGE_DATA_GAPS = "Purge Data Gaps"; public static final String HEARTBEAT = "Heartbeat"; public static final String INITIAL_LOAD_EXTRACT = "Initial Load Extract"; - public static final String SYNC_TRIGGERS = "Sync Triggers"; + public static final String SYNC_TRIGGERS = "SyncTriggers"; public static final String WATCHDOG = "Watchdog"; - public static final String STATISTICS = "Statistics"; - public static final String FILE_SYNC_TRACKER = "File Sync Tracker"; - public static final String FILE_SYNC_PULL = "File Sync Pull"; - public static final String FILE_SYNC_PUSH = "File Sync"; + public static final String STATISTICS = "Stat Flush"; + public static final String FILE_SYNC_TRACKER = "File Sync Tracker";// + public static final String FILE_SYNC_PULL = "File Sync Pull";// + public static final String FILE_SYNC_PUSH = "File Sync Push";// public static final String MONITOR = "Monitor"; public static final String FILE_SYNC_SCAN = "FILE_SYNC_SCAN"; diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index b9f58e7da0..69103aa7d5 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -860,12 +860,7 @@ - - - - - diff --git a/symmetric-util/src/main/java/org/jumpmind/util/SimpleClassCompiler.java b/symmetric-util/src/main/java/org/jumpmind/util/SimpleClassCompiler.java index 923d351ef0..a2cdef0b30 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/SimpleClassCompiler.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/SimpleClassCompiler.java @@ -91,7 +91,7 @@ public Object getCompiledClass(String javaCode) throws Exception { JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); if (compiler == null) { - throw new SimpleClassCompilerException("Missing Java compiler: the JDK is required for compiling classes."); + throw new SimpleClassCompilerException("Missing Java compiler: the JDK (rather than just JRE) is required for compiling classes."); } JavaFileManager fileManager = new ClassFileManager(compiler.getStandardFileManager(null, null, null)); DiagnosticCollector diag = new DiagnosticCollector(); @@ -109,12 +109,14 @@ public Object getCompiledClass(String javaCode) throws Exception { throw new SimpleClassCompilerException("The '"+className+"' class could not be located"); } } else { - log.error("Compilation of '" + origClassName + "' failed"); + StringBuilder msg = new StringBuilder(256); + msg.append("Compilation of '").append(origClassName).append("' failed.\n"); + for (Diagnostic diagnostic : diag.getDiagnostics()) { - log.error(origClassName + " at line " + diagnostic.getLineNumber() + ", column " + diagnostic.getColumnNumber() + ": " + - diagnostic.getMessage(null)); + msg.append(origClassName + " at line " + diagnostic.getLineNumber() + ", column " + diagnostic.getColumnNumber() + ": " + + diagnostic.getMessage(null)).append("\n"); } - throw new SimpleClassCompilerException(diag.getDiagnostics()); + throw new SimpleClassCompilerException(msg.toString()); } }