Skip to content

Commit

Permalink
fixed #283
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Apr 13, 2017
1 parent abc2994 commit f314d95
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistry
* 初始化作业.
*/
public void init() {
LiteJobConfiguration liteJobConfig = schedulerFacade.updateJobConfiguration(this.liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(liteJobConfig);
schedulerFacade.registerStartUpInfo(!this.liteJobConfig.isDisabled());
jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,26 @@ public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String j
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}

/**
* 更新作业配置.
*
* @param liteJobConfig 作业配置
* @return 更新后的作业配置
*/
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
configService.persist(liteJobConfig);
return configService.load(false);
}

/**
* 注册作业启动信息.
*
* @param liteJobConfig 作业配置
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
leaderService.electLeader();
configService.persist(liteJobConfig);
LiteJobConfiguration liteJobConfigFromZk = configService.load(false);
serverService.persistOnline(!liteJobConfigFromZk.isDisabled());
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public final class JobSchedulerTest {

Expand Down Expand Up @@ -66,8 +67,9 @@ public void initMocks() throws NoSuchFieldException {

@Test
public void assertInit() throws NoSuchFieldException, SchedulerException {
when(schedulerFacade.updateJobConfiguration(liteJobConfig)).thenReturn(liteJobConfig);
jobScheduler.init();
verify(schedulerFacade).registerStartUpInfo(liteJobConfig);
verify(schedulerFacade).registerStartUpInfo(true);
Scheduler scheduler = ReflectionUtils.getFieldValue(JobRegistry.getInstance().getJobScheduleController("test_job"), JobScheduleController.class.getDeclaredField("scheduler"));
assertTrue(scheduler.isStarted());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import java.util.Collections;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -98,18 +100,25 @@ public void setUp() throws NoSuchFieldException {
ReflectionUtils.setFieldValue(schedulerFacade, "listenerManager", listenerManager);
}

@Test
public void assertUpdateJobConfiguration() {
LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(
new DataflowJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(), TestDataflowJob.class.getCanonicalName(), false)).build();
when(configService.load(false)).thenReturn(jobConfig);
assertThat(schedulerFacade.updateJobConfiguration(jobConfig), is(jobConfig));
verify(configService).persist(jobConfig);
}

@Test
public void assertRegisterStartUpInfo() {
when(configService.load(false)).thenReturn(LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder("test_job", "0/1 * * * * ?", 3).build(),
TestDataflowJob.class.getCanonicalName(), false)).build());
schedulerFacade.registerStartUpInfo(liteJobConfig);
schedulerFacade.registerStartUpInfo(true);
verify(listenerManager).startAllListeners();
verify(leaderService).electLeader();
verify(configService).persist(liteJobConfig);
verify(serverService).persistOnline(true);
verify(shardingService).setReshardingFlag();
verify(monitorService).listen();
verify(configService).load(false);
}

@Test
Expand Down

0 comments on commit f314d95

Please sign in to comment.