Skip to content

Commit

Permalink
Check time diff between job server and register center #6
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Oct 15, 2015
1 parent 2321b41 commit 3b8a9dc
Show file tree
Hide file tree
Showing 21 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class JobSettings implements Serializable {

private int fetchDataCount;

private int maxTimeDiffSeconds;

private boolean failover;

private boolean misfire;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public JobSettings getJobSettings(final String jobName) {
result.setProcessCountIntervalSeconds(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "processCountIntervalSeconds"))));
result.setConcurrentDataProcessThreadCount(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "concurrentDataProcessThreadCount"))));
result.setFetchDataCount(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "fetchDataCount"))));
result.setMaxTimeDiffSeconds(Integer.parseInt(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "maxTimeDiffSeconds"))));
result.setFailover(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "failover"))));
result.setMisfire(Boolean.valueOf(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "misfire"))));
result.setJobShardingStrategyClass(curatorRepository.getData(JobNodePath.getConfigNodePath(jobName, "jobShardingStrategyClass")));
Expand All @@ -114,6 +115,7 @@ public void updateJobSettings(final JobSettings jobSettings) {
updateIfchanged(jobSettings.getJobName(), "processCountIntervalSeconds", jobSettings.getProcessCountIntervalSeconds());
updateIfchanged(jobSettings.getJobName(), "concurrentDataProcessThreadCount", jobSettings.getConcurrentDataProcessThreadCount());
updateIfchanged(jobSettings.getJobName(), "fetchDataCount", jobSettings.getFetchDataCount());
updateIfchanged(jobSettings.getJobName(), "maxTimeDiffSeconds", jobSettings.getMaxTimeDiffSeconds());
updateIfchanged(jobSettings.getJobName(), "failover", jobSettings.isFailover());
updateIfchanged(jobSettings.getJobName(), "misfire", jobSettings.isMisfire());
updateIfchanged(jobSettings.getJobName(), "jobShardingStrategyClass", jobSettings.getJobShardingStrategyClass());
Expand Down
4 changes: 3 additions & 1 deletion elastic-job-console/src/main/webapp/js/job_detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function renderSettings() {
$("#processCountIntervalSeconds").attr("value", data.processCountIntervalSeconds);
$("#concurrentDataProcessThreadCount").attr("value", data.concurrentDataProcessThreadCount);
$("#fetchDataCount").attr("value", data.fetchDataCount);
$("#maxTimeDiffSeconds").attr("value", data.maxTimeDiffSeconds);
$("#jobShardingStrategyClass").attr("value", data.jobShardingStrategyClass);
$("#description").text(data.description);
if (!data.monitorExecution) {
Expand All @@ -48,13 +49,14 @@ function bindSubmitJobSettingsForm() {
var concurrentDataProcessThreadCount = $("#concurrentDataProcessThreadCount").val();
var processCountIntervalSeconds = $("#processCountIntervalSeconds").val();
var fetchDataCount = $("#fetchDataCount").val();
var maxTimeDiffSeconds = $("#maxTimeDiffSeconds").val();
var monitorExecution = $("#monitorExecution").prop("checked");
var failover = $("#failover").prop("checked");
var misfire = $("#misfire").prop("checked");
var shardingItemParameters = $("#shardingItemParameters").val();
var jobShardingStrategyClass = $("#jobShardingStrategyClass").val();
var description = $("#description").val();
$.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, description: description}, function(data) {
$.post("job/settings", {jobName: jobName, jobClass : jobClass, shardingTotalCount: shardingTotalCount, jobParameter: jobParameter, cron: cron, concurrentDataProcessThreadCount: concurrentDataProcessThreadCount, processCountIntervalSeconds: processCountIntervalSeconds, fetchDataCount: fetchDataCount, maxTimeDiffSeconds: maxTimeDiffSeconds, monitorExecution: monitorExecution, failover: failover, misfire: misfire, shardingItemParameters: shardingItemParameters, jobShardingStrategyClass: jobShardingStrategyClass, description: description}, function(data) {
showSuccessDialog();
if (monitorExecution) {
$("#execution_info_tab").removeClass("disabled");
Expand Down
7 changes: 7 additions & 0 deletions elastic-job-console/src/main/webapp/templates/job_detail.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
</div>
</div>

<div class="form-group">
<label for="maxTimeDiffSeconds" class="col-sm-2 control-label">最大容忍的本机与注册中心的时间误差秒数</label>
<div class="col-sm-1">
<input type="number" id="maxTimeDiffSeconds" name="maxTimeDiffSeconds" class="form-control" data-toggle="tooltip" data-placement="bottom" title="如果时间误差超过配置秒数则作业启动时将抛异常。配置为-1表示不检查时间误差。" />
</div>
</div>

<div class="form-group">
<label for="monitorExecution" class="col-sm-2 control-label">监控作业执行时状态</label>
<div class="col-sm-2">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class AbstractElasticJob implements ElasticJob {
@Override
public final void execute(final JobExecutionContext context) throws JobExecutionException {
log.debug("Elastic job: job execute begin, job execution context:{}.", context);
configService.checkMaxTimeDiffSecondsTolerable();
shardingService.shardingIfNecessary();
JobExecutionMultipleShardingContext shardingContext = executionContextService.getJobExecutionShardingContext();
if (executionService.misfireIfNecessary(shardingContext.getShardingItems())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ public class JobConfiguration {
*/
private int fetchDataCount = 1;

/**
* 最大容忍的本机与注册中心的时间误差秒数.
*
* <p>
* 如果时间误差超过配置秒数则作业启动时将抛异常.
* 配置为-1表示不检查时间误差.
* </p>
*/
private int maxTimeDiffSeconds = -1;

/**
* 是否开启失效转移.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.exception;

/**
* 本机与注册中心的时间误差超过容忍范围抛出的异常.
*
* @author zhangliang
*/
public final class TimeDiffIntolerableException extends JobException {

private static final long serialVersionUID = -6287464997081326084L;

private static final String ERROR_MSG = "Time different between job server and register center exceed [%s] seconds, max time different is [%s] seconds.";

public TimeDiffIntolerableException(final int timeDiffSeconds, final int maxTimeDiffSeconds) {
super(ERROR_MSG, timeDiffSeconds, maxTimeDiffSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public final class ConfigurationNode {

static final String FETCH_DATA_COUNT = ROOT + "/fetchDataCount";

static final String MAX_TIME_DIFF_SECONDS = ROOT + "/maxTimeDiffSeconds";

static final String FAILOVER = ROOT + "/failover";

static final String MISFIRE = ROOT + "/misfire";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.exception.JobConflictException;
import com.dangdang.ddframe.job.exception.ShardingItemParametersException;
import com.dangdang.ddframe.job.exception.TimeDiffIntolerableException;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -69,6 +70,7 @@ private void registerJobInfo() {
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.PROCESS_COUNT_INTERVAL_SECONDS, jobNodeStorage.getJobConfiguration().getProcessCountIntervalSeconds());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.CONCURRENT_DATA_PROCESS_THREAD_COUNT, jobNodeStorage.getJobConfiguration().getConcurrentDataProcessThreadCount());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FETCH_DATA_COUNT, jobNodeStorage.getJobConfiguration().getFetchDataCount());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.MAX_TIME_DIFF_SECONDS, jobNodeStorage.getJobConfiguration().getMaxTimeDiffSeconds());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.FAILOVER, jobNodeStorage.getJobConfiguration().isFailover());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.MISFIRE, jobNodeStorage.getJobConfiguration().isMisfire());
jobNodeStorage.fillJobNodeIfNullOrOverwrite(ConfigurationNode.JOB_SHARDING_STRATEGY_CLASS, jobNodeStorage.getJobConfiguration().getJobShardingStrategyClass());
Expand Down Expand Up @@ -164,6 +166,20 @@ public int getFetchDataCount() {
return Integer.parseInt(jobNodeStorage.getJobNodeData(ConfigurationNode.FETCH_DATA_COUNT));
}

/**
* 检查本机与注册中心的时间误差秒数是否在允许范围.
*/
public void checkMaxTimeDiffSecondsTolerable() {
int maxTimeDiffSeconds = Integer.parseInt(jobNodeStorage.getJobNodeData(ConfigurationNode.MAX_TIME_DIFF_SECONDS));
if (-1 == maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(System.currentTimeMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new TimeDiffIntolerableException(Long.valueOf(timeDiff / 1000).intValue(), maxTimeDiffSeconds);
}
}

/**
* 获取是否开启失效转移.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,13 @@ public void addDataListener(final TreeCacheListener listener) {
TreeCache cache = (TreeCache) coordinatorRegistryCenter.getRawCache();
cache.getListenable().addListener(listener);
}

/**
* 获取注册中心当前时间.
*
* @return 注册中心当前时间
*/
public long getRegistryCenterTime() {
return coordinatorRegistryCenter.getRegistryCenterTime(jobNodePath.getFullPath("systemTime/current"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public interface RegistryCenter {
*/
void remove(String key);

/**
* 获取注册中心当前时间.
*
* @param key 用于获取时间的键
* @return 注册中心当前时间
*/
long getRegistryCenterTime(String key);

/**
* 直接获取操作注册中心的原生客户端.
* 如:Zookeeper或Redis等原生客户端.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.reg.exception.LocalPropertiesFileNotFoundException;
import com.dangdang.ddframe.reg.exception.RegExceptionHandler;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/**
Expand Down Expand Up @@ -268,6 +269,21 @@ public void remove(final String key) {
}
}

@Override
public long getRegistryCenterTime(final String key) {
long result = 0L;
try {
String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
result = client.checkExists().forPath(path).getCtime();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
Preconditions.checkArgument(0L != result, "Cannot get registry center time.");
return result;
}

@Override
public Object getRawClient() {
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.exception.JobConflictException;
import com.dangdang.ddframe.job.exception.ShardingItemParametersException;
import com.dangdang.ddframe.job.exception.TimeDiffIntolerableException;
import com.dangdang.ddframe.job.internal.AbstractBaseJobTest;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;

Expand Down Expand Up @@ -191,6 +192,26 @@ public void assertGetFetchDataCount() {
assertThat(configService.getFetchDataCount(), is(1));
}

@Test
public void assertIsMaxTimeDiffSecondsTolerableWithDefaultValue() {
configService.persistJobConfiguration();
configService.checkMaxTimeDiffSecondsTolerable();
}

@Test
public void assertIsMaxTimeDiffSecondsTolerable() {
getJobConfig().setMaxTimeDiffSeconds(60);
configService.persistJobConfiguration();
configService.checkMaxTimeDiffSecondsTolerable();
}

@Test(expected = TimeDiffIntolerableException.class)
public void assertIsNotMaxTimeDiffSecondsTolerable() {
getJobConfig().setMaxTimeDiffSeconds(-60);
configService.persistJobConfiguration();
configService.checkMaxTimeDiffSecondsTolerable();
}

@Test
public void assertIsNotFailoverWhenNotMonitorExecution() {
getJobConfig().setMonitorExecution(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ public void remove() {
assertFalse(zkRegCenter.isExisted("/test"));
}

@Test
public void getRegistryCenterTime() {
zkRegCenter.init();
assertTrue(zkRegCenter.getRegistryCenterTime("/_systemTime/current") <= System.currentTimeMillis());
}

@Test
public void getRawClient() {
zkRegCenter.init();
Expand Down
24 changes: 24 additions & 0 deletions elastic-job-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

<elastic-job.version>1.0.2-SNAPSHOT</elastic-job.version>

<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.1.2</logback.version>

<junit.version>4.12</junit.version>

<maven-compiler-plugin.version>3.3</maven-compiler-plugin.version>
Expand All @@ -35,6 +38,27 @@
<version>${elastic-job.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@

<job:bean id="oneOffElasticJob" class="com.dangdang.example.elasticjob.spring.job.OneOffElasticDemoJob" regCenter="regCenter" shardingTotalCount="${oneOffElasticJob.shardingTotalCount}" cron="${oneOffElasticJob.cron}" shardingItemParameters="${oneOffElasticJob.shardingItemParameters}" monitorExecution="${oneOffElasticJob.monitorExecution}" failover="${oneOffElasticJob.failover}" description="${oneOffElasticJob.description}" disabled="${oneOffElasticJob.disabled}" overwrite="${oneOffElasticJob.overwrite}" />
<job:bean id="perpetualElasticJob" class="com.dangdang.example.elasticjob.spring.job.PerpetualElasticDemoJob" regCenter="regCenter" shardingTotalCount="${perpetualElasticJob.shardingTotalCount}" cron="${perpetualElasticJob.cron}" shardingItemParameters="${perpetualElasticJob.shardingItemParameters}" monitorExecution="${perpetualElasticJob.monitorExecution}" failover="${perpetualElasticJob.failover}" processCountIntervalSeconds="${perpetualElasticJob.processCountIntervalSeconds}" concurrentDataProcessThreadCount="${perpetualElasticJob.concurrentDataProcessThreadCount}" description="${perpetualElasticJob.description}" disabled="${perpetualElasticJob.disabled}" overwrite="${perpetualElasticJob.overwrite}" />
<job:bean id="sequencePerpetualElasticJob" class="com.dangdang.example.elasticjob.spring.job.SequencePerpetualElasticDemoJob" regCenter="regCenter" shardingTotalCount="${sequencePerpetualElasticJob.shardingTotalCount}" cron="${sequencePerpetualElasticJob.cron}" shardingItemParameters="${sequencePerpetualElasticJob.shardingItemParameters}" monitorExecution="${sequencePerpetualElasticJob.monitorExecution}" failover="${sequencePerpetualElasticJob.failover}" processCountIntervalSeconds="${sequencePerpetualElasticJob.processCountIntervalSeconds}" description="${sequencePerpetualElasticJob.description}" disabled="${sequencePerpetualElasticJob.disabled}" overwrite="${sequencePerpetualElasticJob.overwrite}" />
<job:bean id="sequencePerpetualElasticJob" class="com.dangdang.example.elasticjob.spring.job.SequencePerpetualElasticDemoJob" regCenter="regCenter" shardingTotalCount="${sequencePerpetualElasticJob.shardingTotalCount}" cron="${sequencePerpetualElasticJob.cron}" shardingItemParameters="${sequencePerpetualElasticJob.shardingItemParameters}" monitorExecution="${sequencePerpetualElasticJob.monitorExecution}" failover="${sequencePerpetualElasticJob.failover}" processCountIntervalSeconds="${sequencePerpetualElasticJob.processCountIntervalSeconds}" maxTimeDiffSeconds="${sequencePerpetualElasticJob.maxTimeDiffSeconds}" description="${sequencePerpetualElasticJob.description}" disabled="${sequencePerpetualElasticJob.disabled}" overwrite="${sequencePerpetualElasticJob.overwrite}" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<property name="monitorExecution" value="${sequencePerpetualElasticJob.monitorExecution}" />
<property name="failover" value="${sequencePerpetualElasticJob.failover}" />
<property name="processCountIntervalSeconds" value="${sequencePerpetualElasticJob.processCountIntervalSeconds}" />
<property name="maxTimeDiffSeconds" value="${sequencePerpetualElasticJob.maxTimeDiffSeconds}" />
<property name="description" value="${sequencePerpetualElasticJob.description}" />
<property name="disabled" value="${sequencePerpetualElasticJob.disabled}" />
<property name="overwrite" value="${sequencePerpetualElasticJob.overwrite}" />
Expand Down
1 change: 1 addition & 0 deletions elastic-job-example/src/main/resources/conf/job.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ perpetualElasticJob.overwrite=true
sequencePerpetualElasticJob.cron=0/5 * * * * ?
sequencePerpetualElasticJob.shardingTotalCount=10
sequencePerpetualElasticJob.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
sequencePerpetualElasticJob.maxTimeDiffSeconds=-1
sequencePerpetualElasticJob.monitorExecution=true
sequencePerpetualElasticJob.failover=true
sequencePerpetualElasticJob.processCountIntervalSeconds=10
Expand Down
Loading

0 comments on commit 3b8a9dc

Please sign in to comment.