Skip to content

Commit

Permalink
fix #40
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jan 27, 2016
1 parent 08baffd commit 3e5f30f
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class JobScheduler {

private final JobConfiguration jobConfiguration;

private final CoordinatorRegistryCenter coordinatorRegistryCenter;

private final ListenerManager listenerManager;

private final ConfigurationService configService;
Expand Down Expand Up @@ -95,6 +97,7 @@ public class JobScheduler {

public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
this.coordinatorRegistryCenter = coordinatorRegistryCenter;
listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration);
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
Expand All @@ -113,6 +116,7 @@ public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter, f
*/
public void init() {
log.debug("Elastic job: job controller init, job name is: {}.", jobConfiguration.getJobName());
coordinatorRegistryCenter.addCacheData("/" + jobConfiguration.getJobName());
registerElasticEnv();
jobDetail = createJobDetail();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void dumpDirectly(final String path, final List<String> result) {
for (String each : coordinatorRegistryCenter.getChildrenKeys(path)) {
String zkPath = path + "/" + each;
String zkValue = coordinatorRegistryCenter.get(zkPath);
TreeCache treeCache = (TreeCache) coordinatorRegistryCenter.getRawCache();
TreeCache treeCache = (TreeCache) coordinatorRegistryCenter.getRawCache("/" + jobName);
ChildData treeCacheData = treeCache.getCurrentData(zkPath);
String treeCachePath = null == treeCacheData ? "" : treeCacheData.getPath();
String treeCacheValue = null == treeCacheData ? "" : new String(treeCacheData.getData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private CuratorFramework getClient() {
* 注册数据监听器.
*/
public void addDataListener(final TreeCacheListener listener) {
TreeCache cache = (TreeCache) coordinatorRegistryCenter.getRawCache();
TreeCache cache = (TreeCache) coordinatorRegistryCenter.getRawCache("/" + jobConfiguration.getJobName());
cache.getListenable().addListener(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ public interface CoordinatorRegistryCenter extends RegistryCenter {
/**
* 持久化临时顺序注册数据.
*
* @param key
* @param key
*/
void persistEphemeralSequential(String key);

/**
* 添加本地缓存.
*
* @param watcherPath 需加入缓存的路径
*/
void addCacheData(String cachePath);

/**
* 获取注册中心数据缓存对象.
*
* @param cachePath 缓存的节点路径
* @return 注册中心数据缓存对象
*/
Object getRawCache();
Object getRawCache(String cachePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

Expand Down Expand Up @@ -59,12 +61,12 @@ public class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
@Getter(AccessLevel.PROTECTED)
private ZookeeperConfiguration zkConfig;

private CuratorFramework client;
private final Map<String, TreeCache> caches = new HashMap<>();

private TreeCache cache;
private CuratorFramework client;

public ZookeeperRegistryCenter(final ZookeeperConfiguration zookeeperConfiguration) {
zkConfig = zookeeperConfiguration;
public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
this.zkConfig = zkConfig;
}

public void init() {
Expand Down Expand Up @@ -101,7 +103,6 @@ public List<ACL> getAclForPath(final String path) {
if (!Strings.isNullOrEmpty(zkConfig.getLocalPropertiesPath())) {
fillData();
}
cacheData();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
Expand Down Expand Up @@ -134,15 +135,10 @@ private Properties loadLocalProperties() {
return result;
}

private void cacheData() throws Exception {
cache = new TreeCache(client, "/");
cache.start();
}

@Override
public void close() {
if (null != cache) {
cache.close();
for (Entry<String, TreeCache> each : caches.entrySet()) {
each.getValue().close();
}
waitForCacheClose();
CloseableUtils.closeQuietly(client);
Expand All @@ -163,8 +159,9 @@ private void waitForCacheClose() {

@Override
public String get(final String key) {
if (null == cache) {
return null;
TreeCache cache = findTreeCache(key);
if (null == findTreeCache(key)) {
return getDirectly(key);
}
ChildData resultIncache = cache.getCurrentData(key);
if (null != resultIncache) {
Expand All @@ -173,6 +170,15 @@ public String get(final String key) {
return getDirectly(key);
}

private TreeCache findTreeCache(final String key) {
for (Entry<String, TreeCache> entry : caches.entrySet()) {
if (key.startsWith(entry.getKey())) {
return entry.getValue();
}
}
return null;
}

@Override
public String getDirectly(final String key) {
try {
Expand Down Expand Up @@ -299,7 +305,20 @@ public Object getRawClient() {
}

@Override
public Object getRawCache() {
return cache;
public void addCacheData(final String cachePath) {
TreeCache cache = new TreeCache(client, cachePath);
try {
cache.start();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
caches.put(cachePath, cache);
}

@Override
public Object getRawCache(final String cachePath) {
return caches.get(cachePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import com.dangdang.ddframe.reg.AllRegTests;
import com.dangdang.ddframe.test.NestedZookeeperServers;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@RunWith(Suite.class)
@SuiteClasses({
AllRegTests.class,
AllJobTests.class
})
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AllTests {

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
import com.dangdang.ddframe.reg.exception.LocalPropertiesFileNotFoundExceptionTest;
import com.dangdang.ddframe.reg.exception.RegExceptionHandlerTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterForAuthTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterForLocalPropertiesTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterMiscellaneousTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterModifyTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterQueryWithCacheTest;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenterQueryWithoutCacheTest;

@RunWith(Suite.class)
@SuiteClasses({
ZookeeperRegistryCenterTest.class,
ZookeeperRegistryCenterForLocalPropertiesTest.class,
ZookeeperRegistryCenterForAuthTest.class,
ZookeeperRegistryCenterQueryWithCacheTest.class,
ZookeeperRegistryCenterQueryWithoutCacheTest.class,
ZookeeperRegistryCenterModifyTest.class,
ZookeeperRegistryCenterMiscellaneousTest.class,
RegExceptionHandlerTest.class,
LocalPropertiesFileNotFoundExceptionTest.class
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@
import org.junit.Test;

import com.dangdang.ddframe.test.NestedZookeeperServers;
import com.dangdang.ddframe.test.TestEnvironmentException;

public final class ZookeeperRegistryCenterForAuthTest {


private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, "zkRegTestCenter", 1000, 3000, 3);
private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, ZookeeperRegistryCenterForAuthTest.class.getName(), 1000, 3000, 3);

private ZookeeperRegistryCenter zkRegCenter;

@Before
public void setUp() {
NestedZookeeperServers.getInstance().startServerIfNotStarted();
zkConfig.setDigest("digest:password");
zkConfig.setLocalPropertiesPath("conf/reg/local.properties");
zkConfig.setSessionTimeoutMilliseconds(5000);
zkConfig.setConnectionTimeoutMilliseconds(5000);
zkRegCenter = new ZookeeperRegistryCenter(zkConfig);
Expand All @@ -49,53 +49,28 @@ public void setUp() {
@After
public void tearDown() {
zkRegCenter.close();
clear();
}

private void clear() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(NestedZookeeperServers.ZK_CONNECTION_STRING).retryPolicy(new RetryOneTime(2000)).authorization("digest", "digest:correct".getBytes()).build();
client.start();
try {
client.blockUntilConnected();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
if (null != client.checkExists().forPath("/zkRegTestCenter")) {
client.delete().deletingChildrenIfNeeded().forPath("/zkRegTestCenter");
}
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
throw new TestEnvironmentException(ex);
}
}

@Test
public void initWithDigestSuccess() throws Exception {
zkConfig.setLocalPropertiesPath("conf/reg/local.properties");
zkConfig.setDigest("digest:correct");
public void assertInitWithDigestSuccess() throws Exception {
zkRegCenter.init();
zkRegCenter.close();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(NestedZookeeperServers.ZK_CONNECTION_STRING)
.retryPolicy(new RetryOneTime(2000))
.authorization("digest", "digest:correct".getBytes()).build();
.authorization("digest", "digest:password".getBytes()).build();
client.start();
client.blockUntilConnected();
assertThat(client.getData().forPath("/zkRegTestCenter/test/deep/nested"), is("deepNested".getBytes()));
assertThat(client.getData().forPath("/" + ZookeeperRegistryCenterForAuthTest.class.getName() + "/test/deep/nested"), is("deepNested".getBytes()));
}

@Test(expected = NoAuthException.class)
public void initWithDigestFail() throws Exception {
zkConfig.setLocalPropertiesPath("conf/reg/local.properties");
zkConfig.setDigest("digest:correct");
public void assertInitWithDigestFailure() throws Exception {
zkRegCenter.init();
zkRegCenter.close();
CuratorFramework client = CuratorFrameworkFactory.newClient(NestedZookeeperServers.ZK_CONNECTION_STRING, new RetryOneTime(2000));
client.start();
client.blockUntilConnected();
client.getData().forPath("/zkRegTestCenter/test/deep/nested");
client.getData().forPath("/" + ZookeeperRegistryCenterForAuthTest.class.getName() + "/test/deep/nested");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.reg.zookeeper;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.dangdang.ddframe.reg.exception.LocalPropertiesFileNotFoundException;
import com.dangdang.ddframe.reg.exception.RegException;
import com.dangdang.ddframe.test.NestedZookeeperServers;

public final class ZookeeperRegistryCenterForLocalPropertiesTest {

private ZookeeperRegistryCenter zkRegCenter;

@BeforeClass
public static void init() {
NestedZookeeperServers.getInstance().startServerIfNotStarted();
}

@Before
public void setUp() {
zkRegCenter = createZookeeperRegistryCenter();
}

@After
public void tearDown() {
zkRegCenter.close();
}

@Test
public void assertInitWithoutLocalProperties() {
zkRegCenter.init();
assertFalse(zkRegCenter.isExisted("/notExisted"));
}

@Test(expected = LocalPropertiesFileNotFoundException.class)
public void assertInitWhenLocalPropertiesCannotFind() {
zkRegCenter.getZkConfig().setLocalPropertiesPath("conf/reg/notExisted.properties");
try {
zkRegCenter.init();
} catch (final RegException ex) {
throw (LocalPropertiesFileNotFoundException) ex.getCause();
}
}

@Test
public void assertInitForOverwriteDisabled() {
createInitData();
zkRegCenter.getZkConfig().setLocalPropertiesPath("conf/reg/local_overwrite.properties");
zkRegCenter.init();
assertThat(zkRegCenter.get("/test"), is("test"));
assertThat(zkRegCenter.get("/test/deep/nested"), is("deepNested"));
assertThat(zkRegCenter.get("/new"), is("new"));
}

@Test
public void assertInitForOverwriteEnabled() {
createInitData();
zkRegCenter.getZkConfig().setLocalPropertiesPath("conf/reg/local_overwrite.properties");
zkRegCenter.getZkConfig().setOverwrite(true);
zkRegCenter.init();
assertThat(zkRegCenter.get("/test"), is("test_overwrite"));
assertThat(zkRegCenter.get("/test/deep/nested"), is("deepNested_overwrite"));
assertThat(zkRegCenter.get("/new"), is("new"));
}

private ZookeeperRegistryCenter createZookeeperRegistryCenter() {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3));
}

private void createInitData() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(NestedZookeeperServers.ZK_CONNECTION_STRING, getCurrentRunningMethodName(), 1000, 3000, 3);
zkConfig.setLocalPropertiesPath("conf/reg/local.properties");
ZookeeperRegistryCenter zkRegCenter = new ZookeeperRegistryCenter(zkConfig);
zkRegCenter.init();
zkRegCenter.close();
}

private String getCurrentRunningMethodName() {
return Thread.currentThread().getStackTrace()[1].getMethodName();
}
}
Loading

0 comments on commit 3e5f30f

Please sign in to comment.