Skip to content
Permalink
Browse files
Add TTL and Container Modes to ZkClient and BaseDataAccessor APIs (#2138
)

Add TTL and Container Modes to ZkClient and BaseDataAccessor APIs
  • Loading branch information
rabashizade committed Jun 3, 2022
1 parent 4faa5f6 commit 3ed681be0746ac0383924ecb716f8e0741589be6
Show file tree
Hide file tree
Showing 19 changed files with 1,135 additions and 40 deletions.
@@ -52,7 +52,7 @@ under the License.
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
<dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="commons-io" name="commons-io" rev="2.11.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
@@ -43,6 +43,18 @@
*/
boolean create(String path, T record, int options);

/**
* This will always attempt to create the znode, if it exists it will return false. Will
* create parents if they do not exist. For performance reasons, it may try to create
* child first and only if it fails it will try to create parents
* @param path path to the ZNode to create
* @param record the data to write to the ZNode
* @param options Set the type of ZNode see the valid values in {@link AccessOption}
* @param ttl TTL of the node in milliseconds, if options supports it
* @return true if creation succeeded, false otherwise (e.g. if the ZNode exists)
*/
boolean create(String path, T record, int options, long ttl);

/**
* This will always attempt to set the data on existing node. If the ZNode does not
* exist it will create it and all its parents ZNodes if necessary
@@ -95,6 +107,17 @@
*/
boolean[] createChildren(List<String> paths, List<T> records, int options);

/**
* Use it when creating children under a parent node. This will use async api for better
* performance. If the child already exists it will return false.
* @param paths the paths to the children ZNodes
* @param records List of data to write to each of the path
* @param options Set the type of ZNode see the valid values in {@link AccessOption}
* @param ttl TTL of the node in milliseconds, if options supports it
* @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists)
*/
boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl);

/**
* can set multiple children under a parent node. This will use async api for better
* performance. If this child does not exist it will create it.
@@ -47,6 +47,7 @@
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -249,14 +250,29 @@ public ZkBaseDataAccessor(String zkAddress, PathBasedZkSerializer pathBasedZkSer
*/
@Override
public boolean create(String path, T record, int options) {
AccessResult result = doCreate(path, record, options);
return create(path, record, options, ZkClient.TTL_NOT_SET);
}

/**
* sync create with TTL
*/
@Override
public boolean create(String path, T record, int options, long ttl) {
AccessResult result = doCreate(path, record, options, ttl);
return result._retCode == RetCode.OK;
}

/**
* sync create
*/
public AccessResult doCreate(String path, T record, int options) {
return doCreate(path, record, options, ZkClient.TTL_NOT_SET);
}

/**
* sync create with TTL
*/
public AccessResult doCreate(String path, T record, int options, long ttl) {
AccessResult result = new AccessResult();
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
@@ -269,7 +285,7 @@ public AccessResult doCreate(String path, T record, int options) {
do {
retry = false;
try {
_zkClient.create(path, record, mode);
_zkClient.create(path, record, mode, ttl);
result._pathCreated.add(path);

result._retCode = RetCode.OK;
@@ -278,7 +294,14 @@ public AccessResult doCreate(String path, T record, int options) {
// this will happen if parent node does not exist
String parentPath = HelixUtil.getZkParentPath(path);
try {
AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
AccessResult res;
if (mode.isTTL()) {
res = doCreate(parentPath, null, options, ttl);
} else if (mode.isContainer()) {
res = doCreate(parentPath, null, AccessOption.CONTAINER);
} else {
res = doCreate(parentPath, null, AccessOption.PERSISTENT);
}
result._pathCreated.addAll(res._pathCreated);
RetCode rc = res._retCode;
if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) {
@@ -720,6 +743,14 @@ public boolean remove(String path, int options) {
*/
ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
boolean[] needCreate, List<List<String>> pathsCreated, int options) {
return create(paths, records, needCreate, pathsCreated, options, ZkClient.TTL_NOT_SET);
}

/**
* async create with TTL. give up on error other than NONODE
*/
ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
boolean[] needCreate, List<List<String>> pathsCreated, int options, long ttl) {
if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
|| (pathsCreated != null && pathsCreated.size() != paths.size())) {
throw new IllegalArgumentException(
@@ -747,7 +778,11 @@ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> reco
String path = paths.get(i);
T record = records == null ? null : records.get(i);
cbList[i] = new ZkAsyncCallbacks.CreateCallbackHandler();
_zkClient.asyncCreate(path, record, mode, cbList[i]);
if (mode.isTTL()) {
_zkClient.asyncCreate(path, record, mode, ttl, cbList[i]);
} else {
_zkClient.asyncCreate(path, record, mode, cbList[i]);
}
}

List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
@@ -784,8 +819,16 @@ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> reco
if (failOnNoNode) {
boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);

ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList =
create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList;
if (mode.isTTL()) {
parentCbList = create(parentPaths, null, needCreateParent, pathsCreated, options, ttl);
} else if (mode.isContainer()) {
parentCbList =
create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.CONTAINER);
} else {
parentCbList =
create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
}
for (int i = 0; i < parentCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
if (parentCb == null) {
@@ -812,6 +855,15 @@ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> reco
*/
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
}

/**
* async create with TTL
* TODO: rename to create
*/
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
boolean[] success = new boolean[paths.size()];

CreateMode mode = AccessOption.getMode(options);
@@ -829,7 +881,7 @@ public boolean[] createChildren(List<String> paths, List<T> records, int options
try {

ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
create(paths, records, needCreate, pathsCreated, options);
create(paths, records, needCreate, pathsCreated, options, ttl);

for (int i = 0; i < cbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
@@ -38,6 +38,7 @@
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -225,6 +226,11 @@ private void updateCache(Cache<T> cache, List<String> createPaths, boolean succe

@Override
public boolean create(String path, T data, int options) {
return create(path, data, options, ZkClient.TTL_NOT_SET);
}

@Override
public boolean create(String path, T data, int options, long ttl) {
String clientPath = path;
String serverPath = prependChroot(clientPath);

@@ -233,7 +239,7 @@ public boolean create(String path, T data, int options) {
try {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
_baseAccessor.doCreate(serverPath, data, options);
_baseAccessor.doCreate(serverPath, data, options, ttl);
boolean success = (result._retCode == RetCode.OK);

updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
@@ -245,7 +251,7 @@ public boolean create(String path, T data, int options) {
}

// no cache
return _baseAccessor.create(serverPath, data, options);
return _baseAccessor.create(serverPath, data, options, ttl);
}

@Override
@@ -426,6 +432,11 @@ public Stat getStat(String path, int options) {

@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
}

@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);

@@ -438,7 +449,7 @@ public boolean[] createChildren(List<String> paths, List<T> records, int options
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList =
_baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
_baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options, ttl);

boolean[] success = new boolean[size];
for (int i = 0; i < size; i++) {
@@ -456,7 +467,7 @@ public boolean[] createChildren(List<String> paths, List<T> records, int options
}

// no cache
return _baseAccessor.createChildren(serverPaths, records, options);
return _baseAccessor.createChildren(serverPaths, records, options, ttl);
}

@Override
@@ -210,6 +210,69 @@ public void testSyncCreate() {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncCreateWithTTL() {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String path = String.format("/%s/%s", _rootPath, "msg_0");
ZNRecord record = new ZNRecord("msg_0");
ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);

boolean success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL);
Assert.assertFalse(success);
long ttl = 1L;
success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertTrue(success);
ZNRecord getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getId(), "msg_0");

record.setSimpleField("key0", "value0");
success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertFalse(success, "Should fail since node already exists");
getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 0);

System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncCreateContainer() {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String path = String.format("/%s/%s", _rootPath, "msg_0");
ZNRecord record = new ZNRecord("msg_0");
ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);

boolean success = accessor.create(path, record, AccessOption.CONTAINER);
Assert.assertTrue(success);
ZNRecord getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getId(), "msg_0");

record.setSimpleField("key0", "value0");
success = accessor.create(path, record, AccessOption.CONTAINER);
Assert.assertFalse(success, "Should fail since node already exists");
getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 0);

System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testDefaultAccessorCreateCustomData() {
String className = TestHelper.getTestClassName();
@@ -513,6 +576,52 @@ public void testAsyncZkBaseDataAccessor() {
Assert.assertEquals(record.getId(), msgId, "Should get what we created");
}

// test async createChildren with TTL
System.setProperty("zookeeper.extendedTypesEnabled", "true");
records = new ArrayList<>();
paths = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
paths.add(PropertyPathBuilder.instanceMessage(root, "host_2", msgId));
records.add(new ZNRecord(msgId));
}
success = accessor.createChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, 1L);
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
Assert.assertTrue(success[i], "Should succeed in create " + msgId);
}

// test get what we created
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
String path = PropertyPathBuilder.instanceMessage(root, "host_2", msgId);
ZNRecord record = _gZkClient.readData(path);
Assert.assertEquals(record.getId(), msgId, "Should get what we created");
}

// test async createChildren with Container mode
records = new ArrayList<>();
paths = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
paths.add(PropertyPathBuilder.instanceMessage(root, "host_3", msgId));
records.add(new ZNRecord(msgId));
}
success = accessor.createChildren(paths, records, AccessOption.CONTAINER);
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
Assert.assertTrue(success[i], "Should succeed in create " + msgId);
}

// test get what we created
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
String path = PropertyPathBuilder.instanceMessage(root, "host_3", msgId);
ZNRecord record = _gZkClient.readData(path);
Assert.assertEquals(record.getId(), msgId, "Should get what we created");
}
System.clearProperty("zookeeper.extendedTypesEnabled");

// test async setChildren
records = new ArrayList<>();
paths = new ArrayList<>();
@@ -67,6 +67,11 @@ public boolean create(String path, ZNRecord record, int options) {
return set(path, record, options);
}

@Override
public boolean create(String path, ZNRecord record, int options, long ttl) {
return set(path, record, options);
}

@Override
public boolean set(String path, ZNRecord record, int options) {
ZNode zNode = _recordMap.get(path);
@@ -112,6 +117,12 @@ public boolean[] createChildren(List<String> paths, List<ZNRecord> records,
return setChildren(paths, records, options);
}

@Override
public boolean[] createChildren(List<String> paths, List<ZNRecord> records,
int options, long ttl) {
return setChildren(paths, records, options);
}

@Override
public boolean[] setChildren(List<String> paths, List<ZNRecord> records, int options) {
boolean [] ret = new boolean[paths.size()];

0 comments on commit 3ed681b

Please sign in to comment.