From b2661825aa94cd1465763201eaf6172bf4bbd5e6 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Fri, 14 Feb 2020 13:41:26 -0800 Subject: [PATCH 01/14] WIP: Add DedicatedZkClient and update DedicatedZkClientFactory --- .../impl/client/DedicatedZkClient.java | 558 ++++++++++++++++++ .../factory/DedicatedZkClientFactory.java | 13 +- .../helix/zookeeper/zkclient/ZkClient.java | 4 +- 3 files changed, 568 insertions(+), 7 deletions(-) create mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java new file mode 100644 index 0000000000..32d5958bc6 --- /dev/null +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -0,0 +1,558 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.zkclient.DataUpdater; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; +import org.apache.helix.zookeeper.zkclient.IZkConnection; +import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.ZkConnection; +import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; +import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; +import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; +import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + + +public class DedicatedZkClient implements RealmAwareZkClient { + private final ZkClient _rawZkClient; + private final Map _routingDataCache; + private final String _zkRealmShardingKey; + + public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, + Map routingDataCache) { + + if (connectionConfig == null) { + throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!"); + } + _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey(); + + // TODO: Replace this Map with a real RoutingDataCache + if (routingDataCache == null) { + throw new IllegalArgumentException("RoutingDataCache cannot be null!"); + } + _routingDataCache = routingDataCache; + + // Get the ZkRealm address based on the ZK path sharding key + String zkRealmAddress = _routingDataCache.get(_zkRealmShardingKey); + + // Create a ZK connection + IZkConnection zkConnection = + new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout()); + + // Create a ZkClient + _rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), + clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(), + clientConfig.getMonitorType(), clientConfig.getMonitorKey(), + clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly()); + } + + @Override + public List subscribeChildChanges(String path, IZkChildListener listener) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.subscribeChildChanges(path, listener); + } + + @Override + public void unsubscribeChildChanges(String path, IZkChildListener listener) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.unsubscribeChildChanges(path, listener); + } + + @Override + public void subscribeDataChanges(String path, IZkDataListener listener) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.subscribeDataChanges(path, listener); + } + + @Override + public void unsubscribeDataChanges(String path, IZkDataListener listener) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.unsubscribeDataChanges(path, listener); + } + + @Override + public void subscribeStateChanges(IZkStateListener listener) { + _rawZkClient.subscribeStateChanges(listener); + } + + @Override + public void unsubscribeStateChanges(IZkStateListener listener) { + _rawZkClient.unsubscribeStateChanges(listener); + } + + @Override + public void unsubscribeAll() { + _rawZkClient.unsubscribeAll(); + } + + @Override + public void createPersistent(String path) { + createPersistent(path, false); + } + + @Override + public void createPersistent(String path, boolean createParents) { + createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + @Override + public void createPersistent(String path, boolean createParents, List acl) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.createPersistent(path, createParents, acl); + } + + @Override + public void createPersistent(String path, Object data) { + create(path, data, CreateMode.PERSISTENT); + } + + @Override + public void createPersistent(String path, Object data, List acl) { + create(path, data, acl, CreateMode.PERSISTENT); + } + + @Override + public String createPersistentSequential(String path, Object data) { + return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL); + } + + @Override + public String createPersistentSequential(String path, Object data, List acl) { + return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL); + } + + @Override + public void createEphemeral(String path) { + create(path, null, CreateMode.EPHEMERAL); + } + + @Override + public void createEphemeral(String path, String sessionId) { + createEphemeral(path, null, sessionId); + } + + @Override + public void createEphemeral(String path, List acl) { + create(path, null, acl, CreateMode.EPHEMERAL); + } + + @Override + public void createEphemeral(String path, List acl, String sessionId) { + create(path, null, acl, CreateMode.EPHEMERAL, sessionId); + } + + @Override + public String create(String path, Object data, CreateMode mode) { + return create(path, data, mode); + } + + @Override + public String create(String path, Object datat, List acl, CreateMode mode) { + return create(path, datat, acl, mode, null); + } + + @Override + public void createEphemeral(String path, Object data) { + create(path, data, CreateMode.EPHEMERAL); + } + + @Override + public void createEphemeral(String path, Object data, String sessionId) { + create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId); + } + + @Override + public void createEphemeral(String path, Object data, List acl) { + create(path, data, acl, CreateMode.EPHEMERAL); + } + + @Override + public void createEphemeral(String path, Object data, List acl, String sessionId) { + create(path, data, acl, CreateMode.EPHEMERAL, sessionId); + } + + @Override + public String createEphemeralSequential(String path, Object data) { + return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + @Override + public String createEphemeralSequential(String path, Object data, List acl) { + return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + @Override + public String createEphemeralSequential(String path, Object data, String sessionId) { + return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, + sessionId); + } + + @Override + public String createEphemeralSequential(String path, Object data, List acl, + String sessionId) { + return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId); + } + + @Override + public List getChildren(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.getChildren(path); + } + + @Override + public int countChildren(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return countChildren(path); + } + + @Override + public boolean exists(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.exists(path); + } + + @Override + public Stat getStat(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.getStat(path); + } + + @Override + public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.waitUntilExists(path, timeUnit, time); + } + + @Override + public void deleteRecursively(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.deleteRecursively(path); + } + + @Override + public boolean delete(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.delete(path); + } + + @Override + public T readData(String path) { + return readData(path, false); + } + + @Override + public T readData(String path, boolean returnNullIfPathNotExists) { + T data = null; + try { + return readData(path, null); + } catch (ZkNoNodeException e) { + if (!returnNullIfPathNotExists) { + throw e; + } + } + return data; + } + + @Override + public T readData(String path, Stat stat) { + return readData(path, stat, _rawZkClient.hasListeners(path)); + } + + @Override + public T readData(String path, Stat stat, boolean watch) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.readData(path, stat, watch); + } + + @Override + public T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) { + T data = null; + try { + data = readData(path, stat); + } catch (ZkNoNodeException e) { + if (!returnNullIfPathNotExists) { + throw e; + } + } + return data; + } + + @Override + public void writeData(String path, Object object) { + writeData(path, object, -1); + } + + @Override + public void updateDataSerialized(String path, DataUpdater updater) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.updateDataSerialized(path, updater); + } + + @Override + public void writeData(String path, Object datat, int expectedVersion) { + writeDataReturnStat(path, datat, expectedVersion); + } + + @Override + public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.writeDataReturnStat(path, datat, expectedVersion); + } + + @Override + public Stat writeDataGetStat(String path, Object datat, int expectedVersion) { + return writeDataReturnStat(path, datat, expectedVersion); + } + + @Override + public void asyncCreate(String path, Object datat, CreateMode mode, + ZkAsyncCallbacks.CreateCallbackHandler cb) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.asyncCreate(path, datat, mode, cb); + } + + @Override + public void asyncSetData(String path, Object datat, int version, + ZkAsyncCallbacks.SetDataCallbackHandler cb) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.asyncSetData(path, datat, version, cb); + } + + @Override + public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.asyncGetData(path, cb); + } + + @Override + public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.asyncExists(path, cb); + } + + @Override + public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.asyncDelete(path, cb); + } + + @Override + public void watchForData(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _rawZkClient.watchForData(path); + } + + @Override + public List watchForChilds(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.watchForChilds(path); + } + + @Override + public long getCreationTime(String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.getCreationTime(path); + } + + @Override + public List multi(Iterable ops) { + return _rawZkClient.multi(ops); + } + + @Override + public boolean waitUntilConnected(long time, TimeUnit timeUnit) { + return _rawZkClient.waitUntilConnected(time, timeUnit); + } + + @Override + public String getServers() { + return _rawZkClient.getServers(); + } + + @Override + public long getSessionId() { + return _rawZkClient.getSessionId(); + } + + @Override + public void close() { + _rawZkClient.close(); + } + + @Override + public boolean isClosed() { + return _rawZkClient.isClosed(); + } + + @Override + public byte[] serialize(Object data, String path) { + return _rawZkClient.serialize(data, path); + } + + @Override + public T deserialize(byte[] data, String path) { + return _rawZkClient.deserialize(data, path); + } + + @Override + public void setZkSerializer(ZkSerializer zkSerializer) { + _rawZkClient.setZkSerializer(zkSerializer); + } + + @Override + public void setZkSerializer(PathBasedZkSerializer zkSerializer) { + _rawZkClient.setZkSerializer(zkSerializer); + } + + @Override + public PathBasedZkSerializer getZkSerializer() { + return _rawZkClient.getZkSerializer(); + } + + /** + * The most comprehensive create() method that checks the path to see if the create request should be authorized to go through. + * @param path + * @param dataObject + * @param acl + * @param mode + * @param expectedSessionId + * @return + */ + private String create(final String path, final Object dataObject, final List acl, + final CreateMode mode, final String expectedSessionId) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.create(path, dataObject, acl, mode, expectedSessionId); + } + + private boolean checkIfPathBelongsToZkRealm(String path) { + // TODO: Check if the path's sharding key equals the sharding key + // TODO: Implement this with TrieRoutingData + return true; + } +} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java index 2695a5d6fa..842e4bb56d 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java @@ -19,8 +19,11 @@ * under the License. */ +import java.util.HashMap; + import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.impl.client.DedicatedZkClient; import org.apache.helix.zookeeper.impl.client.ZkClient; @@ -36,16 +39,16 @@ protected DedicatedZkClientFactory() { public RealmAwareZkClient buildZkClient( RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { - // TODO: Implement the logic - // Return an instance of DedicatedZkClient - return null; + // TODO: put a real routing data in as a parameter instead of new HashMap<>(); + return new DedicatedZkClient(connectionConfig, clientConfig, new HashMap<>()); } @Override public RealmAwareZkClient buildZkClient( RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) { - // TODO: Implement the logic - return null; + // TODO: put a real routing data in as a parameter instead of new HashMap<>(); + return new DedicatedZkClient(connectionConfig, + new RealmAwareZkClient.RealmAwareZkClientConfig(), new HashMap<>()); } private static class SingletonHelper { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 0507c3f396..3bc8985120 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -636,7 +636,7 @@ public String create(final String path, Object datat, final List acl, final * @throws IllegalArgumentException if called from anything else except the ZooKeeper event thread * @throws ZkException if any zookeeper exception occurs */ - private String create(final String path, final Object dataObject, final List acl, + public String create(final String path, final Object dataObject, final List acl, final CreateMode mode, final String expectedSessionId) throws IllegalArgumentException, ZkException { if (path == null) { @@ -1188,7 +1188,7 @@ public void run() throws Exception { } } - private boolean hasListeners(String path) { + public boolean hasListeners(String path) { Set dataListeners = _dataListener.get(path); if (dataListeners != null && dataListeners.size() > 0) { return true; From 9c86d1d148d4a45c250915756de6cb7c53e6366b Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Fri, 14 Feb 2020 17:07:27 -0800 Subject: [PATCH 02/14] Add Javadoc --- .../helix/zookeeper/impl/client/DedicatedZkClient.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 32d5958bc6..47ff2adc9f 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -42,6 +42,12 @@ import org.apache.zookeeper.data.Stat; +/** + * NOTE: DO NOT USE THIS CLASS DIRECTLY. Use DedicatedZkClientFactory to create instances of DedicatedZkClient. + * + * An implementation of the RealmAwareZkClient interface. + * Supports CRUD, data change subscriptiona, and ephemeral mode operations. + */ public class DedicatedZkClient implements RealmAwareZkClient { private final ZkClient _rawZkClient; private final Map _routingDataCache; From 673a9754d05d61387b3b9f880f22282f924ed375 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Sun, 16 Feb 2020 14:38:54 -0800 Subject: [PATCH 03/14] Fix --- .../impl/client/DedicatedZkClient.java | 23 +++++++++++++++++-- .../helix/zookeeper/zkclient/ZkClient.java | 2 +- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 47ff2adc9f..948f4347f2 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -40,15 +40,19 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * NOTE: DO NOT USE THIS CLASS DIRECTLY. Use DedicatedZkClientFactory to create instances of DedicatedZkClient. * * An implementation of the RealmAwareZkClient interface. - * Supports CRUD, data change subscriptiona, and ephemeral mode operations. + * Supports CRUD, data change subscription, and ephemeral mode operations. */ public class DedicatedZkClient implements RealmAwareZkClient { + private static Logger LOG = LoggerFactory.getLogger(DedicatedZkClient.class); + private final ZkClient _rawZkClient; private final Map _routingDataCache; private final String _zkRealmShardingKey; @@ -339,7 +343,12 @@ public T readData(String path, boolean returnNullIfPathNotExists) { @Override public T readData(String path, Stat stat) { - return readData(path, stat, _rawZkClient.hasListeners(path)); + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } + return _rawZkClient.readData(path, stat); } @Override @@ -514,11 +523,21 @@ public boolean isClosed() { @Override public byte[] serialize(Object data, String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } return _rawZkClient.serialize(data, path); } @Override public T deserialize(byte[] data, String path) { + if (!checkIfPathBelongsToZkRealm(path)) { + throw new IllegalArgumentException( + "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path + + " ZK realm sharding key: " + _zkRealmShardingKey); + } return _rawZkClient.deserialize(data, path); } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 3bc8985120..696d6a5298 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -1188,7 +1188,7 @@ public void run() throws Exception { } } - public boolean hasListeners(String path) { + private boolean hasListeners(String path) { Set dataListeners = _dataListener.get(path); if (dataListeners != null && dataListeners.size() > 0) { return true; From bcff9fc8e903227ce548951e82f551d8c9a55dba Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Mon, 17 Feb 2020 18:34:58 -0800 Subject: [PATCH 04/14] Add MetadataStoreRoutingData temporarily --- .../factory/RealmAwareZkClientFactory.java | 10 +- .../impl/client/DedicatedZkClient.java | 31 +- .../factory/DedicatedZkClientFactory.java | 11 +- .../factory/MetadataStoreRoutingData.java | 52 ++++ .../impl/factory/SharedZkClientFactory.java | 6 +- .../impl/factory/TrieRoutingData.java | 277 ++++++++++++++++++ 6 files changed, 366 insertions(+), 21 deletions(-) create mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java create mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java index f68ffe4045..4d3622a9a3 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java @@ -20,6 +20,7 @@ */ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; /** @@ -30,16 +31,19 @@ public interface RealmAwareZkClientFactory { * Build a RealmAwareZkClient using specified connection config and client config. * @param connectionConfig * @param clientConfig + * @param metadataStoreRoutingData * @return HelixZkClient */ RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig); + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, + MetadataStoreRoutingData metadataStoreRoutingData); /** * Builds a RealmAwareZkClient using specified connection config and default client config. * @param connectionConfig + * @param metadataStoreRoutingData * @return RealmAwareZkClient */ - RealmAwareZkClient buildZkClient( - RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig); + RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + MetadataStoreRoutingData metadataStoreRoutingData); } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 948f4347f2..75553a6647 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -20,10 +20,10 @@ */ import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkConnection; @@ -54,26 +54,32 @@ public class DedicatedZkClient implements RealmAwareZkClient { private static Logger LOG = LoggerFactory.getLogger(DedicatedZkClient.class); private final ZkClient _rawZkClient; - private final Map _routingDataCache; + private final MetadataStoreRoutingData _metadataStoreRoutingData; private final String _zkRealmShardingKey; + private final String _zkRealmAddress; public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, - Map routingDataCache) { + MetadataStoreRoutingData metadataStoreRoutingData) { if (connectionConfig == null) { throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!"); } _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey(); - // TODO: Replace this Map with a real RoutingDataCache - if (routingDataCache == null) { - throw new IllegalArgumentException("RoutingDataCache cannot be null!"); + if (metadataStoreRoutingData == null) { + throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!"); } - _routingDataCache = routingDataCache; + _metadataStoreRoutingData = metadataStoreRoutingData; // Get the ZkRealm address based on the ZK path sharding key - String zkRealmAddress = _routingDataCache.get(_zkRealmShardingKey); + String zkRealmAddress = _metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey); + if (zkRealmAddress == null || zkRealmAddress.isEmpty()) { + throw new IllegalArgumentException( + "ZK realm address for the given ZK realm sharding key is invalid! ZK realm address: " + + zkRealmAddress + " ZK realm sharding key: " + _zkRealmShardingKey); + } + _zkRealmAddress = zkRealmAddress; // Create a ZK connection IZkConnection zkConnection = @@ -575,9 +581,12 @@ private String create(final String path, final Object dataObject, final List(); - return new DedicatedZkClient(connectionConfig, clientConfig, new HashMap<>()); + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { + return new DedicatedZkClient(connectionConfig, clientConfig, metadataStoreRoutingData); } @Override public RealmAwareZkClient buildZkClient( - RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) { + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { // TODO: put a real routing data in as a parameter instead of new HashMap<>(); return new DedicatedZkClient(connectionConfig, - new RealmAwareZkClient.RealmAwareZkClientConfig(), new HashMap<>()); + new RealmAwareZkClient.RealmAwareZkClientConfig(), metadataStoreRoutingData); } private static class SingletonHelper { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java new file mode 100644 index 0000000000..1b7003e893 --- /dev/null +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java @@ -0,0 +1,52 @@ +package org.apache.helix.zookeeper.impl.factory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import java.util.NoSuchElementException; + + +/** + * TODO: remove when there's a separate module + */ +public interface MetadataStoreRoutingData { + /** + * Given a path, return all the "metadata store sharding key-metadata store realm address" pairs + * where the sharding keys contain the given path. For example, given "/a/b", return {"/a/b/c": + * "realm.address.c.com:1234", "/a/b/d": "realm.address.d.com:1234"} where "a/b/c" and "a/b/d" are + * sharding keys and the urls are realm addresses. If the path is invalid, returns an empty + * mapping. + * @param path - the path where the search is conducted + * @return all "sharding key-realm address" pairs where the sharding keys contain the given + * path if the path is valid; empty mapping otherwise + * @throws IllegalArgumentException - when the path is invalid + */ + Map getAllMappingUnderPath(String path) throws IllegalArgumentException; + + /** + * Given a path, return the realm address corresponding to the sharding key contained in the + * path. If the path doesn't contain a sharding key, throw NoSuchElementException. + * @param path - the path where the search is conducted + * @return the realm address corresponding to the sharding key contained in the path + * @throws IllegalArgumentException - when the path is invalid + * @throws NoSuchElementException - when the path doesn't contain a sharding key + */ + String getMetadataStoreRealm(String path) throws IllegalArgumentException, NoSuchElementException; +} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java index a9b8e33077..9f22274060 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java @@ -44,7 +44,8 @@ protected SharedZkClientFactory() { @Override public RealmAwareZkClient buildZkClient( RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { // TODO: Implement the logic // Return an instance of SharedZkClient return null; @@ -52,7 +53,8 @@ public RealmAwareZkClient buildZkClient( @Override public RealmAwareZkClient buildZkClient( - RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) { + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { // TODO: Implement the logic return null; } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java new file mode 100644 index 0000000000..573e19980c --- /dev/null +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java @@ -0,0 +1,277 @@ +package org.apache.helix.zookeeper.impl.factory; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + + +/** + * TODO: remove when there's a separate module. + * This is a class that uses a data structure similar to trie to represent metadata store routing + * data. It is not exactly a trie because it in essence stores a mapping (from sharding keys to + * realm addresses) instead of pure text information; also, only the terminal nodes store meaningful + * information (realm addresses). + */ +public class TrieRoutingData implements MetadataStoreRoutingData { + private static final String DELIMITER = "/"; + + private final TrieNode _rootNode; + + public TrieRoutingData(Map> routingData) throws Exception { + if (routingData == null || routingData.isEmpty()) { + throw new Exception("routingData cannot be null or empty"); + } + + if (isRootShardingKey(routingData)) { + Map.Entry> entry = routingData.entrySet().iterator().next(); + _rootNode = new TrieNode(Collections.emptyMap(), "/", true, entry.getKey()); + } else { + _rootNode = new TrieNode(new HashMap<>(), "/", false, ""); + constructTrie(routingData); + } + } + + public Map getAllMappingUnderPath(String path) throws IllegalArgumentException { + if (path.isEmpty() || !path.substring(0, 1).equals(DELIMITER)) { + throw new IllegalArgumentException("Provided path is empty or does not have a leading \"" + + DELIMITER + "\" character: " + path); + } + + TrieNode curNode; + try { + curNode = findTrieNode(path, false); + } catch (NoSuchElementException e) { + return Collections.emptyMap(); + } + + Map resultMap = new HashMap<>(); + Deque nodeStack = new ArrayDeque<>(); + nodeStack.push(curNode); + while (!nodeStack.isEmpty()) { + curNode = nodeStack.pop(); + if (curNode.isShardingKey()) { + resultMap.put(curNode.getPath(), curNode.getRealmAddress()); + } else { + for (TrieNode child : curNode.getChildren().values()) { + nodeStack.push(child); + } + } + } + return resultMap; + } + + public String getMetadataStoreRealm(String path) + throws IllegalArgumentException, NoSuchElementException { + if (path.isEmpty() || !path.substring(0, 1).equals(DELIMITER)) { + throw new IllegalArgumentException("Provided path is empty or does not have a leading \"" + + DELIMITER + "\" character: " + path); + } + + TrieNode leafNode = findTrieNode(path, true); + return leafNode.getRealmAddress(); + } + + /** + * If findLeafAlongPath is false, then starting from the root node, find the trie node that the + * given path is pointing to and return it; raise NoSuchElementException if the path does + * not point to any node. If findLeafAlongPath is true, then starting from the root node, find the + * leaf node along the provided path; raise NoSuchElementException if the path does not + * point to any node or if there is no leaf node along the path. + * @param path - the path where the search is conducted + * @param findLeafAlongPath - whether the search is for a leaf node on the path + * @return the node pointed by the path or a leaf node along the path + * @throws NoSuchElementException - when the path points to nothing or when no leaf node is + * found + */ + private TrieNode findTrieNode(String path, boolean findLeafAlongPath) + throws NoSuchElementException { + if (path.equals(DELIMITER)) { + if (findLeafAlongPath && !_rootNode.isShardingKey()) { + throw new NoSuchElementException("No leaf node found along the path. Path: " + path); + } + return _rootNode; + } + + TrieNode curNode = _rootNode; + if (findLeafAlongPath && curNode.isShardingKey()) { + return curNode; + } + Map curChildren = curNode.getChildren(); + for (String pathSection : path.substring(1).split(DELIMITER, 0)) { + curNode = curChildren.get(pathSection); + if (curNode == null) { + throw new NoSuchElementException( + "The provided path is missing from the trie. Path: " + path); + } + if (findLeafAlongPath && curNode.isShardingKey()) { + return curNode; + } + curChildren = curNode.getChildren(); + } + if (findLeafAlongPath) { + throw new NoSuchElementException("No leaf node found along the path. Path: " + path); + } + return curNode; + } + + /** + * Checks for the edge case when the only sharding key in provided routing data is the delimiter + * or an empty string. When this is the case, the trie is valid and contains only one node, which + * is the root node, and the root node is a leaf node with a realm address associated with it. + * @param routingData - a mapping from "sharding keys" to "realm addresses" to be parsed into a + * trie + * @return whether the edge case is true + */ + private boolean isRootShardingKey(Map> routingData) { + if (routingData.size() == 1) { + for (List shardingKeys : routingData.values()) { + return shardingKeys.size() == 1 && shardingKeys.get(0).equals(DELIMITER); + } + } + + return false; + } + + /** + * Constructs a trie based on the provided routing data. It loops through all sharding keys and + * constructs the trie in a top down manner. + * @param routingData- a mapping from "sharding keys" to "realm addresses" to be parsed into a + * trie + * @throws Exception - when there is an empty sharding key (edge case that + * always renders the routing data invalid); when there is a sharding key which already + * contains a sharding key (invalid); when there is a sharding key that is a part of + * another sharding key (invalid); when a sharding key doesn't have a leading delimiter + */ + private void constructTrie(Map> routingData) + throws Exception { + for (Map.Entry> entry : routingData.entrySet()) { + for (String shardingKey : entry.getValue()) { + // Missing leading delimiter is invalid + if (shardingKey.isEmpty() || !shardingKey.substring(0, 1).equals(DELIMITER)) { + throw new Exception("Sharding key does not have a leading \"" + + DELIMITER + "\" character: " + shardingKey); + } + + // Root can only be a sharding key if it's the only sharding key. Since this method is + // running, the special case has already been checked, therefore it's definitely invalid + if (shardingKey.equals(DELIMITER)) { + throw new Exception( + "There exist other sharding keys. Root cannot be a sharding key."); + } + + // Locate the next delimiter + int nextDelimiterIndex = shardingKey.indexOf(DELIMITER, 1); + int prevDelimiterIndex = 0; + String keySection = shardingKey.substring(prevDelimiterIndex + 1, + nextDelimiterIndex > 0 ? nextDelimiterIndex : shardingKey.length()); + TrieNode curNode = _rootNode; + TrieNode nextNode = curNode.getChildren().get(keySection); + + // If the key section is not the last section yet, go in the loop; if the key section is the + // last section, exit + while (nextDelimiterIndex > 0) { + // If the node is already a leaf node, the current sharding key is invalid; if the node + // doesn't exist, construct a node and continue + if (nextNode != null && nextNode.isShardingKey()) { + throw new Exception(shardingKey + " cannot be a sharding key because " + + shardingKey.substring(0, nextDelimiterIndex) + + " is its parent key and is also a sharding key."); + } else if (nextNode == null) { + nextNode = new TrieNode(new HashMap<>(), shardingKey.substring(0, nextDelimiterIndex), + false, ""); + curNode.addChild(keySection, nextNode); + } + prevDelimiterIndex = nextDelimiterIndex; + nextDelimiterIndex = shardingKey.indexOf(DELIMITER, prevDelimiterIndex + 1); + keySection = shardingKey.substring(prevDelimiterIndex + 1, + nextDelimiterIndex > 0 ? nextDelimiterIndex : shardingKey.length()); + curNode = nextNode; + nextNode = curNode.getChildren().get(keySection); + } + + // If the last node already exists, it's a part of another sharding key, making the current + // sharding key invalid + if (nextNode != null) { + throw new Exception(shardingKey + + " cannot be a sharding key because it is a parent key to another sharding key."); + } + nextNode = new TrieNode(new HashMap<>(), shardingKey, true, entry.getKey()); + curNode.addChild(keySection, nextNode); + } + } + } + + private static class TrieNode { + /** + * This field is a mapping between trie key and children nodes. For example, node "a" has + * children "ab" and "ac", therefore the keys are "b" and "c" respectively. + */ + private Map _children; + /** + * This field states whether the path represented by the node is a sharding key + */ + private final boolean _isShardingKey; + /** + * This field contains the complete path/prefix leading to the current node. For example, the + * name of root node is "/", then the name of its child node + * is "/a", and the name of the child's child node is "/a/b". + */ + private final String _path; + /** + * This field represents the data contained in a node(which represents a path), and is only + * available to the terminal nodes. + */ + private final String _realmAddress; + + TrieNode(Map children, String path, boolean isShardingKey, + String realmAddress) { + _children = children; + _isShardingKey = isShardingKey; + _path = path; + _realmAddress = realmAddress; + } + + public Map getChildren() { + return _children; + } + + public boolean isShardingKey() { + return _isShardingKey; + } + + public String getPath() { + return _path; + } + + public String getRealmAddress() { + return _realmAddress; + } + + public void addChild(String key, TrieNode node) { + _children.put(key, node); + } + } +} From b132cf30a6c804a90d53d70aec40707c43021fc8 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Tue, 18 Feb 2020 19:16:26 -0800 Subject: [PATCH 05/14] Add tests --- zookeeper-api/pom.xml | 6 + .../impl/client/DedicatedZkClient.java | 9 +- .../factory/DedicatedZkClientFactory.java | 2 - zookeeper-api/src/test/conf/testng.xml | 2 +- .../impl/client/TestDedicatedZkClient.java | 154 ++++++++++++++++++ .../zookeeper/impl/client/ZkTestBase.java | 148 +++++++++++++++++ 6 files changed, 316 insertions(+), 5 deletions(-) create mode 100644 zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java create mode 100644 zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java diff --git a/zookeeper-api/pom.xml b/zookeeper-api/pom.xml index 5ec1e561fe..395fe39000 100644 --- a/zookeeper-api/pom.xml +++ b/zookeeper-api/pom.xml @@ -79,6 +79,12 @@ under the License. testng test + + commons-io + commons-io + 2.6 + test + diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 75553a6647..301bee1d8b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -20,6 +20,7 @@ */ import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; @@ -209,7 +210,7 @@ public void createEphemeral(String path, List acl, String sessionId) { @Override public String create(String path, Object data, CreateMode mode) { - return create(path, data, mode); + return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode); } @Override @@ -587,6 +588,10 @@ private String create(final String path, final Object dataObject, final List - + diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java new file mode 100644 index 0000000000..52885aafbe --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java @@ -0,0 +1,154 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; +import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; +import org.apache.helix.zookeeper.impl.factory.TrieRoutingData; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestDedicatedZkClient extends ZkTestBase { + private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY_"; + private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c"; + private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c"; + + // Mapping + private static final Map> RAW_ROUTING_DATA = new HashMap<>(); + + private RealmAwareZkClient _realmAwareZkClient; + private MetadataStoreRoutingData _metadataStoreRoutingData; + + @BeforeClass + public void beforeClass() + throws Exception { + // Populate RAW_ROUTING_DATA + for (int i = 0; i < _numZk; i++) { + List shardingKeyList = new ArrayList<>(); + shardingKeyList.add(ZK_SHARDING_KEY_PREFIX + "_" + i); + String realmName = ZK_PREFIX + (ZK_START_PORT + i); + RAW_ROUTING_DATA.put(realmName, shardingKeyList); + } + + // Feed the raw routing data into TrieRoutingData to construct an in-memory representation of routing information + _metadataStoreRoutingData = new TrieRoutingData(RAW_ROUTING_DATA); + } + + /** + * 1. Create a DedicatedZkClient with a non-existing sharding key (for which there is no valid ZK realm) + * -> This should fail with an exception + * 2. Create a DedicatedZkClient with a valid sharding key + * -> This should pass + */ + @Test + public void testDedicatedZkClientCreation() { + // Create a DedicatedZkClient + String invalidShardingKey = "InvalidShardingKey"; + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig(); + + // Create a connection config with the invalid sharding key + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey); + + try { + _realmAwareZkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); + Assert.fail("Should not succeed with an invalid sharding key!"); + } catch (IllegalArgumentException e) { + // Expected + } + + // Use a valid sharding key this time around + String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0 + connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey); + _realmAwareZkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); + } + + /** + * Test the persistent create() call against a valid path and an invalid path. + * Valid path is one that belongs to the realm designated by the sharding key. + * Invalid path is one that does not belong to the realm designated by the sharding key. + */ + @Test(dependsOnMethods = "testDedicatedZkClientCreation") + public void testDedicatedZkClientCreatePersistent() { + _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer()); + + // Create a dummy ZNRecord + ZNRecord znRecord = new ZNRecord("DummyRecord"); + znRecord.setSimpleField("Dummy", "Value"); + + // Test writing and reading against the validPath + _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true); + _realmAwareZkClient.writeData(TEST_VALID_PATH, znRecord); + Assert.assertEquals(_realmAwareZkClient.readData(TEST_VALID_PATH), znRecord); + + // Test writing and reading against the invalid path + try { + _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true); + _realmAwareZkClient.writeData(TEST_INVALID_PATH, znRecord); + Assert.fail("Create() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + } + + /** + * Test that exists() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testDedicatedZkClientCreatePersistent") + public void testExists() { + Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH)); + + try { + _realmAwareZkClient.exists(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + } + + /** + * Test that delete() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testExists") + public void testDelete() { + try { + _realmAwareZkClient.delete(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + + Assert.assertTrue(_realmAwareZkClient.delete(TEST_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_VALID_PATH)); + } +} \ No newline at end of file diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java new file mode 100644 index 0000000000..7027838484 --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java @@ -0,0 +1,148 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; + +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; + +import org.apache.commons.io.FileUtils; +import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; +import org.apache.helix.zookeeper.zkclient.ZkServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; + + +/** + * Test base class for various integration tests with an in-memory ZooKeeper. + */ +public class ZkTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ZkTestBase.class); + private static final MBeanServerConnection MBEAN_SERVER = + ManagementFactory.getPlatformMBeanServer(); + + // maven surefire-plugin's multiple ZK config keys + private static final String MULTI_ZK_PROPERTY_KEY = "multiZk"; + private static final String NUM_ZK_PROPERTY_KEY = "numZk"; + + protected static final String ZK_PREFIX = "localhost:"; + protected static final int ZK_START_PORT = 2183; + + /* + * Multiple ZK references + */ + // The following maps hold ZK connect string as keys + protected Map _zkServerMap = new HashMap<>(); + protected int _numZk = 1; // Initial value + + @BeforeSuite + public void beforeSuite() + throws IOException { + // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + + // Set up in-memory ZooKeepers + setupZooKeepers(); + + // Clean up all JMX objects + for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) { + try { + MBEAN_SERVER.unregisterMBean(mbean); + } catch (Exception e) { + // OK + } + } + } + + @AfterSuite + public void afterSuite() + throws IOException { + // Clean up all JMX objects + for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) { + try { + MBEAN_SERVER.unregisterMBean(mbean); + } catch (Exception e) { + // OK + } + } + + // Shut down all ZkServers + _zkServerMap.values().forEach(ZkServer::shutdown); + } + + private void setupZooKeepers() { + // If multi-ZooKeeper is enabled, start more ZKs. Otherwise, just set up one ZK + String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY); + if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) { + String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY); + if (numZkFromConfig != null) { + try { + _numZk = Math.max(Integer.parseInt(numZkFromConfig), _numZk); + } catch (Exception e) { + Assert.fail("Failed to parse the number of ZKs from config!"); + } + } else { + Assert.fail("multiZk config is set but numZk config is missing!"); + } + } + + // Start "numZkFromConfigInt" ZooKeepers + for (int i = 0; i < _numZk; i++) { + String zkAddress = ZK_PREFIX + (ZK_START_PORT + i); + ZkServer zkServer = startZkServer(zkAddress); + _zkServerMap.put(zkAddress, zkServer); + } + } + + /** + * Creates an in-memory ZK at the given ZK address. + * @param zkAddress + * @return + */ + private ZkServer startZkServer(final String zkAddress) { + String zkDir = zkAddress.replace(':', '_'); + final String logDir = "/tmp/" + zkDir + "/logs"; + final String dataDir = "/tmp/" + zkDir + "/dataDir"; + + // Clean up local directory + try { + FileUtils.deleteDirectory(new File(dataDir)); + FileUtils.deleteDirectory(new File(logDir)); + } catch (IOException e) { + e.printStackTrace(); + } + + IDefaultNameSpace defaultNameSpace = zkClient -> { + }; + + int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1)); + ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port); + zkServer.start(); + return zkServer; + } +} From 19326d32ee50faac686e4e4d3b24ade3a2d23d31 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Wed, 19 Feb 2020 09:29:54 -0800 Subject: [PATCH 06/14] Just do a sharding key check --- .../impl/client/DedicatedZkClient.java | 175 ++++-------------- 1 file changed, 33 insertions(+), 142 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 301bee1d8b..db0266c61d 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -20,7 +20,6 @@ */ import java.util.List; -import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; @@ -95,41 +94,25 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect @Override public List subscribeChildChanges(String path, IZkChildListener listener) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.subscribeChildChanges(path, listener); } @Override public void unsubscribeChildChanges(String path, IZkChildListener listener) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.unsubscribeChildChanges(path, listener); } @Override public void subscribeDataChanges(String path, IZkDataListener listener) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.subscribeDataChanges(path, listener); } @Override public void unsubscribeDataChanges(String path, IZkDataListener listener) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.unsubscribeDataChanges(path, listener); } @@ -160,11 +143,7 @@ public void createPersistent(String path, boolean createParents) { @Override public void createPersistent(String path, boolean createParents, List acl) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.createPersistent(path, createParents, acl); } @@ -262,71 +241,43 @@ public String createEphemeralSequential(String path, Object data, List acl, @Override public List getChildren(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.getChildren(path); } @Override public int countChildren(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return countChildren(path); } @Override public boolean exists(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.exists(path); } @Override public Stat getStat(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.getStat(path); } @Override public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.waitUntilExists(path, timeUnit, time); } @Override public void deleteRecursively(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.deleteRecursively(path); } @Override public boolean delete(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.delete(path); } @@ -350,21 +301,13 @@ public T readData(String path, boolean returnNullIfPathNotExists) { @Override public T readData(String path, Stat stat) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.readData(path, stat); } @Override public T readData(String path, Stat stat, boolean watch) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.readData(path, stat, watch); } @@ -388,11 +331,7 @@ public void writeData(String path, Object object) { @Override public void updateDataSerialized(String path, DataUpdater updater) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.updateDataSerialized(path, updater); } @@ -403,11 +342,7 @@ public void writeData(String path, Object datat, int expectedVersion) { @Override public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.writeDataReturnStat(path, datat, expectedVersion); } @@ -419,82 +354,50 @@ public Stat writeDataGetStat(String path, Object datat, int expectedVersion) { @Override public void asyncCreate(String path, Object datat, CreateMode mode, ZkAsyncCallbacks.CreateCallbackHandler cb) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.asyncCreate(path, datat, mode, cb); } @Override public void asyncSetData(String path, Object datat, int version, ZkAsyncCallbacks.SetDataCallbackHandler cb) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.asyncSetData(path, datat, version, cb); } @Override public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.asyncGetData(path, cb); } @Override public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.asyncExists(path, cb); } @Override public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.asyncDelete(path, cb); } @Override public void watchForData(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); _rawZkClient.watchForData(path); } @Override public List watchForChilds(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.watchForChilds(path); } @Override public long getCreationTime(String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.getCreationTime(path); } @@ -530,21 +433,13 @@ public boolean isClosed() { @Override public byte[] serialize(Object data, String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.serialize(data, path); } @Override public T deserialize(byte[] data, String path) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.deserialize(data, path); } @@ -574,24 +469,20 @@ public PathBasedZkSerializer getZkSerializer() { */ private String create(final String path, final Object dataObject, final List acl, final CreateMode mode, final String expectedSessionId) { - if (!checkIfPathBelongsToZkRealm(path)) { - throw new IllegalArgumentException( - "The given path does not map to the ZK realm for this DedicatedZkClient! Path: " + path - + " ZK realm sharding key: " + _zkRealmShardingKey); - } + checkIfPathContainsShardingKey(path); return _rawZkClient.create(path, dataObject, acl, mode, expectedSessionId); } /** - * Checks whether the given path belongs to the realm this DedicatedZkClient is designated to at initialization. + * Checks whether the given path belongs matches the ZK path sharding key this DedicatedZkClient is designated to at initialization. * @param path * @return */ - private boolean checkIfPathBelongsToZkRealm(String path) { - try { - return _zkRealmAddress.equals(_metadataStoreRoutingData.getMetadataStoreRealm(path)); - } catch (NoSuchElementException e) { - return false; + private void checkIfPathContainsShardingKey(String path) { + if (!path.startsWith(_zkRealmShardingKey)) { + throw new IllegalArgumentException( + "Given path: " + path + " does not match the sharding key: " + _zkRealmShardingKey + + " for this DedicatedZkClient!"); } } } From c2195260328d7afb018c75658ef88debe35e95eb Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Wed, 19 Feb 2020 11:16:58 -0800 Subject: [PATCH 07/14] Update and address comments --- .../impl/client/DedicatedZkClient.java | 36 ++++++++----------- .../helix/zookeeper/zkclient/ZkClient.java | 2 +- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index db0266c61d..f3bd18d01a 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -58,6 +58,7 @@ public class DedicatedZkClient implements RealmAwareZkClient { private final String _zkRealmShardingKey; private final String _zkRealmAddress; + // TODO: Remove MetadataStoreRoutingData from constructor public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, MetadataStoreRoutingData metadataStoreRoutingData) { @@ -72,6 +73,7 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect } _metadataStoreRoutingData = metadataStoreRoutingData; + // TODO: Get it from static map/singleton (HttpRoutingDataReader) // Get the ZkRealm address based on the ZK path sharding key String zkRealmAddress = _metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey); if (zkRealmAddress == null || zkRealmAddress.isEmpty()) { @@ -184,7 +186,8 @@ public void createEphemeral(String path, List acl) { @Override public void createEphemeral(String path, List acl, String sessionId) { - create(path, null, acl, CreateMode.EPHEMERAL, sessionId); + checkIfPathContainsShardingKey(path); + _rawZkClient.createEphemeral(path, acl, sessionId); } @Override @@ -194,7 +197,8 @@ public String create(String path, Object data, CreateMode mode) { @Override public String create(String path, Object datat, List acl, CreateMode mode) { - return create(path, datat, acl, mode, null); + checkIfPathContainsShardingKey(path); + return _rawZkClient.create(path, datat, acl, mode); } @Override @@ -204,7 +208,8 @@ public void createEphemeral(String path, Object data) { @Override public void createEphemeral(String path, Object data, String sessionId) { - create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId); + checkIfPathContainsShardingKey(path); + _rawZkClient.createEphemeral(path, data, sessionId); } @Override @@ -214,7 +219,8 @@ public void createEphemeral(String path, Object data, List acl) { @Override public void createEphemeral(String path, Object data, List acl, String sessionId) { - create(path, data, acl, CreateMode.EPHEMERAL, sessionId); + checkIfPathContainsShardingKey(path); + _rawZkClient.createEphemeral(path, data, acl, sessionId); } @Override @@ -229,14 +235,15 @@ public String createEphemeralSequential(String path, Object data, List acl) @Override public String createEphemeralSequential(String path, Object data, String sessionId) { - return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, - sessionId); + checkIfPathContainsShardingKey(path); + return _rawZkClient.createEphemeralSequential(path, data, sessionId); } @Override public String createEphemeralSequential(String path, Object data, List acl, String sessionId) { - return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId); + checkIfPathContainsShardingKey(path); + return _rawZkClient.createEphemeralSequential(path, data, acl, sessionId); } @Override @@ -458,21 +465,6 @@ public PathBasedZkSerializer getZkSerializer() { return _rawZkClient.getZkSerializer(); } - /** - * The most comprehensive create() method that checks the path to see if the create request should be authorized to go through. - * @param path - * @param dataObject - * @param acl - * @param mode - * @param expectedSessionId - * @return - */ - private String create(final String path, final Object dataObject, final List acl, - final CreateMode mode, final String expectedSessionId) { - checkIfPathContainsShardingKey(path); - return _rawZkClient.create(path, dataObject, acl, mode, expectedSessionId); - } - /** * Checks whether the given path belongs matches the ZK path sharding key this DedicatedZkClient is designated to at initialization. * @param path diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 696d6a5298..1c444f18fa 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -636,7 +636,7 @@ public String create(final String path, Object datat, final List acl, final * @throws IllegalArgumentException if called from anything else except the ZooKeeper event thread * @throws ZkException if any zookeeper exception occurs */ - public String create(final String path, final Object dataObject, final List acl, + private String create(final String path, final Object dataObject, final List acl, final CreateMode mode, final String expectedSessionId) throws IllegalArgumentException, ZkException { if (path == null) { From 29a16eb34f0a4ec45b65b4cf7acdff8756153c88 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Wed, 19 Feb 2020 11:19:27 -0800 Subject: [PATCH 08/14] Add Javadoc --- .../helix/zookeeper/api/factory/RealmAwareZkClientFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java index 4d3622a9a3..fb583441b7 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java @@ -34,6 +34,7 @@ public interface RealmAwareZkClientFactory { * @param metadataStoreRoutingData * @return HelixZkClient */ + // TODO: remove MetadataStoreRoutingData RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, MetadataStoreRoutingData metadataStoreRoutingData); @@ -44,6 +45,7 @@ RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig * @param metadataStoreRoutingData * @return RealmAwareZkClient */ + // TODO: remove MetadataStoreRoutingData RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, MetadataStoreRoutingData metadataStoreRoutingData); } From 9629737e60e5f2f2a710747db082bf8b76dca29b Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Wed, 19 Feb 2020 19:26:10 -0800 Subject: [PATCH 09/14] address comments --- .../impl/client/DedicatedZkClient.java | 25 ++++--------------- .../helix/zookeeper/zkclient/ZkClient.java | 2 +- .../impl/client/TestDedicatedZkClient.java | 11 ++++++-- .../zookeeper/impl/client/ZkTestBase.java | 2 +- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index f3bd18d01a..8d603b6124 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -31,7 +31,6 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; -import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; @@ -255,7 +254,7 @@ public List getChildren(String path) { @Override public int countChildren(String path) { checkIfPathContainsShardingKey(path); - return countChildren(path); + return _rawZkClient.countChildren(path); } @Override @@ -295,15 +294,8 @@ public T readData(String path) { @Override public T readData(String path, boolean returnNullIfPathNotExists) { - T data = null; - try { - return readData(path, null); - } catch (ZkNoNodeException e) { - if (!returnNullIfPathNotExists) { - throw e; - } - } - return data; + checkIfPathContainsShardingKey(path); + return _rawZkClient.readData(path, returnNullIfPathNotExists); } @Override @@ -320,15 +312,8 @@ public T readData(String path, Stat stat, boolean watch) { @Override public T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) { - T data = null; - try { - data = readData(path, stat); - } catch (ZkNoNodeException e) { - if (!returnNullIfPathNotExists) { - throw e; - } - } - return data; + checkIfPathContainsShardingKey(path); + return _rawZkClient.readDataAndStat(path, stat, returnNullIfPathNotExists); } @Override diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 1c444f18fa..0507c3f396 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -636,7 +636,7 @@ public String create(final String path, Object datat, final List acl, final * @throws IllegalArgumentException if called from anything else except the ZooKeeper event thread * @throws ZkException if any zookeeper exception occurs */ - private String create(final String path, final Object dataObject, final List acl, + private String create(final String path, final Object dataObject, final List acl, final CreateMode mode, final String expectedSessionId) throws IllegalArgumentException, ZkException { if (path == null) { diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java index 52885aafbe..9ae44f347d 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java @@ -31,12 +31,13 @@ import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; import org.apache.helix.zookeeper.impl.factory.TrieRoutingData; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class TestDedicatedZkClient extends ZkTestBase { - private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY_"; + private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY"; private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c"; private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c"; @@ -61,6 +62,13 @@ public void beforeClass() _metadataStoreRoutingData = new TrieRoutingData(RAW_ROUTING_DATA); } + @AfterClass + public void afterClass() { + if (_realmAwareZkClient != null && !_realmAwareZkClient.isClosed()) { + _realmAwareZkClient.close(); + } + } + /** * 1. Create a DedicatedZkClient with a non-existing sharding key (for which there is no valid ZK realm) * -> This should fail with an exception @@ -114,7 +122,6 @@ public void testDedicatedZkClientCreatePersistent() { // Test writing and reading against the invalid path try { _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true); - _realmAwareZkClient.writeData(TEST_INVALID_PATH, znRecord); Assert.fail("Create() should not succeed on an invalid path!"); } catch (IllegalArgumentException e) { // Okay - expected diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java index 7027838484..1a75f4a27c 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java @@ -51,7 +51,7 @@ public class ZkTestBase { private static final String NUM_ZK_PROPERTY_KEY = "numZk"; protected static final String ZK_PREFIX = "localhost:"; - protected static final int ZK_START_PORT = 2183; + protected static final int ZK_START_PORT = 2127; /* * Multiple ZK references From ad1d251de83a66072cd9cd87ea2b5c2606a155df Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Wed, 19 Feb 2020 23:56:41 -0800 Subject: [PATCH 10/14] Update tests --- .../impl/{client => }/ZkTestBase.java | 2 +- .../client/RealmAwareZkClientTestBase.java | 164 ++++++++++++++++++ .../impl/client/TestDedicatedZkClient.java | 136 +-------------- 3 files changed, 170 insertions(+), 132 deletions(-) rename zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/{client => }/ZkTestBase.java (99%) create mode 100644 zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java similarity index 99% rename from zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java rename to zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java index 1a75f4a27c..10edaf4c80 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/ZkTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java @@ -1,4 +1,4 @@ -package org.apache.helix.zookeeper.impl.client; +package org.apache.helix.zookeeper.impl; /* * Licensed to the Apache Software Foundation (ASF) under one diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java new file mode 100644 index 0000000000..72c50d623a --- /dev/null +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java @@ -0,0 +1,164 @@ +package org.apache.helix.zookeeper.impl.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.ZkTestBase; +import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; +import org.apache.helix.zookeeper.impl.factory.TrieRoutingData; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public abstract class RealmAwareZkClientTestBase extends ZkTestBase { + private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY"; + private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c"; + private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c"; + + // Mapping + private static final Map> RAW_ROUTING_DATA = new HashMap<>(); + + // The following RealmAwareZkClientFactory is to be defined in subclasses + protected RealmAwareZkClientFactory _realmAwareZkClientFactory; + private RealmAwareZkClient _realmAwareZkClient; + private MetadataStoreRoutingData _metadataStoreRoutingData; + + @BeforeClass + public void beforeClass() + throws Exception { + // Populate RAW_ROUTING_DATA + for (int i = 0; i < _numZk; i++) { + List shardingKeyList = new ArrayList<>(); + shardingKeyList.add(ZK_SHARDING_KEY_PREFIX + "_" + i); + String realmName = ZK_PREFIX + (ZK_START_PORT + i); + RAW_ROUTING_DATA.put(realmName, shardingKeyList); + } + + // Feed the raw routing data into TrieRoutingData to construct an in-memory representation of routing information + _metadataStoreRoutingData = new TrieRoutingData(RAW_ROUTING_DATA); + } + + @AfterClass + public void afterClass() { + if (_realmAwareZkClient != null && !_realmAwareZkClient.isClosed()) { + _realmAwareZkClient.close(); + } + } + + /** + * 1. Create a RealmAwareZkClient with a non-existing sharding key (for which there is no valid ZK realm) + * -> This should fail with an exception + * 2. Create a RealmAwareZkClient with a valid sharding key + * -> This should pass + */ + @Test + public void testRealmAwareZkClientCreation() { + // Create a RealmAwareZkClient + String invalidShardingKey = "InvalidShardingKey"; + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig(); + + // Create a connection config with the invalid sharding key + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey); + + try { + _realmAwareZkClient = _realmAwareZkClientFactory + .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); + Assert.fail("Should not succeed with an invalid sharding key!"); + } catch (IllegalArgumentException e) { + // Expected + } + + // Use a valid sharding key this time around + String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0 + connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey); + _realmAwareZkClient = _realmAwareZkClientFactory + .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); + } + + /** + * Test the persistent create() call against a valid path and an invalid path. + * Valid path is one that belongs to the realm designated by the sharding key. + * Invalid path is one that does not belong to the realm designated by the sharding key. + */ + @Test(dependsOnMethods = "testRealmAwareZkClientCreation") + public void testRealmAwareZkClientCreatePersistent() { + _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer()); + + // Create a dummy ZNRecord + ZNRecord znRecord = new ZNRecord("DummyRecord"); + znRecord.setSimpleField("Dummy", "Value"); + + // Test writing and reading against the validPath + _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true); + _realmAwareZkClient.writeData(TEST_VALID_PATH, znRecord); + Assert.assertEquals(_realmAwareZkClient.readData(TEST_VALID_PATH), znRecord); + + // Test writing and reading against the invalid path + try { + _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true); + Assert.fail("Create() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + } + + /** + * Test that exists() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testRealmAwareZkClientCreatePersistent") + public void testExists() { + Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH)); + + try { + _realmAwareZkClient.exists(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + } + + /** + * Test that delete() works on valid path and fails on invalid path. + */ + @Test(dependsOnMethods = "testExists") + public void testDelete() { + try { + _realmAwareZkClient.delete(TEST_INVALID_PATH); + Assert.fail("Exists() should not succeed on an invalid path!"); + } catch (IllegalArgumentException e) { + // Okay - expected + } + + Assert.assertTrue(_realmAwareZkClient.delete(TEST_VALID_PATH)); + Assert.assertFalse(_realmAwareZkClient.exists(TEST_VALID_PATH)); + } +} \ No newline at end of file diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java index 9ae44f347d..8cf3f85c11 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java @@ -19,143 +19,17 @@ * under the License. */ -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; -import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; -import org.apache.helix.zookeeper.impl.factory.TrieRoutingData; -import org.testng.Assert; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - - -public class TestDedicatedZkClient extends ZkTestBase { - private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY"; - private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c"; - private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c"; - // Mapping - private static final Map> RAW_ROUTING_DATA = new HashMap<>(); - private RealmAwareZkClient _realmAwareZkClient; - private MetadataStoreRoutingData _metadataStoreRoutingData; +public class TestDedicatedZkClient extends RealmAwareZkClientTestBase { @BeforeClass public void beforeClass() throws Exception { - // Populate RAW_ROUTING_DATA - for (int i = 0; i < _numZk; i++) { - List shardingKeyList = new ArrayList<>(); - shardingKeyList.add(ZK_SHARDING_KEY_PREFIX + "_" + i); - String realmName = ZK_PREFIX + (ZK_START_PORT + i); - RAW_ROUTING_DATA.put(realmName, shardingKeyList); - } - - // Feed the raw routing data into TrieRoutingData to construct an in-memory representation of routing information - _metadataStoreRoutingData = new TrieRoutingData(RAW_ROUTING_DATA); - } - - @AfterClass - public void afterClass() { - if (_realmAwareZkClient != null && !_realmAwareZkClient.isClosed()) { - _realmAwareZkClient.close(); - } - } - - /** - * 1. Create a DedicatedZkClient with a non-existing sharding key (for which there is no valid ZK realm) - * -> This should fail with an exception - * 2. Create a DedicatedZkClient with a valid sharding key - * -> This should pass - */ - @Test - public void testDedicatedZkClientCreation() { - // Create a DedicatedZkClient - String invalidShardingKey = "InvalidShardingKey"; - RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = - new RealmAwareZkClient.RealmAwareZkClientConfig(); - - // Create a connection config with the invalid sharding key - RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = - new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey); - - try { - _realmAwareZkClient = DedicatedZkClientFactory.getInstance() - .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); - Assert.fail("Should not succeed with an invalid sharding key!"); - } catch (IllegalArgumentException e) { - // Expected - } - - // Use a valid sharding key this time around - String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0 - connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey); - _realmAwareZkClient = DedicatedZkClientFactory.getInstance() - .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData); - } - - /** - * Test the persistent create() call against a valid path and an invalid path. - * Valid path is one that belongs to the realm designated by the sharding key. - * Invalid path is one that does not belong to the realm designated by the sharding key. - */ - @Test(dependsOnMethods = "testDedicatedZkClientCreation") - public void testDedicatedZkClientCreatePersistent() { - _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer()); - - // Create a dummy ZNRecord - ZNRecord znRecord = new ZNRecord("DummyRecord"); - znRecord.setSimpleField("Dummy", "Value"); - - // Test writing and reading against the validPath - _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true); - _realmAwareZkClient.writeData(TEST_VALID_PATH, znRecord); - Assert.assertEquals(_realmAwareZkClient.readData(TEST_VALID_PATH), znRecord); - - // Test writing and reading against the invalid path - try { - _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true); - Assert.fail("Create() should not succeed on an invalid path!"); - } catch (IllegalArgumentException e) { - // Okay - expected - } - } - - /** - * Test that exists() works on valid path and fails on invalid path. - */ - @Test(dependsOnMethods = "testDedicatedZkClientCreatePersistent") - public void testExists() { - Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH)); - - try { - _realmAwareZkClient.exists(TEST_INVALID_PATH); - Assert.fail("Exists() should not succeed on an invalid path!"); - } catch (IllegalArgumentException e) { - // Okay - expected - } - } - - /** - * Test that delete() works on valid path and fails on invalid path. - */ - @Test(dependsOnMethods = "testExists") - public void testDelete() { - try { - _realmAwareZkClient.delete(TEST_INVALID_PATH); - Assert.fail("Exists() should not succeed on an invalid path!"); - } catch (IllegalArgumentException e) { - // Okay - expected - } - - Assert.assertTrue(_realmAwareZkClient.delete(TEST_VALID_PATH)); - Assert.assertFalse(_realmAwareZkClient.exists(TEST_VALID_PATH)); + super.beforeClass(); + // Set the factory to DedicatedZkClientFactory + _realmAwareZkClientFactory = DedicatedZkClientFactory.getInstance(); } -} \ No newline at end of file +} From 7345f8a3a243ec3bf2019c706c6dbccef92de801 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Thu, 20 Feb 2020 13:19:01 -0800 Subject: [PATCH 11/14] Update --- .../api/factory/RealmAwareZkClientFactory.java | 8 ++++++-- .../zookeeper/impl/client/DedicatedZkClient.java | 14 +++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java index fb583441b7..db470ccf22 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java @@ -46,6 +46,10 @@ RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig * @return RealmAwareZkClient */ // TODO: remove MetadataStoreRoutingData - RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - MetadataStoreRoutingData metadataStoreRoutingData); + default RealmAwareZkClient buildZkClient( + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, + MetadataStoreRoutingData metadataStoreRoutingData) { + return buildZkClient(connectionConfig, new RealmAwareZkClient.RealmAwareZkClientConfig(), + metadataStoreRoutingData); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 8d603b6124..5311ffa866 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -20,6 +20,7 @@ */ import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; @@ -456,10 +457,17 @@ public PathBasedZkSerializer getZkSerializer() { * @return */ private void checkIfPathContainsShardingKey(String path) { - if (!path.startsWith(_zkRealmShardingKey)) { + // TODO: replace with the singleton MetadataStoreRoutingData + try { + String zkRealmForPath = _metadataStoreRoutingData.getMetadataStoreRealm(path); + if (!_zkRealmAddress.equals(zkRealmForPath)) { + throw new IllegalArgumentException("Given path: " + path + "'s ZK realm: " + zkRealmForPath + + " does not match the ZK realm: " + _zkRealmAddress + " and sharding key: " + + _zkRealmShardingKey + " for this DedicatedZkClient!"); + } + } catch (NoSuchElementException e) { throw new IllegalArgumentException( - "Given path: " + path + " does not match the sharding key: " + _zkRealmShardingKey - + " for this DedicatedZkClient!"); + "Given path: " + path + " does not have a valid sharding key!"); } } } From c4535ed69f491816efe4997110cdfc4a26178317 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Thu, 20 Feb 2020 13:20:02 -0800 Subject: [PATCH 12/14] asdf --- .../zookeeper/impl/factory/DedicatedZkClientFactory.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java index fd4274e9b7..3e78b0d886 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java @@ -41,15 +41,6 @@ public RealmAwareZkClient buildZkClient( return new DedicatedZkClient(connectionConfig, clientConfig, metadataStoreRoutingData); } - @Override - public RealmAwareZkClient buildZkClient( - RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, - MetadataStoreRoutingData metadataStoreRoutingData) { - // TODO: put a real routing data in as a parameter instead of new HashMap<>(); - return new DedicatedZkClient(connectionConfig, - new RealmAwareZkClient.RealmAwareZkClientConfig(), metadataStoreRoutingData); - } - private static class SingletonHelper { private static final DedicatedZkClientFactory INSTANCE = new DedicatedZkClientFactory(); } From 16940e2c0adcee53f8d1e9dc03b68db65afd8d65 Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Thu, 20 Feb 2020 13:21:11 -0800 Subject: [PATCH 13/14] Fix' git --- .../helix/zookeeper/impl/factory/DedicatedZkClientFactory.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java index 3e78b0d886..c0fd04ddf4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java @@ -50,8 +50,7 @@ public static DedicatedZkClientFactory getInstance() { } /** - * Build a Dedicated ZkClient based on connection config and client config - * + * Build a Dedicated ZkClient based on connection config and client config. * @param connectionConfig * @param clientConfig * @return From f8ed4a63d927f3859a99cef3126c6ae5e9403c9c Mon Sep 17 00:00:00 2001 From: Hunter Lee Date: Thu, 20 Feb 2020 16:13:16 -0800 Subject: [PATCH 14/14] Rebase onto apache zooscalability --- zookeeper-api/pom.xml | 5 + .../factory/RealmAwareZkClientFactory.java | 2 +- .../impl/client/DedicatedZkClient.java | 2 +- .../factory/DedicatedZkClientFactory.java | 1 + .../factory/MetadataStoreRoutingData.java | 52 ---- .../impl/factory/SharedZkClientFactory.java | 1 + .../impl/factory/TrieRoutingData.java | 277 ------------------ .../client/RealmAwareZkClientTestBase.java | 7 +- 8 files changed, 12 insertions(+), 335 deletions(-) delete mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java delete mode 100644 zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java diff --git a/zookeeper-api/pom.xml b/zookeeper-api/pom.xml index 395fe39000..91b448fb61 100644 --- a/zookeeper-api/pom.xml +++ b/zookeeper-api/pom.xml @@ -43,6 +43,11 @@ under the License. metrics-common ${project.version} + + org.apache.helix + metadata-store-directory-common + ${project.version} + org.apache.zookeeper zookeeper diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java index db470ccf22..8c1f7a3af7 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java @@ -19,8 +19,8 @@ * under the License. */ +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; -import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; /** diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 5311ffa866..8352b2b625 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -23,8 +23,8 @@ import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; -import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkConnection; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java index c0fd04ddf4..6694497797 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java @@ -19,6 +19,7 @@ * under the License. */ +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.impl.client.DedicatedZkClient; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java deleted file mode 100644 index 1b7003e893..0000000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/MetadataStoreRoutingData.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.helix.zookeeper.impl.factory; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; -import java.util.NoSuchElementException; - - -/** - * TODO: remove when there's a separate module - */ -public interface MetadataStoreRoutingData { - /** - * Given a path, return all the "metadata store sharding key-metadata store realm address" pairs - * where the sharding keys contain the given path. For example, given "/a/b", return {"/a/b/c": - * "realm.address.c.com:1234", "/a/b/d": "realm.address.d.com:1234"} where "a/b/c" and "a/b/d" are - * sharding keys and the urls are realm addresses. If the path is invalid, returns an empty - * mapping. - * @param path - the path where the search is conducted - * @return all "sharding key-realm address" pairs where the sharding keys contain the given - * path if the path is valid; empty mapping otherwise - * @throws IllegalArgumentException - when the path is invalid - */ - Map getAllMappingUnderPath(String path) throws IllegalArgumentException; - - /** - * Given a path, return the realm address corresponding to the sharding key contained in the - * path. If the path doesn't contain a sharding key, throw NoSuchElementException. - * @param path - the path where the search is conducted - * @return the realm address corresponding to the sharding key contained in the path - * @throws IllegalArgumentException - when the path is invalid - * @throws NoSuchElementException - when the path doesn't contain a sharding key - */ - String getMetadataStoreRealm(String path) throws IllegalArgumentException, NoSuchElementException; -} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java index 9f22274060..1801614de4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java @@ -21,6 +21,7 @@ import java.util.HashMap; +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.exception.ZkClientException; diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java deleted file mode 100644 index 573e19980c..0000000000 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/TrieRoutingData.java +++ /dev/null @@ -1,277 +0,0 @@ -package org.apache.helix.zookeeper.impl.factory; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; - - -/** - * TODO: remove when there's a separate module. - * This is a class that uses a data structure similar to trie to represent metadata store routing - * data. It is not exactly a trie because it in essence stores a mapping (from sharding keys to - * realm addresses) instead of pure text information; also, only the terminal nodes store meaningful - * information (realm addresses). - */ -public class TrieRoutingData implements MetadataStoreRoutingData { - private static final String DELIMITER = "/"; - - private final TrieNode _rootNode; - - public TrieRoutingData(Map> routingData) throws Exception { - if (routingData == null || routingData.isEmpty()) { - throw new Exception("routingData cannot be null or empty"); - } - - if (isRootShardingKey(routingData)) { - Map.Entry> entry = routingData.entrySet().iterator().next(); - _rootNode = new TrieNode(Collections.emptyMap(), "/", true, entry.getKey()); - } else { - _rootNode = new TrieNode(new HashMap<>(), "/", false, ""); - constructTrie(routingData); - } - } - - public Map getAllMappingUnderPath(String path) throws IllegalArgumentException { - if (path.isEmpty() || !path.substring(0, 1).equals(DELIMITER)) { - throw new IllegalArgumentException("Provided path is empty or does not have a leading \"" - + DELIMITER + "\" character: " + path); - } - - TrieNode curNode; - try { - curNode = findTrieNode(path, false); - } catch (NoSuchElementException e) { - return Collections.emptyMap(); - } - - Map resultMap = new HashMap<>(); - Deque nodeStack = new ArrayDeque<>(); - nodeStack.push(curNode); - while (!nodeStack.isEmpty()) { - curNode = nodeStack.pop(); - if (curNode.isShardingKey()) { - resultMap.put(curNode.getPath(), curNode.getRealmAddress()); - } else { - for (TrieNode child : curNode.getChildren().values()) { - nodeStack.push(child); - } - } - } - return resultMap; - } - - public String getMetadataStoreRealm(String path) - throws IllegalArgumentException, NoSuchElementException { - if (path.isEmpty() || !path.substring(0, 1).equals(DELIMITER)) { - throw new IllegalArgumentException("Provided path is empty or does not have a leading \"" - + DELIMITER + "\" character: " + path); - } - - TrieNode leafNode = findTrieNode(path, true); - return leafNode.getRealmAddress(); - } - - /** - * If findLeafAlongPath is false, then starting from the root node, find the trie node that the - * given path is pointing to and return it; raise NoSuchElementException if the path does - * not point to any node. If findLeafAlongPath is true, then starting from the root node, find the - * leaf node along the provided path; raise NoSuchElementException if the path does not - * point to any node or if there is no leaf node along the path. - * @param path - the path where the search is conducted - * @param findLeafAlongPath - whether the search is for a leaf node on the path - * @return the node pointed by the path or a leaf node along the path - * @throws NoSuchElementException - when the path points to nothing or when no leaf node is - * found - */ - private TrieNode findTrieNode(String path, boolean findLeafAlongPath) - throws NoSuchElementException { - if (path.equals(DELIMITER)) { - if (findLeafAlongPath && !_rootNode.isShardingKey()) { - throw new NoSuchElementException("No leaf node found along the path. Path: " + path); - } - return _rootNode; - } - - TrieNode curNode = _rootNode; - if (findLeafAlongPath && curNode.isShardingKey()) { - return curNode; - } - Map curChildren = curNode.getChildren(); - for (String pathSection : path.substring(1).split(DELIMITER, 0)) { - curNode = curChildren.get(pathSection); - if (curNode == null) { - throw new NoSuchElementException( - "The provided path is missing from the trie. Path: " + path); - } - if (findLeafAlongPath && curNode.isShardingKey()) { - return curNode; - } - curChildren = curNode.getChildren(); - } - if (findLeafAlongPath) { - throw new NoSuchElementException("No leaf node found along the path. Path: " + path); - } - return curNode; - } - - /** - * Checks for the edge case when the only sharding key in provided routing data is the delimiter - * or an empty string. When this is the case, the trie is valid and contains only one node, which - * is the root node, and the root node is a leaf node with a realm address associated with it. - * @param routingData - a mapping from "sharding keys" to "realm addresses" to be parsed into a - * trie - * @return whether the edge case is true - */ - private boolean isRootShardingKey(Map> routingData) { - if (routingData.size() == 1) { - for (List shardingKeys : routingData.values()) { - return shardingKeys.size() == 1 && shardingKeys.get(0).equals(DELIMITER); - } - } - - return false; - } - - /** - * Constructs a trie based on the provided routing data. It loops through all sharding keys and - * constructs the trie in a top down manner. - * @param routingData- a mapping from "sharding keys" to "realm addresses" to be parsed into a - * trie - * @throws Exception - when there is an empty sharding key (edge case that - * always renders the routing data invalid); when there is a sharding key which already - * contains a sharding key (invalid); when there is a sharding key that is a part of - * another sharding key (invalid); when a sharding key doesn't have a leading delimiter - */ - private void constructTrie(Map> routingData) - throws Exception { - for (Map.Entry> entry : routingData.entrySet()) { - for (String shardingKey : entry.getValue()) { - // Missing leading delimiter is invalid - if (shardingKey.isEmpty() || !shardingKey.substring(0, 1).equals(DELIMITER)) { - throw new Exception("Sharding key does not have a leading \"" - + DELIMITER + "\" character: " + shardingKey); - } - - // Root can only be a sharding key if it's the only sharding key. Since this method is - // running, the special case has already been checked, therefore it's definitely invalid - if (shardingKey.equals(DELIMITER)) { - throw new Exception( - "There exist other sharding keys. Root cannot be a sharding key."); - } - - // Locate the next delimiter - int nextDelimiterIndex = shardingKey.indexOf(DELIMITER, 1); - int prevDelimiterIndex = 0; - String keySection = shardingKey.substring(prevDelimiterIndex + 1, - nextDelimiterIndex > 0 ? nextDelimiterIndex : shardingKey.length()); - TrieNode curNode = _rootNode; - TrieNode nextNode = curNode.getChildren().get(keySection); - - // If the key section is not the last section yet, go in the loop; if the key section is the - // last section, exit - while (nextDelimiterIndex > 0) { - // If the node is already a leaf node, the current sharding key is invalid; if the node - // doesn't exist, construct a node and continue - if (nextNode != null && nextNode.isShardingKey()) { - throw new Exception(shardingKey + " cannot be a sharding key because " - + shardingKey.substring(0, nextDelimiterIndex) - + " is its parent key and is also a sharding key."); - } else if (nextNode == null) { - nextNode = new TrieNode(new HashMap<>(), shardingKey.substring(0, nextDelimiterIndex), - false, ""); - curNode.addChild(keySection, nextNode); - } - prevDelimiterIndex = nextDelimiterIndex; - nextDelimiterIndex = shardingKey.indexOf(DELIMITER, prevDelimiterIndex + 1); - keySection = shardingKey.substring(prevDelimiterIndex + 1, - nextDelimiterIndex > 0 ? nextDelimiterIndex : shardingKey.length()); - curNode = nextNode; - nextNode = curNode.getChildren().get(keySection); - } - - // If the last node already exists, it's a part of another sharding key, making the current - // sharding key invalid - if (nextNode != null) { - throw new Exception(shardingKey - + " cannot be a sharding key because it is a parent key to another sharding key."); - } - nextNode = new TrieNode(new HashMap<>(), shardingKey, true, entry.getKey()); - curNode.addChild(keySection, nextNode); - } - } - } - - private static class TrieNode { - /** - * This field is a mapping between trie key and children nodes. For example, node "a" has - * children "ab" and "ac", therefore the keys are "b" and "c" respectively. - */ - private Map _children; - /** - * This field states whether the path represented by the node is a sharding key - */ - private final boolean _isShardingKey; - /** - * This field contains the complete path/prefix leading to the current node. For example, the - * name of root node is "/", then the name of its child node - * is "/a", and the name of the child's child node is "/a/b". - */ - private final String _path; - /** - * This field represents the data contained in a node(which represents a path), and is only - * available to the terminal nodes. - */ - private final String _realmAddress; - - TrieNode(Map children, String path, boolean isShardingKey, - String realmAddress) { - _children = children; - _isShardingKey = isShardingKey; - _path = path; - _realmAddress = realmAddress; - } - - public Map getChildren() { - return _children; - } - - public boolean isShardingKey() { - return _isShardingKey; - } - - public String getPath() { - return _path; - } - - public String getRealmAddress() { - return _realmAddress; - } - - public void addChild(String key, TrieNode node) { - _children.put(key, node); - } - } -} diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java index 72c50d623a..cd74975d3e 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java @@ -24,13 +24,13 @@ import java.util.List; import java.util.Map; +import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; +import org.apache.helix.msdcommon.datamodel.TrieRoutingData; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.impl.ZkTestBase; -import org.apache.helix.zookeeper.impl.factory.MetadataStoreRoutingData; -import org.apache.helix.zookeeper.impl.factory.TrieRoutingData; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -51,8 +51,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase { private MetadataStoreRoutingData _metadataStoreRoutingData; @BeforeClass - public void beforeClass() - throws Exception { + public void beforeClass() throws Exception { // Populate RAW_ROUTING_DATA for (int i = 0; i < _numZk; i++) { List shardingKeyList = new ArrayList<>();