基于zookeeper实现的分布式开发客户端
- client 实现zookeeper watch,session过期处理,断线重连
- expansion 实现分布式锁(互斥锁),分布式信号量(samephore),分布式闭锁(CountDownLatch)等
<dependency>
<groupId>com.keeper</groupId>
<artifactId>keeper</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
//构造
KeeperClient client = new KeeperClient("127.0.0.1:2181");
//关闭
client.closeClient();
//create
String create(String path,byte[] bytes);
//createMode 指定创建临时节点或者持久节点,或者序列节点
String create(String path, byte[] bytes,CreateMode createMode);
//create 自动创建父亲节点
String createWtihParent(String path);
//create 字符串数据
String createStr(String path,String data,CreateMode createMode);
//create 对象数据
String createObject(String path,Object t,CreateMode createMode);
//read
byte[] read(String path);
//read 读取字符串数据
String readStr(String path);
//read 读取对象数据
<T> T readObject(String path,Class<T> clazz);
//update
void update(String path,byte[] bytes);
//update 更新字符串数据
void updateStr(String path,String data);
//update 更新对象数据
void updateObject(String path,Object data);
//delete
boolean delete(String path);
//delete 含子节点
boolean deleteRecurse(String path);
//getChildren
List<String> getChildren(String parent);
//getSortedChildren 用于子节点全部为int名称的默认排序
List<String> getSortedChildren(String parent) ;
//getSortedChildren 传入指定比较器
List<String> getSortedChildren(String parent,Comparator<String> comparator);
//监听数据变化,删;传入指定listener,当变化发生时事件回调
void listenNode(String path,KeeperNodeListener keeperNodeListener);
//监听子节点增删以及本节点删除;传入指定listener,当变化发生时事件回调
void listenChild(String path,KeeperChildListener keeperChildListener);
//构造分布式锁(互斥可重入),name是锁名称
final KeeperLock lock = new KeeperMutexLock("testlocka", client);
//加锁
lock.lock();
//解锁
lock.unlock();
//构造分布式信号量,name是名称,共享信号量的线程使用同一名称,int为最大许可总数,一经创建不可修改
KeeperSemaphore semaphore = KeeperSimpleSemaphore.getOrCreate("semaphore2", 3, client);
//请求许可,无可用许可会阻塞线程,当然也可以使用tryAcquire直接返回
semaphore.acquire();
//返还许可,若不主动返还,则本许可将一直被该线程占用,直到client离线
semaphore.release();
//构造分布式latch,name是名称,count为需要创建的count数值,注意如果该name标识的latch已经存在的话,则不会重新创建,使用已有count
KeeperCountDownLatch latch = KeeperCountDownLatch.getOrCreate("latch02", 2, client) ;
//等待count变为0
latch.await();
//count -1变为0后继续countDown将无实际变化
latch.countDown();
//销毁latch,销毁后其他阻塞的线程将不再阻塞,后续继续对latch执行await,countdown等操作将抛出异常。
latch.destroy();
KeeperClient client = new KeeperClient("127.0.0.1:2181");
static String testPath = "/hworld";
static String testData = "hello world";
@Test
public void testCreate(){
client.create(testPath, testData.getBytes());
Assert.assertTrue(testData .equals( new String(client.read(testPath))));
}
@Test
public void testCreateStr(){
String str = "hello World! \\n ****";
client.createStr(testPath, str,CreateMode.PERSISTENT);
Assert.assertTrue(str .equals(client.readStr(testPath)));
}
@Test
public void testCreateObject(){
Person person = new Person();
person.setAge(20);
person.setJoinDate(new Date());
person.setJoinTheTeam(true);
person.setJoinTime(System.currentTimeMillis());
person.setName("张三");
client.createObject(testPath, person,CreateMode.PERSISTENT);
Person p = client.readObject(testPath, Person.class);
System.out.println(p);
Assert.assertTrue(person.getName().equals(p.getName()));
Assert.assertTrue(person.getAge()==p.getAge());
Assert.assertTrue(person.getJoinDate().getTime()==p.getJoinDate().getTime());
Assert.assertTrue(person.getJoinTime()==p.getJoinTime());
Assert.assertTrue(p.isJoinTheTeam());
}
@Test
public void testRead(){
client.create(testPath, testData.getBytes());
Assert.assertTrue(testData .equals( new String(client.read(testPath))));
}
@Test
public void testGetChildren(){
client.createWtihParent("/A/B/C");
List<String> children = client.getChildren("/A");
Assert.assertArrayEquals(children.toArray(), new Object[]{"B"});
}
@Test
public void testUpdate(){
client.create(testPath, testData.getBytes());
client.update(testPath, "A".getBytes());
Assert.assertEquals("A", new String(client.read(testPath)));
}
@Test
public void testDelete(){
client.create(testPath, testData.getBytes());
client.delete(testPath);
Assert.assertTrue(!client.exist(testPath));
}
public void testDeleteRecurse(){
if (client.exist(testPath)){
client.deleteRecurse(testPath);
}
}
static String testPath = "/hworld";
static String testData = "hello world";
@Test
public void testNodeListen() throws InterruptedException{
final AtomicInteger onDataCall = new AtomicInteger();
final AtomicInteger onDeleteCall = new AtomicInteger();
client.listenNode(testPath, new KeeperNodeListener() {
public void onDelete(String path) {
System.out.println(testPath + " is deleted");
onDeleteCall.addAndGet(1);
}
public void onData(String path, byte[] bytes) {
System.out.println(testPath + " is created or updated with the data : " + new String(bytes));
onDataCall.addAndGet(1);
};
});
client.create(testPath, testData.getBytes());
Thread.sleep(100);
client.update(testPath, (testData+"xx").getBytes());
Thread.sleep(100);
client.delete(testPath);
Thread.sleep(100);
Assert.assertEquals(onDataCall.get(), 2);
Assert.assertEquals(onDeleteCall.get(), 1);
}
@Test
public void testChildListen() throws InterruptedException{
final AtomicInteger onChildCall = new AtomicInteger();
client.listenChild(testPath, new KeeperChildListener() {
public void onParentDelete(String path) {
System.out.println("parent " + path +" was been deleted!");
}
public void onChild(String parent, List<String> subs) {
System.out.println("child added or removed : " + subs);
onChildCall.addAndGet(1);
}
});
client.delete(testPath);
Thread.sleep(1000);
client.create(testPath, testData.getBytes());
Thread.sleep(1000);
client.create(testPath+"/"+"A", testData.getBytes());
Thread.sleep(1000);
client.create(testPath+"/"+"B", testData.getBytes());
Thread.sleep(1000);
client.delete(testPath+"/"+"B");
Thread.sleep(1000);
client.deleteRecurse(testPath);
Thread.sleep(1000);
}
final KeeperLock lock1 = new KeeperMutexLock("testlocka", client);
new ReentrantLockThread(lock1,"lock1a").start();
new ReentrantLockThread(lock1,"lock1b").start();
new ReentrantLockThread(lock1,"lock1c").start();
class ReentrantLockThread extends Thread{
private KeeperLock lock ;
public ReentrantLockThread(KeeperLock lock,String name) {
this.lock = lock;
this.setName(name);
}
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+" locked 1 ");
} catch (InterruptedException e1) {
return ;
}
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+" locked 2 ");
lock.unlock();
System.out.println(Thread.currentThread().getName()+" release 1 ");
//观察该线程占用锁5秒
Thread.sleep(5000);
lock.unlock();
System.out.println(Thread.currentThread().getName()+" release 2 ");
} catch (InterruptedException e) {
}
}
}
//观察给出3个许可,允许3个线程同时执行
KeeperSemaphore semaphore = KeeperSimpleSemaphore.getOrCreate("semaphore2", 3, client);
for (int i=1;i<=16;i++){
Thread myThread = new MyThread(i+"", semaphore);
myThread.start();
}
class MyThread extends Thread{
KeeperSemaphore semaphore ;
public MyThread(String name, KeeperSemaphore semaphore) {
this.setName(name);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " got a release ,availablePermits is " +semaphore.availablePermits());
//观察sleep一段时间,在release之前,许可不会被其他线程拿到
Thread.sleep(new Random().nextInt(1000));
semaphore.release();
System.out.println(Thread.currentThread().getName() + " returnd a release ,availablePermits is " +semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
public void testLock() throws InterruptedException, SemaphoreException {
KeeperCountDownLatch latch = KeeperCountDownLatch.getOrCreate("latch02", 2, client) ;
//一直等到countdown到0的线程1
MyThread t1 = new MyThread(latch, "waitCountDownThread1", true);
//指定较长时间,时间未到而countdown变为0,最终返回true的线程
MyThread t2 = new MyThread(latch, "waitTimeDownThreadTrue", false);
t2.setWaitMilliseconds(10000);
//指定较短时间,时间到而countdown未变为0,最终返回false的线程
MyThread t3 = new MyThread(latch, "waitTimeDownThreadFalse", false);
t3.setWaitMilliseconds(3000);
//一直等到countdown到0的线程2
MyThread t4 = new MyThread(latch, "waitCountDownThread2", true);
t1.start();t2.start();t3.start();t4.start();
Thread.sleep(5000);
latch.countDown();
System.out.println("count down 1");
latch.countDown();
System.out.println("count down 2");
Thread.sleep(5000l);
}
class MyThread extends Thread{
private KeeperCountDownLatch keeperCountDownLatch ;
boolean waitUtilCountDown ;
private int waitMilliseconds ;
public void setWaitMilliseconds(int waitMilliseconds) {
this.waitMilliseconds = waitMilliseconds;
}
public MyThread(KeeperCountDownLatch keeperCountDownLatch,String name,boolean waitUtilCountDown) {
this.keeperCountDownLatch = keeperCountDownLatch;
this.waitUtilCountDown = waitUtilCountDown ;
this.setName(name);
}
@Override
public void run() {
try {
if (waitUtilCountDown){
keeperCountDownLatch.await();
System.out.println(Thread.currentThread().getName() + " return util countdown " + new Date().toLocaleString());
}else {
if (!keeperCountDownLatch.await(waitMilliseconds,TimeUnit.MILLISECONDS)){
System.out.println(Thread.currentThread().getName() + " returnd false util the specified waiting time elapses at " + new Date().toLocaleString());
}else {
System.out.println(Thread.currentThread().getName() + " returnd true util countdown " + new Date().toLocaleString());
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
- huangdou