diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java index 7b9de7201d..c2e1fc76af 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricEngine.java @@ -35,6 +35,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.job.AbstractJob; import org.jumpmind.symmetric.job.PullJob; import org.jumpmind.symmetric.job.PurgeJob; import org.jumpmind.symmetric.job.PushJob; @@ -47,6 +48,7 @@ import org.jumpmind.symmetric.service.IPurgeService; import org.jumpmind.symmetric.service.IPushService; import org.jumpmind.symmetric.service.IRegistrationService; +import org.jumpmind.symmetric.util.AppUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -89,6 +91,8 @@ public class SymmetricEngine { private IDbDialect dbDialect; + private Set jobs; + private static Map registeredEnginesByUrl = new HashMap(); private static Map registeredEnginesByName = new HashMap(); @@ -126,6 +130,7 @@ protected SymmetricEngine(ApplicationContext ctx) { public void stop() { logger.info("Closing SymmetricDS externalId=" + parameterService.getExternalId() + " version=" + Version.version() + " database=" + dbDialect.getName()); + stopJobs(); removeMeFromMap(registeredEnginesByName); removeMeFromMap(registeredEnginesByUrl); DataSource ds = dbDialect.getJdbcTemplate().getDataSource(); @@ -219,39 +224,61 @@ private void startDefaultServerJMXExport() { } } + private void startJob(String name) { + if (jobs == null) { + jobs = new HashSet(); + } + logger.info("Starting " + name); + AbstractJob job = AppUtils.find(Constants.PUSH_JOB_TIMER, this); + jobs.add(job); + } + /** * Start the jobs if they are configured to be started in * symmetric.properties */ private void startJobs() { if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PUSH_JOB))) { - applicationContext.getBean(Constants.PUSH_JOB_TIMER); + startJob(Constants.PUSH_JOB_TIMER); } + if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PULL_JOB))) { - applicationContext.getBean(Constants.PULL_JOB_TIMER); + startJob(Constants.PULL_JOB_TIMER); } if (Boolean.TRUE.toString().equalsIgnoreCase(parameterService.getString(ParameterConstants.START_PURGE_JOB))) { - applicationContext.getBean(Constants.PURGE_JOB_TIMER); + startJob(Constants.PURGE_JOB_TIMER); } if (Boolean.TRUE.toString() .equalsIgnoreCase(parameterService.getString(ParameterConstants.START_HEARTBEAT_JOB))) { - applicationContext.getBean(Constants.HEARTBEAT_JOB_TIMER); + startJob(Constants.HEARTBEAT_JOB_TIMER); } if (Boolean.TRUE.toString().equalsIgnoreCase( parameterService.getString(ParameterConstants.START_SYNCTRIGGERS_JOB))) { - applicationContext.getBean(Constants.SYNC_TRIGGERS_JOB_TIMER); + startJob(Constants.SYNC_TRIGGERS_JOB_TIMER); } if (Boolean.TRUE.toString().equalsIgnoreCase( parameterService.getString(ParameterConstants.START_STATISTIC_FLUSH_JOB))) { - applicationContext.getBean(Constants.STATISTIC_FLUSH_JOB_TIMER); + startJob(Constants.STATISTIC_FLUSH_JOB_TIMER); } } + private void stopJobs() { + if (jobs != null) { + for (AbstractJob job : jobs) { + try { + job.stop(); + } catch (RuntimeException e) { + logger.error(e, e); + } + } + } + } + /** * Get a list of configured properties for Symmetric. Read-only. */ diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index c6d0e80d7e..563ea1e89d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -27,6 +27,7 @@ import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.SymmetricEngine; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; @@ -38,7 +39,9 @@ abstract public class AbstractJob extends TimerTask implements BeanFactoryAware, BeanNameAware { DataSource dataSource; - + + protected static final Log logger = LogFactory.getLog(AbstractJob.class); + private boolean needsRescheduled; private String rescheduleDelayParameter; @@ -50,6 +53,12 @@ abstract public class AbstractJob extends TimerTask implements BeanFactoryAware, private String beanName; private boolean requiresRegistration = true; + + public void stop() { + setNeedsRescheduled(false); + cancel(); + logger.info("Requested that " + beanName + " be stopped."); + } @Override public void run() {