Skip to content

Commit

Permalink
Merge pull request zalando#1257 from zalando/curator-fix-1
Browse files Browse the repository at this point in the history
A lease is not removed on connection loss
  • Loading branch information
adyach committed Jan 26, 2021
2 parents 9b922ba + b2c72c1 commit a108f05
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,31 @@

public class ZooKeeperHolder {

// Maximum age, in milliseconds, of the locks curator instance before
// getLocksCurator() closes it and creates a replacement.
private static final int CURATOR_LOCKS_INSTANCE_LIVE_PERIOD = 1000 * 60 * 5; // 5 minutes
// NOTE(review): presumably the retry policy parameters (delay in ms and max
// attempts) for created curator clients; their use is not visible in this hunk.
private static final int CURATOR_RETRY_TIME = 1000;
private static final int CURATOR_RETRY_MAX = 3;

// Timeouts passed to createCuratorFramework(...) when building clients.
private final Integer connectionTimeoutMs;
// NakadiSettings.getMaxCommitTimeout() converted from seconds to milliseconds;
// used as the session timeout of subscriptionCurator.
private final long maxCommitTimeoutMs;
private final Integer sessionTimeoutMs;
private final ZookeeperConnection conn;

// Default client, created in the constructor and returned by get().
private CuratorFramework zooKeeper;
// Client created in the constructor with maxCommitTimeoutMs as session timeout.
private CuratorFramework subscriptionCurator;
// Client handed out by getLocksCurator(); created lazily and replaced once it
// is older than CURATOR_LOCKS_INSTANCE_LIVE_PERIOD.
private CuratorFramework locksCurator;

// System.currentTimeMillis() value recorded when locksCurator was last created.
private long locksCuratorCreatedAt;
// Monitor used by getLocksCurator() to serialize creation/replacement of locksCurator.
private final Object curatorLocksLock;

public ZooKeeperHolder(final ZookeeperConnection conn,
final Integer sessionTimeoutMs,
final Integer connectionTimeoutMs,
final NakadiSettings nakadiSettings) throws Exception {
this.conn = conn;
this.connectionTimeoutMs = connectionTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout());
this.curatorLocksLock = new Object();

zooKeeper = createCuratorFramework(sessionTimeoutMs, connectionTimeoutMs);
subscriptionCurator = createCuratorFramework((int) maxCommitTimeoutMs, connectionTimeoutMs);
Expand All @@ -41,6 +49,49 @@ public CuratorFramework get() {
return zooKeeper;
}

/**
 * Returns the curator instance dedicated to locks, creating it on first use
 * and replacing it once it is older than
 * {@link #CURATOR_LOCKS_INSTANCE_LIVE_PERIOD}.
 *
 * <p>During a ConnectionLoss event, under certain (not yet understood)
 * conditions, Curator does not clean up acquired leases, which makes it
 * impossible for clients to acquire a lease. Periodically replacing the
 * curator instance is intended to avoid that: closing the old instance
 * closes its ZooKeeper session, which in turn removes the associated
 * ephemeral znodes such as leases.
 *
 * <p>The whole check-create-return sequence runs under
 * {@code curatorLocksLock}, so the returned reference is always the
 * instance that was current while the lock was held and is never
 * {@code null}.
 *
 * <p>NOTE(review): an instance returned by an earlier call is closed when
 * it expires; callers that cache the returned reference should be checked.
 *
 * @return the current locks curator instance
 * @throws ZookeeperException if a new curator instance could not be created
 */
public CuratorFramework getLocksCurator() throws ZookeeperException {
    synchronized (curatorLocksLock) {
        final boolean expired = locksCurator != null
                && System.currentTimeMillis() > locksCuratorCreatedAt + CURATOR_LOCKS_INSTANCE_LIVE_PERIOD;
        if (expired) {
            // Closing the client ends its ZooKeeper session and thereby drops
            // any ephemeral znodes (leases) that were left behind.
            locksCurator.close();
            locksCurator = null;
        }

        if (locksCurator == null) {
            try {
                locksCurator = createCuratorFramework(sessionTimeoutMs, connectionTimeoutMs);
            } catch (final Exception e) {
                // The field stays null, so the next call retries creation.
                throw new ZookeeperException(
                        "Failed to create curator framework", e);
            }
            locksCuratorCreatedAt = System.currentTimeMillis();
        }

        // Return while still holding the lock: reading the field outside the
        // monitor could observe null or a closed instance from a concurrent caller.
        return locksCurator;
    }
}

public CloseableCuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) throws ZookeeperException {
// most of the clients use default max timeout, subscriptionCurator client saves zookeeper resource
if (sessionTimeoutMs == maxCommitTimeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public AbstractZkSubscriptionClient(
final String loggingPath,
final long zkSessionTimeout) throws ZookeeperException {
this.subscriptionId = subscriptionId;
this.defaultCurator = zooKeeperHolder.get();
this.defaultCurator = zooKeeperHolder.getLocksCurator();
this.closeableCuratorFramework = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout);
this.closeSubscriptionStream = getSubscriptionPath("/close_subscription_stream");
this.log = LoggerFactory.getLogger(loggingPath + ".zk");
Expand Down
Empty file modified database/nakadi/execute_db_scripts_in_container.sh
100644 → 100755
Empty file.

0 comments on commit a108f05

Please sign in to comment.