Skip to content

Commit

Permalink
Fixed issue #14.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Oct 19, 2015
1 parent c99ba2e commit d794086
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.internal.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.internal.listener.AbstractJobListener;
import com.dangdang.ddframe.job.internal.listener.AbstractListenerManager;
import com.dangdang.ddframe.job.schedule.JobController;
import com.dangdang.ddframe.job.schedule.JobRegistry;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;

/**
* 配置文件监听管理器.
*
* @author caohao
*/
public final class ConfigurationListenerManager extends AbstractListenerManager {

private final ConfigurationNode configNode;

private final String jobName;

public ConfigurationListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
super(coordinatorRegistryCenter, jobConfiguration);
jobName = jobConfiguration.getJobName();
configNode = new ConfigurationNode(jobName);
}

@Override
public void start() {
listenCronSettingChanged();
}

void listenCronSettingChanged() {
addDataListener(new AbstractJobListener() {

@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (configNode.isCronPath(path) && Type.NODE_UPDATED == event.getType()) {
String cronExpression = new String(event.getData().getData());
JobController jobController = JobRegistry.getInstance().getJob(jobName);
if (null != jobController) {
jobController.rescheduleJob(cronExpression);
}
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,14 @@ public boolean isMonitorExecutionPath(final String path) {
public boolean isFailoverPath(final String path) {
return jobNodePath.getFullPath(FAILOVER).equals(path);
}

/**
* 判断是否为作业调度配置路径.
*
* @param path 节点路径
* @return 是否为作业调度配置路径
*/
public boolean isCronPath(final String path) {
return jobNodePath.getFullPath(CRON).equals(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ void listenLeaderElection() {
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (Type.NODE_REMOVED == event.getType() && electionNode.isLeaderHostPath(path) && !leaderElectionService.hasLeader()) {
log.info("Elastic job: leader crashed, elect a new leader now.");
log.debug("Elastic job: leader crashed, elect a new leader now.");
leaderElectionService.leaderElection();
shardingService.setReshardingFlag();
log.info("Elastic job: leader election completed.");
log.debug("Elastic job: leader election completed.");
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public FailoverListenerManager(final CoordinatorRegistryCenter coordinatorRegist
public void start() {
listenJobCrashed();
listenFailoverJobCrashed();
listenFailoverSerttingsChanged();
listenFailoverSettingsChanged();
}

void listenJobCrashed() {
Expand Down Expand Up @@ -104,7 +104,7 @@ private boolean isJobCrashAndNeedFailover(final Integer item, final TreeCacheEve
return null != item && Type.NODE_REMOVED == event.getType() && !executionService.isCompleted(item) && configService.isFailover();
}

void listenFailoverSerttingsChanged() {
void listenFailoverSettingsChanged() {
addDataListener(new AbstractJobListener() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dangdang.ddframe.job.internal.listener;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.internal.config.ConfigurationListenerManager;
import com.dangdang.ddframe.job.internal.election.ElectionListenerManager;
import com.dangdang.ddframe.job.internal.execution.ExecutionListenerManager;
import com.dangdang.ddframe.job.internal.failover.FailoverListenerManager;
Expand All @@ -42,12 +43,15 @@ public final class ListenerManager {

private final JobOperationListenerManager jobOperationListenerManager;

private final ConfigurationListenerManager configurationListenerManager;

public ListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
electionListenerManager = new ElectionListenerManager(coordinatorRegistryCenter, jobConfiguration);
shardingListenerManager = new ShardingListenerManager(coordinatorRegistryCenter, jobConfiguration);
executionListenerManager = new ExecutionListenerManager(coordinatorRegistryCenter, jobConfiguration);
failoverListenerManager = new FailoverListenerManager(coordinatorRegistryCenter, jobConfiguration);
jobOperationListenerManager = new JobOperationListenerManager(coordinatorRegistryCenter, jobConfiguration);
configurationListenerManager = new ConfigurationListenerManager(coordinatorRegistryCenter, jobConfiguration);
}

/**
Expand All @@ -59,5 +63,6 @@ public void startAllListeners() {
executionListenerManager.start();
failoverListenerManager.start();
jobOperationListenerManager.start();
configurationListenerManager.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import lombok.extern.slf4j.Slf4j;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;

import com.dangdang.ddframe.job.api.AbstractElasticJob;
Expand All @@ -47,6 +49,7 @@
import com.dangdang.ddframe.job.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.internal.statistics.StatisticsService;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Joiner;

/**
* 作业控制器.
Expand All @@ -56,6 +59,10 @@
@Slf4j
public class JobController {

private static final String SCHEDULER_INSTANCE_NAME_SUFFIX = "Scheduler";

private static final String CRON_TRIGGER_INDENTITY_SUFFIX = "Trigger";

private final JobConfiguration jobConfiguration;

private final ListenerManager listenerManager;
Expand All @@ -82,8 +89,6 @@ public class JobController {

private JobDetail jobDetail;

private Trigger trigger;

public JobController(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration);
Expand All @@ -105,10 +110,9 @@ public void init() {
log.debug("Elastic job: job controller init, job name is: {}.", jobConfiguration.getJobName());
registerElasticEnv();
jobDetail = createJobDetail();
trigger = createTrigger();
try {
scheduler = initializeScheduler(jobDetail.getKey().toString());
scheduleJob();
scheduleJob(createTrigger(configService.getCron()));
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
Expand Down Expand Up @@ -136,15 +140,15 @@ private JobDetail createJobDetail() {
return result;
}

private Trigger createTrigger() {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(configService.getCron());
private CronTrigger createTrigger(String cronExpression) {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
if (configService.isMisfire()) {
cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionFireAndProceed();
} else {
cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionDoNothing();
}
return TriggerBuilder.newTrigger()
.withIdentity(jobConfiguration.getJobName() + "_Trigger")
.withIdentity(Joiner.on("_").join(jobConfiguration.getJobName(), CRON_TRIGGER_INDENTITY_SUFFIX))
.withSchedule(cronScheduleBuilder).build();
}

Expand All @@ -160,7 +164,7 @@ private Properties getBaseQuartzProperties(final String jobName) {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", jobName + "_Scheduler");
result.put("org.quartz.scheduler.instanceName", Joiner.on("_").join(jobName, SCHEDULER_INSTANCE_NAME_SUFFIX));
if (!configService.isMisfire()) {
result.put("org.quartz.jobStore.misfireThreshold", "1");
}
Expand All @@ -171,7 +175,7 @@ private Properties getBaseQuartzProperties(final String jobName) {
protected void prepareEnvironments(final Properties props) {
}

private void scheduleJob() throws SchedulerException {
private void scheduleJob(final CronTrigger trigger) throws SchedulerException {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, trigger);
}
Expand Down Expand Up @@ -274,4 +278,16 @@ public void shutdown() {
throw new JobException(ex);
}
}

/**
* 重新调度作业.
*/
public void rescheduleJob(String cronExpression) {
try {
scheduler.rescheduleJob(TriggerKey.triggerKey(Joiner.on("_").join(
jobConfiguration.getJobName(), CRON_TRIGGER_INDENTITY_SUFFIX)), createTrigger(cronExpression));
} catch (SchedulerException ex) {
throw new JobException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,14 @@ public void assertIsFailoverPath() {
public void assertIsNotFailoverPath() {
assertFalse(configurationNode.isFailoverPath("/testJob/config/failover1"));
}

@Test
public void assertIsCronPath() {
assertTrue(configurationNode.isCronPath("/testJob/config/cron"));
}

@Test
public void assertIsNotCronPath() {
assertFalse(configurationNode.isCronPath("/testJob/config/cron1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class FailoverListenerManagerTest extends AbstractBaseJobTest {
public void setUp() {
failoverListenerManager.listenJobCrashed();
failoverListenerManager.listenFailoverJobCrashed();
failoverListenerManager.listenFailoverSerttingsChanged();
failoverListenerManager.listenFailoverSettingsChanged();
}

@Test
Expand Down

0 comments on commit d794086

Please sign in to comment.