Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.zk.ZNode;
Expand All @@ -56,6 +57,24 @@
import org.slf4j.LoggerFactory;

public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {

// Designates which mode ZkBaseDataAccessor should be created in. If not specified, it will be
// created on SHARED mode.
public enum ZkClientType {
/*
* When ZkBaseDataAccessor is created with the DEDICATED type, it supports ephemeral node
* creation, callback functionality, and session management. But note that this is more
* resource-heavy since it creates a dedicated ZK connection so should be used sparingly only
* when the aforementioned features are needed.
*/
DEDICATED,
/*
* When ZkBaseDataAccessor is created with the SHARED type, it only supports CRUD
* functionalities. This will be the default mode of creation.
*/
SHARED
}

enum RetCode {
OK,
NODE_EXISTS,
Expand Down Expand Up @@ -103,35 +122,103 @@ public ZkBaseDataAccessor(HelixZkClient zkClient) {

/**
* The ZkBaseDataAccessor with custom serializer support of ZkSerializer type.
* Note: This constructor will use a shared ZkConnection.
* Do NOT use this for ephemeral node creation/callbacks/session management.
* Do use this for simple CRUD operations to ZooKeeper.
* @param zkAddress The zookeeper address
*/
public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer) {
_zkClient = SharedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
_usesExternalZkClient = false;
this(zkAddress, zkSerializer, ZkClientType.SHARED);
}

/**
* The ZkBaseDataAccessor with custom serializer support of PathBasedZkSerializer type.
* Note: This constructor will use a shared ZkConnection.
* Do NOT use this for ephemeral node creation/callbacks/session management.
* Do use this for simple CRUD operations to ZooKeeper.
* @param zkAddress The zookeeper address
*/
public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer zkSerializer) {
_zkClient = SharedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
_usesExternalZkClient = false;
public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer) {
this(zkAddress, pathBasedZkSerializer, ZkClientType.SHARED);
}

/**
* The default ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model;
* Creates a ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model.
* Uses a shared ZkConnection resource.
* Does NOT support ephemeral node creation, callbacks, or session management.
* Uses {@link ZNRecordSerializer} serializer
* @param zkAddress The zookeeper address
*/
public ZkBaseDataAccessor(String zkAddress) {
this(zkAddress, new ZNRecordSerializer());
}

/**
* Creates a ZkBaseDataAccessor with {@link org.apache.helix.ZNRecord} as the data model.
* If DEDICATED, it will use a dedicated ZkConnection, which allows ephemeral
* node creation, callbacks, and session management.
* If SHARED, it will use a shared ZkConnection, which only allows simple
* CRUD operations to ZooKeeper.
* @param zkAddress
* @param zkClientType
*/
public ZkBaseDataAccessor(String zkAddress, ZkClientType zkClientType) {
this(zkAddress, new ZNRecordSerializer(), zkClientType);
}

/**
* Creates a ZkBaseDataAccessor with a custom implementation of ZkSerializer.
* If DEDICATED, it will use a dedicated ZkConnection, which allows ephemeral
* node creation, callbacks, and session management.
* If SHARED, it will use a shared ZkConnection, which only allows simple
* CRUD operations to ZooKeeper.
* @param zkAddress
* @param zkSerializer
*/
public ZkBaseDataAccessor(String zkAddress, ZkSerializer zkSerializer,
ZkClientType zkClientType) {
switch (zkClientType) {
case DEDICATED:
_zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
break;
case SHARED:
default:
_zkClient = SharedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(zkSerializer));
}
_usesExternalZkClient = false;
}

/**
* Creates a ZkBaseDataAccessor with a custom implementation of PathBasedZkSerializer.
* If created with DEDICATED mode, it will use a dedicated ZkConnection, which allows ephemeral
* node creation, callbacks, and session management.
* If SHARED, it will use a shared ZkConnection, which only allows simple
* CRUD operations to ZooKeeper.
* @param zkAddress
* @param pathBasedZkSerializer
* @param zkClientType
*/
public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSerializer,
ZkClientType zkClientType) {
switch (zkClientType) {
case DEDICATED:
_zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
break;
case SHARED:
default:
_zkClient = SharedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(pathBasedZkSerializer));
}
_usesExternalZkClient = false;
}

/**
* sync create
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.HelixPropertyListener;
Expand Down Expand Up @@ -106,18 +107,32 @@ public int compare(String o1, String o2) {

public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null);
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null,
ZkBaseDataAccessor.ZkClientType.SHARED);
}

public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(serializer)
.setMonitorType(monitorType)
.setMonitorKey(monitorkey);
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey,
ZkBaseDataAccessor.ZkClientType.SHARED);
}

public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey,
ZkBaseDataAccessor.ZkClientType zkClientType) {
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(serializer).setMonitorType(monitorType).setMonitorKey(monitorkey);
switch (zkClientType) {
case DEDICATED:
_zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(serializer));
break;
case SHARED:
default:
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
}
_zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
_baseAccessor = new ZkBaseDataAccessor<>(_zkClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String ch
public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath) {
super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, chrootPath);
}

public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath,
ZkBaseDataAccessor.ZkClientType zkClientType) {
super(zkAddress, serializer, chrootPath, null, null, MONITOR_TYPE, chrootPath, zkClientType);
}
}