Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] support configurable zk-cache expiry time #6668

Merged
merged 3 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
3 changes: 3 additions & 0 deletions conf/discovery.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ configurationStoreServers=
# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Port to use to server binary-proto request
servicePort=6650

Expand Down
3 changes: 3 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ functionWorkerWebServiceURLTLS=
# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

### --- Server --- ###

# The port to use for server binary Protobuf requests
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
2 changes: 2 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ configurationStoreServers=

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
serviceUrl=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_SERVER,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ protected void startZkCacheService() throws PulsarServerException {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
config.getZooKeeperCacheExpirySeconds());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ServiceConfig implements PulsarConfiguration {
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;

// ZooKeeper cache expiry time in seconds
private int zooKeeperCacheExpirySeconds=300;

// Port to use to server binary-proto request
private Optional<Integer> servicePort = Optional.ofNullable(5000);
// Port to use to server binary-proto-tls request
Expand Down Expand Up @@ -134,6 +137,14 @@ public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
}

public int getZooKeeperCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds;
}

public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
}

public Optional<Integer> getServicePort() {
return servicePort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_WORKER,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept
(int) workerConfig.getZooKeeperSessionTimeoutMillis(),
workerConfig.getZooKeeperOperationTimeoutSeconds(),
workerConfig.getConfigurationStoreServers(),
orderedExecutor, cacheExecutor);
orderedExecutor, cacheExecutor,
workerConfig.getZooKeeperOperationTimeoutSeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
config.getZookeeperSessionTimeoutMs());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "ZooKeeper session timeout (in milliseconds)"
)
private int zookeeperSessionTimeoutMs = 30_000;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public void start() throws PulsarServerException, PulsarClientException, Malform
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
config.getConfigurationStoreServers(), this.orderedExecutor, this.executor,
config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private String configurationStoreServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// ZooKeeper cache expiry time in seconds
private int zooKeeperCacheExpirySeconds = 300;

// Port to use to server HTTP request
private Optional<Integer> webServicePort = Optional.of(8080);
Expand Down Expand Up @@ -198,6 +200,14 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis)
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}

public int getZooKeeperCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds;
}

public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
}

public Optional<Integer> getWebServicePort() {
return webServicePort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {

public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor);
ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) {
super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ public static interface CacheUpdater<T> {
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
}

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
OrderedExecutor executor, int cacheExpirySeconds) {
checkNotNull(executor);
this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
this.executor = executor;
Expand All @@ -100,17 +106,17 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime

this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.existsCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessio
};

GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
scheduledExecutor);
scheduledExecutor, 300);
zkCacheService.start();
zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
Expand Down