From bd5dc8147cfe1bae0531158aa4eedee12fd245bb Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 3 Apr 2020 15:57:20 -0700 Subject: [PATCH 1/2] [pulsar-broker] support configurable zk-cache expiry time --- .../java/org/apache/pulsar/zookeeper/ZooKeeperCache.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index fcc122cc4f733..55df0fa2a0ba7 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -88,10 +88,16 @@ public static interface CacheUpdater { private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build(); private boolean shouldShutdownExecutor; private final int zkOperationTimeoutSeconds; + private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes protected AtomicReference zkSession = new AtomicReference(null); public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) { + this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS); + } + + public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, + OrderedExecutor executor, int cacheExpirySeconds) { checkNotNull(executor); this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds; this.executor = executor; From 8de4f441d23d3c252e1c98ee20d54d0f2eea14ee Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 3 Apr 2020 17:04:05 -0700 Subject: [PATCH 2/2] [pulsar-broker] make zk-cache expiry configuration --- conf/broker.conf | 3 +++ conf/discovery.conf | 3 +++ conf/proxy.conf | 3 +++ conf/standalone.conf | 3 +++ conf/websocket.conf | 2 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 5 +++++ .../java/org/apache/pulsar/broker/PulsarService.java | 2 +- .../discovery/service/BrokerDiscoveryProvider.java | 3 ++- .../discovery/service/server/ServiceConfig.java | 11 +++++++++++ .../apache/pulsar/functions/worker/WorkerConfig.java | 5 +++++ .../org/apache/pulsar/functions/worker/Worker.java | 3 ++- .../pulsar/proxy/server/BrokerDiscoveryProvider.java | 3 ++- .../pulsar/proxy/server/ProxyConfiguration.java | 5 +++++ .../org/apache/pulsar/websocket/WebSocketService.java | 3 ++- .../service/WebSocketProxyConfiguration.java | 10 ++++++++++ .../apache/pulsar/zookeeper/GlobalZooKeeperCache.java | 4 ++-- .../org/apache/pulsar/zookeeper/ZooKeeperCache.java | 6 +++--- .../apache/pulsar/zookeeper/ZookeeperCacheTest.java | 2 +- 18 files changed, 65 insertions(+), 11 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2ded71968ac8c..458949931e2ac 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -64,6 +64,9 @@ zooKeeperSessionTimeoutMillis=30000 # ZooKeeper operation timeout in seconds zooKeeperOperationTimeoutSeconds=30 +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed brokerShutdownTimeoutMs=60000 diff --git a/conf/discovery.conf b/conf/discovery.conf index 57709dcd43ee6..2cb5cc4af9483 100644 --- a/conf/discovery.conf +++ b/conf/discovery.conf @@ -26,6 +26,9 @@ configurationStoreServers= # ZooKeeper session timeout zookeeperSessionTimeoutMs=30000 +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + # Port to use to server binary-proto request servicePort=6650 diff --git a/conf/proxy.conf b/conf/proxy.conf index e110ab4ed4f52..b1eb305eb50f8 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -41,6 +41,9 @@ functionWorkerWebServiceURLTLS= # ZooKeeper session timeout (in milliseconds) zookeeperSessionTimeoutMs=30000 +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + ### --- Server --- ### # The port to use for server binary Protobuf requests diff --git a/conf/standalone.conf b/conf/standalone.conf index 9b3b615485da2..400c053894b45 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -54,6 +54,9 @@ zooKeeperSessionTimeoutMillis=30000 # ZooKeeper operation timeout in seconds zooKeeperOperationTimeoutSeconds=30 +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed brokerShutdownTimeoutMs=60000 diff --git a/conf/websocket.conf b/conf/websocket.conf index 600b138f00545..26473e8b439a4 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -24,6 +24,8 @@ configurationStoreServers= # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 # Pulsar cluster url to connect to broker (optional if configurationStoreServers present) serviceUrl= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 239442ecb306f..272b743232c51 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -207,6 +207,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "ZooKeeper operation timeout in seconds" ) private int zooKeeperOperationTimeoutSeconds = 30; + @FieldContext( + category = CATEGORY_SERVER, + doc = "ZooKeeper cache expiry time in seconds" + ) + private int zooKeeperCacheExpirySeconds = 300; @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index cb8583da0bfda..d7e4d19a7d0b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -630,7 +630,7 @@ protected void startZkCacheService() throws PulsarServerException { this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(), - getOrderedExecutor(), this.cacheExecutor); + getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds()); try { this.globalZkCache.start(); } catch (IOException e) { diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index f302a601c8902..414d2ce980b06 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -74,7 +74,8 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl config.getZookeeperSessionTimeoutMs()); globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()), - config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler); + config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler, + config.getZooKeeperCacheExpirySeconds()); globalZkCache.start(); } catch (Exception e) { LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java index 17af9e476c6d4..30269e4818ff9 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java @@ -45,6 +45,9 @@ public class ServiceConfig implements PulsarConfiguration { // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; + // ZooKeeper cache expiry time in seconds + private int zooKeeperCacheExpirySeconds=300; + // Port to use to server binary-proto request private Optional servicePort = Optional.ofNullable(5000); // Port to use to server binary-proto-tls request @@ -134,6 +137,14 @@ public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) { this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs; } + public int getZooKeeperCacheExpirySeconds() { + return zooKeeperCacheExpirySeconds; + } + + public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) { + this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds; + } + public Optional getServicePort() { return servicePort; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index d5fbc49c12b77..24883dfd31b8f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -124,6 +124,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "ZooKeeper operation timeout in seconds" ) private int zooKeeperOperationTimeoutSeconds = 30; + @FieldContext( + category = CATEGORY_WORKER, + doc = "ZooKeeper cache expiry time in seconds" + ) + private int zooKeeperCacheExpirySeconds = 300; @FieldContext( category = CATEGORY_CONNECTORS, doc = "The path to the location to locate builtin connectors" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 0807d0f9a264b..7ace702ba82b9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -163,7 +163,8 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept (int) workerConfig.getZooKeeperSessionTimeoutMillis(), workerConfig.getZooKeeperOperationTimeoutSeconds(), workerConfig.getConfigurationStoreServers(), - orderedExecutor, cacheExecutor); + orderedExecutor, cacheExecutor, + workerConfig.getZooKeeperOperationTimeoutSeconds()); try { this.globalZkCache.start(); } catch (IOException e) { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index b9b68c8b1851e..57a7a026a1bd4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -73,7 +73,8 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory config.getZookeeperSessionTimeoutMs()); globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()), - config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler); + config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler, + config.getZookeeperSessionTimeoutMs()); globalZkCache.start(); } catch (Exception e) { LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 02c4549ec836d..e2f4644810cd4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -88,6 +88,11 @@ public class ProxyConfiguration implements PulsarConfiguration { doc = "ZooKeeper session timeout (in milliseconds)" ) private int zookeeperSessionTimeoutMs = 30_000; + @FieldContext( + category = CATEGORY_BROKER_DISCOVERY, + doc = "ZooKeeper cache expiry time in seconds" + ) + private int zooKeeperCacheExpirySeconds = 300; @FieldContext( category = CATEGORY_BROKER_DISCOVERY, diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 062af7efb82ed..ef9a71c3a5a2a 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -105,7 +105,8 @@ public void start() throws PulsarServerException, PulsarClientException, Malform this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), (int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()), - config.getConfigurationStoreServers(), this.orderedExecutor, this.executor); + config.getConfigurationStoreServers(), this.orderedExecutor, this.executor, + config.getZooKeeperCacheExpirySeconds()); try { this.globalZkCache.start(); } catch (IOException e) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 29a08f82069b0..854d31b2f0579 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -57,6 +57,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { private String configurationStoreServers; // Zookeeper session timeout in milliseconds private long zooKeeperSessionTimeoutMillis = 30000; + // ZooKeeper cache expiry time in seconds + private int zooKeeperCacheExpirySeconds = 300; // Port to use to server HTTP request private Optional webServicePort = Optional.of(8080); @@ -198,6 +200,14 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis) this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis; } + public int getZooKeeperCacheExpirySeconds() { + return zooKeeperCacheExpirySeconds; + } + + public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) { + this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds; + } + public Optional getWebServicePort() { return webServicePort; } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java index 4c237a4010dc6..9788b52fa9993 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java @@ -52,8 +52,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable { public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis, int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor, - ScheduledExecutorService scheduledExecutor) { - super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor); + ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) { + super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds); this.zlClientFactory = zkClientFactory; this.zkSessionTimeoutMillis = zkSessionTimeoutMillis; this.globalZkConnect = globalZkConnect; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 55df0fa2a0ba7..2688c10c40fd3 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -106,17 +106,17 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime this.dataCache = Caffeine.newBuilder() .recordStats() - .expireAfterWrite(5, TimeUnit.MINUTES) + .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS) .buildAsync((key, executor1) -> null); this.childrenCache = Caffeine.newBuilder() .recordStats() - .expireAfterWrite(5, TimeUnit.MINUTES) + .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS) .buildAsync((key, executor1) -> null); this.existsCache = Caffeine.newBuilder() .recordStats() - .expireAfterWrite(5, TimeUnit.MINUTES) + .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS) .buildAsync((key, executor1) -> null); CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache); diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index 0b4100a4d6bb6..6aa7ddd4416e5 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -299,7 +299,7 @@ public CompletableFuture create(String serverList, SessionType sessio }; GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor, - scheduledExecutor); + scheduledExecutor, 300); zkCacheService.start(); zkClient = (MockZooKeeper) zkCacheService.getZooKeeper(); ZooKeeperDataCache zkCache = new ZooKeeperDataCache(zkCacheService) {