Skip to content

Commit

Permalink
fix #69
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Mar 17, 2016
1 parent dcdd679 commit ffbeef8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void shardingIfNecessary() {
}
log.debug("Elastic job: sharding begin.");
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
jobNodeStorage.executeInTransaction(new ClearShardingInfoInfoTransactionExecutionCallback());
clearShardingInfo();
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(configService.getJobShardingStrategyClass());
JobShardingStrategyOption option = new JobShardingStrategyOption(jobName, configService.getShardingTotalCount(), configService.getShardingItemParameters());
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(serverService.getAvailableServers(), option)));
Expand All @@ -127,6 +127,12 @@ private void waitingOtherJobCompleted() {
}
}

private void clearShardingInfo() {
for (String each : serverService.getAllServers()) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getShardingNode(each));
}
}

/**
* 获取运行在本作业服务器的分片序列号.
*
Expand All @@ -140,17 +146,6 @@ public List<Integer> getLocalHostShardingItems() {
return ItemUtils.toItemList(jobNodeStorage.getJobNodeDataDirectly(ShardingNode.getShardingNode(ip)));
}

class ClearShardingInfoInfoTransactionExecutionCallback implements TransactionExecutionCallback {

@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (String each : serverService.getAllServers()) {
String shardingNode = jobNodePath.getFullPath(ShardingNode.getShardingNode(each));
curatorTransactionFinal.check().forPath(shardingNode).and().delete().forPath(shardingNode).and();
}
}
}

@RequiredArgsConstructor
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void assertShardingNecessaryWhenMonitorExecutionEnabled() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
when(leaderElectionService.isLeader()).thenReturn(true);
when(configService.isMonitorExecution()).thenReturn(true);
when(serverService.getAllServers()).thenReturn(Arrays.asList("ip1", "ip2"));
when(executionService.hasRunningItems()).thenReturn(true, false);
when(configService.getJobShardingStrategyClass()).thenReturn(AverageAllocationJobShardingStrategy.class.getCanonicalName());
when(configService.getShardingTotalCount()).thenReturn(3);
Expand All @@ -135,30 +136,35 @@ public void assertShardingNecessaryWhenMonitorExecutionEnabled() {
verify(leaderElectionService).isLeader();
verify(configService).isMonitorExecution();
verify(executionService, times(2)).hasRunningItems();
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip1/sharding");
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip2/sharding");
verify(jobNodeStorage).fillEphemeralJobNode("leader/sharding/processing", "");
verify(configService).getJobShardingStrategyClass();
verify(configService).getShardingTotalCount();
verify(configService).getShardingItemParameters();
verify(jobNodeStorage, times(2)).executeInTransaction(any(TransactionExecutionCallback.class));
verify(jobNodeStorage).executeInTransaction(any(TransactionExecutionCallback.class));
}

@Test
public void assertShardingNecessaryWhenMonitorExecutionDisabled() throws Exception {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
when(leaderElectionService.isLeader()).thenReturn(true);
when(configService.isMonitorExecution()).thenReturn(false);
when(serverService.getAllServers()).thenReturn(Arrays.asList("ip1", "ip2"));
when(configService.getJobShardingStrategyClass()).thenReturn(AverageAllocationJobShardingStrategy.class.getCanonicalName());
when(configService.getShardingTotalCount()).thenReturn(3);
when(configService.getShardingItemParameters()).thenReturn(Collections.<Integer, String>emptyMap());
shardingService.shardingIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/sharding/necessary");
verify(leaderElectionService).isLeader();
verify(configService).isMonitorExecution();
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip1/sharding");
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip2/sharding");
verify(jobNodeStorage).fillEphemeralJobNode("leader/sharding/processing", "");
verify(configService).getJobShardingStrategyClass();
verify(configService).getShardingTotalCount();
verify(configService).getShardingItemParameters();
verify(jobNodeStorage, times(2)).executeInTransaction(any(TransactionExecutionCallback.class));
verify(jobNodeStorage).executeInTransaction(any(TransactionExecutionCallback.class));
}

@Test
Expand All @@ -177,29 +183,6 @@ public void assertGetLocalHostShardingWhenNodeNotExisted() {
verify(jobNodeStorage).isJobNodeExisted("servers/mockedIP/sharding");
}

@Test
public void assertClearShardingInfoInfoTransactionExecutionCallback() throws Exception {
when(serverService.getAllServers()).thenReturn(Collections.singletonList("host0"));
CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
TransactionCheckBuilder transactionCheckBuilder = mock(TransactionCheckBuilder.class);
TransactionDeleteBuilder transactionDeleteBuilder = mock(TransactionDeleteBuilder.class);
CuratorTransactionBridge curatorTransactionBridge = mock(CuratorTransactionBridge.class);
when(curatorTransactionFinal.check()).thenReturn(transactionCheckBuilder);
when(transactionCheckBuilder.forPath("/testJob/servers/host0/sharding")).thenReturn(curatorTransactionBridge);
when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder);
when(transactionDeleteBuilder.forPath("/testJob/servers/host0/sharding")).thenReturn(curatorTransactionBridge);
when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
ShardingService.ClearShardingInfoInfoTransactionExecutionCallback actual = shardingService.new ClearShardingInfoInfoTransactionExecutionCallback();
actual.execute(curatorTransactionFinal);
verify(serverService).getAllServers();
verify(curatorTransactionFinal).check();
verify(transactionCheckBuilder).forPath("/testJob/servers/host0/sharding");
verify(curatorTransactionFinal).delete();
verify(transactionDeleteBuilder).forPath("/testJob/servers/host0/sharding");
verify(curatorTransactionBridge, times(2)).and();
}

@Test
public void assertPersistShardingInfoTransactionExecutionCallback() throws Exception {
CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
Expand Down

0 comments on commit ffbeef8

Please sign in to comment.