diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java index a0691e34b696d..9a87bfaf188b6 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.discovery.service.web; import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import java.io.IOException; @@ -34,6 +35,7 @@ import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.zookeeper.KeeperException; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -73,19 +75,14 @@ public void testZookeeperCacheLoader() throws InterruptedException, KeeperExcept LoadManagerReport report = i % 2 == 0 ? getSimpleLoadManagerLoadReport(brokers.get(i)) : getModularLoadManagerLoadReport(brokers.get(i)); zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + brokers.get(i), - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)); + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)) + .join(); } catch (Exception e) { fail("failed while creating broker znodes"); } } - // strategically wait for cache to get sync - for (int i = 0; i < 5; i++) { - if (zkLoader.getAvailableBrokers().size() == 3 || i == 4) { - break; - } - Thread.sleep(1000); - } + Awaitility.await().untilAsserted(() -> assertEquals(zkLoader.getAvailableBrokers().size(), 3)); // 2. 
get available brokers from ZookeeperCacheLoader List list = zkLoader.getAvailableBrokers(); @@ -100,7 +97,8 @@ public void testZookeeperCacheLoader() throws InterruptedException, KeeperExcept final String newBroker = "broker-4:15000"; LoadManagerReport report = getSimpleLoadManagerLoadReport(newBroker); zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + newBroker, - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)); + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)) + .join(); brokers.add(newBroker); Thread.sleep(100); // wait for 100 msec: to get cache updated diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java new file mode 100644 index 0000000000000..5cc9ffca4facc --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -0,0 +1,1554 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.metadata.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.RetryPolicy; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.AsyncCallback.ACLCallback; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback.MultiCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; 
+import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide a zookeeper client to handle session expire. + */ +@Slf4j +public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable { + + private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1; + + // ZooKeeper client connection variables + private final String connectString; + private final int sessionTimeoutMs; + private final boolean allowReadOnlyMode; + + // state for the zookeeper client + private final AtomicReference zk = new AtomicReference(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ZooKeeperWatcherBase watcherManager; + + private final ScheduledExecutorService retryExecutor; + private final ExecutorService connectExecutor; + + // rate limiter + private final RateLimiter rateLimiter; + + // retry polices + private final RetryPolicy connectRetryPolicy; + private final RetryPolicy operationRetryPolicy; + + // Stats Logger + private final OpStatsLogger createStats; + private final OpStatsLogger getStats; + private final OpStatsLogger setStats; + private final OpStatsLogger deleteStats; + private final OpStatsLogger getChildrenStats; + private final OpStatsLogger existsStats; + private final OpStatsLogger multiStats; + private final OpStatsLogger getACLStats; + private final OpStatsLogger setACLStats; + private final OpStatsLogger syncStats; + private final OpStatsLogger createClientStats; + + private final Callable clientCreator = new Callable() { + + @Override + public ZooKeeper call() throws Exception { + try { + return ZooWorker.syncCallWithRetries(null, new ZooWorker.ZooCallable() { + + @Override + public ZooKeeper call() throws KeeperException, 
InterruptedException { + log.info("Reconnecting zookeeper {}.", connectString); + // close the previous one + closeZkHandle(); + ZooKeeper newZk; + try { + newZk = createZooKeeper(); + } catch (IOException ie) { + log.error("Failed to create zookeeper instance to " + connectString, ie); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } + waitForConnection(); + zk.set(newZk); + log.info("ZooKeeper session {} is created to {}.", + Long.toHexString(newZk.getSessionId()), connectString); + return newZk; + } + + @Override + public String toString() { + return String.format("ZooKeeper Client Creator (%s)", connectString); + } + + }, connectRetryPolicy, rateLimiter, createClientStats); + } catch (Exception e) { + log.error("Gave up reconnecting to ZooKeeper : ", e); + Runtime.getRuntime().exit(-1); + return null; + } + } + + }; + + @VisibleForTesting + static PulsarZooKeeperClient createConnectedZooKeeperClient( + String connectString, int sessionTimeoutMs, Set childWatchers, + RetryPolicy operationRetryPolicy) + throws KeeperException, InterruptedException, IOException { + return PulsarZooKeeperClient.newBuilder() + .connectString(connectString) + .sessionTimeoutMs(sessionTimeoutMs) + .watchers(childWatchers) + .operationRetryPolicy(operationRetryPolicy) + .build(); + } + + /** + * A builder to build retryable zookeeper client. 
+ */ + public static class Builder { + String connectString = null; + int sessionTimeoutMs = 10000; + Set watchers = null; + RetryPolicy connectRetryPolicy = null; + RetryPolicy operationRetryPolicy = null; + StatsLogger statsLogger = NullStatsLogger.INSTANCE; + int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT; + double requestRateLimit = 0; + boolean allowReadOnlyMode = false; + + private Builder() {} + + public Builder connectString(String connectString) { + this.connectString = connectString; + return this; + } + + public Builder sessionTimeoutMs(int sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + return this; + } + + public Builder watchers(Set watchers) { + this.watchers = watchers; + return this; + } + + public Builder connectRetryPolicy(RetryPolicy retryPolicy) { + this.connectRetryPolicy = retryPolicy; + return this; + } + + public Builder operationRetryPolicy(RetryPolicy retryPolicy) { + this.operationRetryPolicy = retryPolicy; + return this; + } + + public Builder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public Builder requestRateLimit(double requestRateLimit) { + this.requestRateLimit = requestRateLimit; + return this; + } + + public Builder retryThreadCount(int numThreads) { + this.retryExecThreadCount = numThreads; + return this; + } + + public Builder allowReadOnlyMode(boolean allowReadOnlyMode) { + this.allowReadOnlyMode = allowReadOnlyMode; + return this; + } + + public PulsarZooKeeperClient build() throws IOException, KeeperException, InterruptedException { + checkNotNull(connectString); + checkArgument(sessionTimeoutMs > 0); + checkNotNull(statsLogger); + checkArgument(retryExecThreadCount > 0); + + if (null == connectRetryPolicy) { + // Session expiry event is received by client only when zk quorum is well established. + // All other connection loss retries happen at zk client library transparently. + // Hence, we don't need to wait before retrying. 
+ connectRetryPolicy = + new BoundExponentialBackoffRetryPolicy(0, 0, Integer.MAX_VALUE); + } + if (null == operationRetryPolicy) { + operationRetryPolicy = + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0); + } + + // Create a watcher manager + StatsLogger watcherStatsLogger = statsLogger.scope("watcher"); + ZooKeeperWatcherBase watcherManager = + null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) : + new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger); + PulsarZooKeeperClient client = new PulsarZooKeeperClient( + connectString, + sessionTimeoutMs, + watcherManager, + connectRetryPolicy, + operationRetryPolicy, + statsLogger, + retryExecThreadCount, + requestRateLimit, + allowReadOnlyMode + ); + // Wait for connection to be established. + try { + watcherManager.waitForConnection(); + } catch (KeeperException ke) { + client.close(); + throw ke; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + client.close(); + throw ie; + } + return client; + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + protected PulsarZooKeeperClient(String connectString, + int sessionTimeoutMs, + ZooKeeperWatcherBase watcherManager, + RetryPolicy connectRetryPolicy, + RetryPolicy operationRetryPolicy, + StatsLogger statsLogger, + int retryExecThreadCount, + double rate, + boolean allowReadOnlyMode) throws IOException { + super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + this.connectString = connectString; + this.sessionTimeoutMs = sessionTimeoutMs; + this.allowReadOnlyMode = allowReadOnlyMode; + this.watcherManager = watcherManager; + this.connectRetryPolicy = connectRetryPolicy; + this.operationRetryPolicy = operationRetryPolicy; + this.rateLimiter = rate > 0 ? 
RateLimiter.create(rate) : null; + this.retryExecutor = + Executors.newScheduledThreadPool(retryExecThreadCount, + new ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build()); + this.connectExecutor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build()); + // added itself to the watcher + watcherManager.addChildWatcher(this); + + // Stats + StatsLogger scopedStatsLogger = statsLogger.scope("zk"); + createClientStats = scopedStatsLogger.getOpStatsLogger("create_client"); + createStats = scopedStatsLogger.getOpStatsLogger("create"); + getStats = scopedStatsLogger.getOpStatsLogger("get_data"); + setStats = scopedStatsLogger.getOpStatsLogger("set_data"); + deleteStats = scopedStatsLogger.getOpStatsLogger("delete"); + getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children"); + existsStats = scopedStatsLogger.getOpStatsLogger("exists"); + multiStats = scopedStatsLogger.getOpStatsLogger("multi"); + getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl"); + setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl"); + syncStats = scopedStatsLogger.getOpStatsLogger("sync"); + } + + @Override + public void close() throws InterruptedException { + closed.set(true); + connectExecutor.shutdown(); + retryExecutor.shutdown(); + closeZkHandle(); + } + + private void closeZkHandle() throws InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.close(); + } else { + zkHandle.close(); + } + } + + public void waitForConnection() throws KeeperException, InterruptedException { + watcherManager.waitForConnection(); + } + + protected ZooKeeper createZooKeeper() throws IOException { + return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + } + + @Override + public void process(WatchedEvent event) { + if (event.getType() == EventType.None + && event.getState() == KeeperState.Expired) { + onExpired(); + } + } + + private void 
onExpired() { + if (closed.get()) { + // we don't schedule any tries if the client is closed. + return; + } + + log.info("ZooKeeper session {} is expired from {}.", + Long.toHexString(getSessionId()), connectString); + try { + connectExecutor.submit(clientCreator); + } catch (RejectedExecutionException ree) { + if (!closed.get()) { + log.error("ZooKeeper reconnect task is rejected : ", ree); + } + } catch (Exception t) { + log.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t); + } + } + + /** + * A runnable that retries zookeeper operations. + */ + abstract static class ZkRetryRunnable implements Runnable { + + final ZooWorker worker; + final RateLimiter rateLimiter; + final Runnable that; + + ZkRetryRunnable(RetryPolicy retryPolicy, + RateLimiter rateLimiter, + OpStatsLogger statsLogger) { + this.worker = new ZooWorker(retryPolicy, statsLogger); + this.rateLimiter = rateLimiter; + that = this; + } + + @Override + public void run() { + if (null != rateLimiter) { + rateLimiter.acquire(); + } + zkRun(); + } + + abstract void zkRun(); + } + + // inherits from ZooKeeper client for all operations + + @Override + public long getSessionId() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionId(); + } + return zkHandle.getSessionId(); + } + + @Override + public byte[] getSessionPasswd() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionPasswd(); + } + return zkHandle.getSessionPasswd(); + } + + @Override + public int getSessionTimeout() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionTimeout(); + } + return zkHandle.getSessionTimeout(); + } + + @Override + public void addAuthInfo(String scheme, byte[] auth) { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.addAuthInfo(scheme, auth); + return; + } + zkHandle.addAuthInfo(scheme, auth); + } + + private void backOffAndRetry(Runnable r, long nextRetryWaitTimeMs) { + try 
{ + retryExecutor.schedule(r, nextRetryWaitTimeMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ree) { + if (!closed.get()) { + log.error("ZooKeeper Operation {} is rejected : ", r, ree); + } + } + } + + private boolean allowRetry(ZooWorker worker, int rc) { + return worker.allowRetry(rc) && !closed.get(); + } + + @Override + public synchronized void register(Watcher watcher) { + watcherManager.addChildWatcher(watcher); + } + + @Override + public List multi(final Iterable ops) throws InterruptedException, KeeperException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public String toString() { + return "multi"; + } + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.multi(ops); + } + return zkHandle.multi(ops); + } + + }, operationRetryPolicy, rateLimiter, multiStats); + } + + @Override + public void multi(final Iterable ops, + final MultiCallback cb, + final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) { + + final MultiCallback multiCb = new MultiCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, List results) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, results); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.multi(ops, multiCb, worker); + } else { + zkHandle.multi(ops, multiCb, worker); + } + } + + @Override + public String toString() { + return "multi"; + } + }; + // execute it immediately + proc.run(); + } + + @Override + @Deprecated + public Transaction transaction() { + // since there is no reference about which client that the transaction could use + // so 
just use ZooKeeper instance directly. + // you'd better to use {@link #multi}. + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.transaction(); + } + return zkHandle.transaction(); + } + + @Override + public List getACL(final String path, final Stat stat) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public String toString() { + return String.format("getACL (%s, stat = %s)", path, stat); + } + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getACL(path, stat); + } + return zkHandle.getACL(path, stat); + } + + }, operationRetryPolicy, rateLimiter, getACLStats); + } + + @Override + public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) { + + final ACLCallback aclCb = new ACLCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, List acl, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, acl, stat); + } + } + + }; + + @Override + public String toString() { + return String.format("getACL (%s, stat = %s)", path, stat); + } + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getACL(path, stat, aclCb, worker); + } else { + zkHandle.getACL(path, stat, aclCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setACL(final String path, final List acl, final int version) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public String 
toString() { + return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version); + } + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.setACL(path, acl, version); + } + return zkHandle.setACL(path, acl, version); + } + + }, operationRetryPolicy, rateLimiter, setACLStats); + } + + @Override + public void setACL(final String path, final List acl, final int version, + final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public String toString() { + return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version); + } + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.setACL(path, acl, version, stCb, worker); + } else { + zkHandle.setACL(path, acl, version, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void sync(final String path, final VoidCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) { + + final VoidCallback vCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context); + } + } + + }; + + @Override + public String toString() { + return String.format("sync (%s)", path); + } + + 
@Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.sync(path, vCb, worker); + } else { + zkHandle.sync(path, vCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public States getState() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getState(); + } else { + return zkHandle.getState(); + } + } + + @Override + public String toString() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.toString(); + } else { + return zkHandle.toString(); + } + } + + @Override + public String create(final String path, final byte[] data, + final List acl, final CreateMode createMode) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public String call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.create(path, data, acl, createMode); + } + return zkHandle.create(path, data, acl, createMode); + } + + @Override + public String toString() { + return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode); + } + + }, operationRetryPolicy, rateLimiter, createStats); + } + + @Override + public void create(final String path, final byte[] data, final List acl, + final CreateMode createMode, final StringCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) { + + final StringCallback createCb = new StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, name); + } + } + + }; + + @Override + void zkRun() { 
+ ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker); + } else { + zkHandle.create(path, data, acl, createMode, createCb, worker); + } + } + + @Override + public String toString() { + return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void delete(final String path, final int version) throws KeeperException, InterruptedException { + ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public Void call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.delete(path, version); + } else { + zkHandle.delete(path, version); + } + return null; + } + + @Override + public String toString() { + return String.format("delete (%s, version = %d)", path, version); + } + + }, operationRetryPolicy, rateLimiter, deleteStats); + } + + @Override + public void delete(final String path, final int version, final VoidCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) { + + final VoidCallback deleteCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.delete(path, version, deleteCb, worker); + } else { + zkHandle.delete(path, version, deleteCb, worker); + } + } + + @Override + public String toString() { + return String.format("delete (%s, version = %d)", path, version); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat 
exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.exists(path, watcher); + } + return zkHandle.exists(path, watcher); + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, existsStats); + } + + @Override + public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.exists(path, watch); + } + return zkHandle.exists(path, watch); + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, existsStats); + } + + @Override + public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.exists(path, watcher, stCb, worker); + } else { + zkHandle.exists(path, watcher, stCb, worker); + } + } + + 
@Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.exists(path, watch, stCb, worker); + } else { + zkHandle.exists(path, watch, stCb, worker); + } + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public byte[] getData(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public byte[] call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getData(path, watcher, stat); + } + return zkHandle.getData(path, watcher, stat); + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getStats); + } + + @Override + public byte[] getData(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public 
byte[] call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getData(path, watch, stat); + } + return zkHandle.getData(path, watch, stat); + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getStats); + } + + @Override + public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, data, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getData(path, watcher, dataCb, worker); + } else { + zkHandle.getData(path, watcher, dataCb, worker); + } + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, data, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = 
zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getData(path, watch, dataCb, worker); + } else { + zkHandle.getData(path, watch, dataCb, worker); + } + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setData(final String path, final byte[] data, final int version) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.setData(path, data, version); + } + return zkHandle.setData(path, data, version); + } + + @Override + public String toString() { + return String.format("setData (%s, version = %d)", path, version); + } + + }, operationRetryPolicy, rateLimiter, setStats); + } + + @Override + public void setData(final String path, final byte[] data, final int version, + final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.setData(path, data, version, stCb, worker); + } else { + zkHandle.setData(path, data, version, stCb, worker); + } + } + + @Override + public String toString() { + return String.format("setData (%s, version = %d)", path, version); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void 
addWatch(String basePath, Watcher watcher, AddWatchMode mode) + throws KeeperException, InterruptedException { + ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable() { + + @Override + public Void call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode); + } else { + zkHandle.addWatch(basePath, watcher, mode); + } + return null; + } + + @Override + public String toString() { + return String.format("addWatch (%s, mode = %s)", basePath, mode); + } + + }, operationRetryPolicy, rateLimiter, setStats); + } + + /** Asynchronously registers {@code watcher} on {@code basePath} with the given {@code mode}. The request is issued through a retry-aware callback: while {@code allowRetry} permits it the operation is re-submitted after a back-off, otherwise {@code cb} is invoked with the final result code, {@code basePath} and the caller's {@code ctx}. Timing is reported to the same stats logger the synchronous overload uses. */ @Override + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) { + + /* Wrapper handed to ZooKeeper in place of the caller's callback. ZooKeeper hands back the worker as its context object, so the parameter is named workerCtx to keep the caller's ctx reachable. */ final VoidCallback vCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object workerCtx) { + ZooWorker worker = (ZooWorker) workerCtx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, basePath, ctx); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker); + } else { + zkHandle.addWatch(basePath, watcher, mode, vCb, worker); + } + } + + @Override + public String toString() { + return String.format("addWatch (%s, mode = %s)", basePath, mode); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public List getChildren(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getChildren(path, watcher, 
stat); + } + return zkHandle.getChildren(path, watcher, stat); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public List getChildren(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getChildren(path, watch, stat); + } + return zkHandle.getChildren(path, watch, stat); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final Children2Callback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getChildren(final String 
path, final boolean watch, final Children2Callback cb, + final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children, Stat stat) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + + @Override + public List getChildren(final String path, final Watcher watcher) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return PulsarZooKeeperClient.super.getChildren(path, watcher); + } + return zkHandle.getChildren(path, watcher); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public List getChildren(final String path, final boolean watch) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooWorker.ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return 
PulsarZooKeeperClient.super.getChildren(path, watch); + } + return zkHandle.getChildren(path, watch); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getChildren(final String path, final boolean watch, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children) { + ZooWorker worker = (ZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + 
PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + /** Retry bookkeeping for a single ZooKeeper operation: counts attempts, tracks elapsed time since construction, consults a {@link RetryPolicy}, and reports the final outcome to an {@link OpStatsLogger}. */ @Slf4j + static final class ZooWorker { + int attempts = 0; + long startTimeNanos; + long elapsedTimeMs = 0L; + final RetryPolicy retryPolicy; + final OpStatsLogger statsLogger; + + /** Stores the policy and stats logger and records the current time as the operation start. */ ZooWorker(RetryPolicy retryPolicy, OpStatsLogger statsLogger) { + this.retryPolicy = retryPolicy; + this.statsLogger = statsLogger; + this.startTimeNanos = MathUtils.nowInNano(); + } + + /** Decides whether the operation that completed with {@code rc} should be retried. The elapsed time is refreshed first. A non-recoverable {@code rc} ends the operation: it is registered as a success when it equals {@code OK} and as a failure otherwise, and {@code false} is returned. A recoverable {@code rc} increments the attempt count and defers the decision to the retry policy. */ public boolean allowRetry(int rc) { + elapsedTimeMs = MathUtils.elapsedMSec(startTimeNanos); + if (!ZooWorker.isRecoverableException(rc)) { + if (KeeperException.Code.OK.intValue() == rc) { + statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } else { + statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } + return false; + } + ++attempts; + return retryPolicy.allowRetry(attempts, elapsedTimeMs); + } + + /** Returns the policy's wait time for the current attempt count and the elapsed time captured by the most recent {@link #allowRetry(int)} call. */ public long nextRetryWaitTime() { + return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs); + } + + /** + * Check whether the given result code is recoverable by retry. + * + * @param rc result code + * @return true if given result code is recoverable. + */ + public static boolean isRecoverableException(int rc) { + return KeeperException.Code.CONNECTIONLOSS.intValue() == rc + || KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc + || KeeperException.Code.SESSIONMOVED.intValue() == rc + || KeeperException.Code.SESSIONEXPIRED.intValue() == rc; + } + + /** + * Check whether the given exception is recoverable by retry. + * + * @param exception given exception + * @return true if given exception is recoverable. 
+ */ + public static boolean isRecoverableException(KeeperException exception) { + return isRecoverableException(exception.code().intValue()); + } + + /** A value-returning ZooKeeper operation that may throw the checked exceptions of the synchronous ZooKeeper API. */ interface ZooCallable { + /** + * Be compatible with ZooKeeper interface. + * + * @return value + * @throws InterruptedException + * @throws KeeperException + */ + T call() throws InterruptedException, KeeperException; + } + + /** + * Execute a sync zookeeper operation with a given retry policy. + * + * @param client + * ZooKeeper client; may be {@code null}, in which case the connection wait is skipped and any {@link KeeperException} is retried as long as the policy allows. + * @param proc + * Synchronous zookeeper operation wrapped in a {@link ZooCallable}. + * @param retryPolicy + * Retry policy to execute the synchronous operation. + * @param rateLimiter + * Rate limiter for zookeeper calls; may be {@code null} to disable rate limiting. + * @param statsLogger + * Stats Logger for zookeeper client. + * @return result of the zookeeper operation + * @throws KeeperException any non-recoverable exception, or a recoverable exception once all retries are exhausted. + * @throws InterruptedException the operation is interrupted. + */ + public static T syncCallWithRetries(PulsarZooKeeperClient client, + ZooWorker.ZooCallable proc, + RetryPolicy retryPolicy, + RateLimiter rateLimiter, + OpStatsLogger statsLogger) + throws KeeperException, InterruptedException { + T result = null; + boolean isDone = false; + int attempts = 0; + long startTimeNanos = MathUtils.nowInNano(); + while (!isDone) { + try { + if (null != client) { + client.waitForConnection(); + } + log.debug("Execute {} at {} retry attempt.", proc, attempts); + if (null != rateLimiter) { + rateLimiter.acquire(); + } + result = proc.call(); + isDone = true; + statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } catch (KeeperException e) { + ++attempts; + boolean rethrow = true; + long elapsedTime = MathUtils.elapsedMSec(startTimeNanos); + /* Retry only when the policy still allows it and, if a client was supplied, the error is a recoverable one. */ if (((null != client && isRecoverableException(e)) || null == client) + && retryPolicy.allowRetry(attempts, elapsedTime)) { + rethrow = false; + } + if (rethrow) { + 
statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + log.debug("Stopped executing {} after {} attempts.", proc, attempts); + throw e; + } + TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, elapsedTime)); + } + } + return result; + } + + } + + +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index e1e44f816c6b5..7f54eab17e501 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -26,11 +26,11 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -44,6 +44,8 @@ import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -53,7 +55,7 @@ import org.apache.zookeeper.ZooKeeper; @Slf4j -public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher, MetadataStoreLifecycle { +public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, MetadataStoreLifecycle { private final String metadataURL; private final 
MetadataStoreConfig metadataStoreConfig; @@ -66,7 +68,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf this.metadataURL = metadataURL; this.metadataStoreConfig = metadataStoreConfig; isZkManaged = true; - zkc = ZooKeeperClient.newBuilder().connectString(metadataURL) + zkc = PulsarZooKeeperClient.newBuilder().connectString(metadataURL) .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE)) .allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()) .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()) @@ -76,6 +78,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf } })) .build(); + zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); } catch (Throwable t) { throw new MetadataStoreException(t); @@ -83,12 +86,40 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf } @VisibleForTesting + @SneakyThrows public ZKMetadataStore(ZooKeeper zkc) { this.metadataURL = null; this.metadataStoreConfig = null; this.isZkManaged = false; this.zkc = zkc; this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); + zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE); + } + + @Override + protected void receivedSessionEvent(SessionEvent event) { + if (event == SessionEvent.SessionReestablished) { + // Recreate the persistent watch on the new session + zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE, + (rc, path, ctx) -> { + if (rc == Code.OK.intValue()) { + super.receivedSessionEvent(event); + } else { + log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc)); + sessionWatcher.setSessionInvalid(); + // On the reconnectable client, mark the session as expired to trigger a new reconnect and + // we will have the chance to set the watch again. 
+ if (zkc instanceof PulsarZooKeeperClient) { + ((PulsarZooKeeperClient) zkc).process( + new WatchedEvent(Watcher.Event.EventType.None, + Watcher.Event.KeeperState.Expired, + null)); + } + } + }, null); + } else { + super.receivedSessionEvent(event); + } } @Override @@ -96,7 +127,7 @@ public CompletableFuture> get(String path) { CompletableFuture> future = new CompletableFuture<>(); try { - zkc.getData(path, this, (rc, path1, ctx, data, stat) -> { + zkc.getData(path, null, (rc, path1, ctx, data, stat) -> { execute(() -> { Code code = Code.get(rc); if (code == Code.OK) { @@ -137,7 +168,7 @@ public CompletableFuture> getChildrenFromStore(String path) { CompletableFuture> future = new CompletableFuture<>(); try { - zkc.getChildren(path, this, (rc, path1, ctx, children) -> { + zkc.getChildren(path, null, (rc, path1, ctx, children) -> { execute(() -> { Code code = Code.get(rc); if (code == Code.OK) { @@ -180,7 +211,7 @@ public CompletableFuture existsFromStore(String path) { CompletableFuture future = new CompletableFuture<>(); try { - zkc.exists(path, this, (rc, path1, ctx, stat) -> { + zkc.exists(path, null, (rc, path1, ctx, stat) -> { execute(() -> { Code code = Code.get(rc); if (code == Code.OK) { @@ -205,7 +236,7 @@ public CompletableFuture put(String path, byte[] value, Optional opt } @Override - public CompletableFuture storePut(String path, byte[] value, Optional optExpectedVersion, + protected CompletableFuture storePut(String path, byte[] value, Optional optExpectedVersion, EnumSet options) { boolean hasVersion = optExpectedVersion.isPresent(); int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); @@ -262,7 +293,7 @@ public CompletableFuture storePut(String path, byte[] value, Optional storeDelete(String path, Optional optExpectedVersion) { + protected CompletableFuture storeDelete(String path, Optional optExpectedVersion) { int expectedVersion = optExpectedVersion.orElse(-1L).intValue(); CompletableFuture future = new CompletableFuture<>(); @@ 
-315,8 +346,7 @@ private static MetadataStoreException getException(Code code, String path) { } } - @Override - public void process(WatchedEvent event) { + private void handleWatchEvent(WatchedEvent event) { if (log.isDebugEnabled()) { log.debug("Received ZK watch : {}", event); } @@ -326,10 +356,16 @@ public void process(WatchedEvent event) { return; } + String parent = parent(path); + Notification childrenChangedNotification = null; + NotificationType type; switch (event.getType()) { case NodeCreated: type = NotificationType.Created; + if (parent != null) { + childrenChangedNotification = new Notification(NotificationType.ChildrenChanged, parent); + } break; case NodeDataChanged: @@ -342,6 +378,9 @@ public void process(WatchedEvent event) { case NodeDeleted: type = NotificationType.Deleted; + if (parent != null) { + childrenChangedNotification = new Notification(NotificationType.ChildrenChanged, parent); + } break; default: @@ -349,6 +388,9 @@ public void process(WatchedEvent event) { } receivedNotification(new Notification(type, event.getPath())); + if (childrenChangedNotification != null) { + receivedNotification(childrenChangedNotification); + } } private static CreateMode getCreateMode(EnumSet options) { @@ -381,7 +423,7 @@ public CompletableFuture initializeCluster() { if (chrootIndex > 0) { String chrootPath = metadataURL.substring(chrootIndex); String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex); - try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder() + try (ZooKeeper chrootZk = PulsarZooKeeperClient.newBuilder() .connectString(zkConnectForChrootCreation) .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()) .connectRetryPolicy( diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index c396842c9c32c..9ba6be57b4e71 100644 --- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -121,6 +121,10 @@ public synchronized void process(WatchedEvent event) { checkState(event.getState()); } + synchronized void setSessionInvalid() { + currentStatus = SessionEvent.SessionLost; + } + private void checkState(Watcher.Event.KeeperState zkClientState) { switch (zkClientState) { case Expired: diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index 68f772cc7cd43..53e8292b4fac8 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -285,11 +285,14 @@ public void insertionOutsideCache(String provider, String url) throws Exception String key1 = newKey(); + assertEquals(objCache.getIfCached(key1), Optional.empty()); + assertEquals(objCache.get(key1).join(), Optional.empty()); + MyClass value1 = new MyClass("a", 1); store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), Optional.of(-1L)).join(); - assertEquals(objCache.getIfCached(key1), Optional.empty()); assertEquals(objCache.get(key1).join(), Optional.of(value1)); + assertEquals(objCache.getIfCached(key1), Optional.of(value1)); } @Test(dataProvider = "impl") diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index e4a2e9ec5c105..4fdcb595e7787 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -288,6 +288,7 @@ public void notificationListeners(String provider, String url) throws Exception 
assertEquals(store.getChildren(key1).join(), Collections.singletonList("xx")); store.delete(key1Child, Optional.empty()).join(); + n = notifications.poll(3, TimeUnit.SECONDS); assertNotNull(n); assertEquals(n.getType(), NotificationType.Deleted); diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index b791803d29d3b..9517b24bfe986 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -42,6 +42,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiPredicate; +import lombok.AllArgsConstructor; +import lombok.Data; import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; @@ -95,6 +97,16 @@ private class Failure { } } + @Data + @AllArgsConstructor + private static class PersistentWatcher { + final String path; + final Watcher watcher; + final AddWatchMode mode; + } + + private List persistentWatchers; + public static MockZooKeeper newInstance() { return newInstance(null); } @@ -154,6 +166,7 @@ private void init(ExecutorService executor) { stopped = false; alwaysFail = new AtomicReference<>(KeeperException.Code.OK); failures = new CopyOnWriteArrayList<>(); + persistentWatchers = new ArrayList<>(); } @Override @@ -226,6 +239,7 @@ public String create(String path, byte[] data, List acl, CreateMode createM final String finalPath = path; executor.execute(() -> { + triggerPersistentWatches(finalPath, parent, EventType.NodeCreated); toNotifyCreate.forEach( watcher -> watcher.process( @@ -292,6 +306,8 @@ public void create(final String path, final byte[] data, final List acl, Cr mutex.unlock(); cb.processResult(0, path, ctx, name); + triggerPersistentWatches(path, parent, EventType.NodeCreated); + toNotifyCreate.forEach( watcher -> watcher.process( new 
WatchedEvent(EventType.NodeCreated, @@ -711,6 +727,8 @@ public Stat setData(final String path, byte[] data, int version) throws KeeperEx } executor.execute(() -> { + triggerPersistentWatches(path, null, EventType.NodeDataChanged); + toNotify.forEach(watcher -> watcher .process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path))); }); @@ -774,6 +792,8 @@ public void setData(final String path, final byte[] data, int version, final Sta for (Watcher watcher : toNotify) { watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); } + + triggerPersistentWatches(path, null, EventType.NodeDataChanged); }); } @@ -829,6 +849,8 @@ public void delete(final String path, int version) throws InterruptedException, for (Watcher watcher2 : toNotifyParent) { watcher2.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)); } + + triggerPersistentWatches(path, parent, EventType.NodeDeleted); }); } @@ -878,6 +900,7 @@ public void delete(final String path, int version, final VoidCallback cb, final .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path))); toNotifyParent.forEach(watcher -> watcher .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent))); + triggerPersistentWatches(path, parent, EventType.NodeDeleted); } }; @@ -920,6 +943,27 @@ public List multi(Iterable ops) throws Interr return res; } + @Override + public synchronized void addWatch(String basePath, Watcher watcher, AddWatchMode mode) { + persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode)); + } + + @Override + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { + if (stopped) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), basePath, ctx); + return; + } + + executor.execute(() -> { + synchronized (MockZooKeeper.this) { + persistentWatchers.add(new 
PersistentWatcher(basePath, watcher, mode)); + } + + cb.processResult(KeeperException.Code.OK.intValue(), basePath, ctx); + }); + } + @Override public void close() throws InterruptedException { } @@ -1002,5 +1046,25 @@ private void checkReadOpDelay() { } } + /** Delivers {@code eventType} for {@code path} to the registered persistent watchers. A PERSISTENT_RECURSIVE watcher is notified when {@code path} starts with its registered path. A PERSISTENT watcher is notified with {@code eventType} when it is registered on {@code path} itself, and with NodeChildrenChanged when it is registered on {@code parent} and the event is a creation or deletion. {@code parent} may be null, in which case no parent notification is sent. */ private void triggerPersistentWatches(String path, String parent, EventType eventType) { + persistentWatchers.forEach(w -> { + if (w.mode == AddWatchMode.PERSISTENT_RECURSIVE) { + if (path.startsWith(w.getPath())) { + w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); + } + } else if (w.mode == AddWatchMode.PERSISTENT) { + if (w.getPath().equals(path)) { + w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); + } else if (w.getPath().equals(parent) + && (eventType == EventType.NodeCreated || eventType == EventType.NodeDeleted)) { + // Only a watch registered on the parent itself observes its child list changing; + // watchers on unrelated paths must not receive this event. + w.watcher.process( + new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)); + } + } + }); + } + private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class); } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java index 4fdc5c73df1f0..499da0e345d44 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java @@ -179,6 +179,27 @@ public List multi(Iterable ops) throws Interr return mockZooKeeper.multi(ops); } + @Override + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { + mockZooKeeper.addWatch(basePath, watcher, mode, cb, ctx); + } + + @Override + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) + throws KeeperException, InterruptedException { + mockZooKeeper.addWatch(basePath, watcher, mode); + } + + @Override + public void addWatch(String basePath, AddWatchMode mode) throws KeeperException, 
InterruptedException { + mockZooKeeper.addWatch(basePath, mode); + } + + @Override + public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) { + mockZooKeeper.addWatch(basePath, mode, cb, ctx); + } + @Override public void close() throws InterruptedException { mockZooKeeper.close();