Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CURATOR-351] Support for TTL Nodes #171

Merged
merged 16 commits into from
Apr 25, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ public class ThreadUtils
{
private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class);

public static void checkInterrupted(Throwable e)
public static boolean checkInterrupted(Throwable e)
{
if ( e instanceof InterruptedException )
{
Thread.currentThread().interrupt();
return true;
}
return false;
}

public static ExecutorService newSingleThreadExecutor(String processName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@
*/
package org.apache.curator.framework.api;

public interface CreateBuilder extends
CreateBuilderMain
public interface CreateBuilder extends CreateBuilderMain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not extend CreateBuilder2 here? Less duplicate code/comments for withTtl

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because of orSetData()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. That's not a problem.

public interface CreateBuilder extends CreateBuilder2 { CreateBuilder2 orSetData() }

{
CreateBuilderMain orSetData();
/**
* Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or
* {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. If
* the znode has not been modified within the given TTL, it will be deleted once it has no
* children. The TTL unit is milliseconds and must be greater than 0 and less than or equal to
* EphemeralType.MAX_TTL.
*
* @param ttl the ttl
* @return this for chaining
*/
CreateBuilderMain withTtl(long ttl);

CreateBuilder2 orSetData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.api;

public interface CreateBuilder2 extends CreateBuilderMain
{
/**
* Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or
* {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. If
* the znode has not been modified within the given TTL, it will be deleted once it has no
* children. The TTL unit is milliseconds and must be greater than 0 and less than or equal to
* EphemeralType.MAX_TTL.
*
* @param ttl the ttl
* @return this for chaining
*/
CreateBuilderMain withTtl(long ttl);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
{
private final CuratorFrameworkImpl client;
private CreateMode createMode;
Expand All @@ -54,6 +54,7 @@ public class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<Pat
private String protectedId;
private ACLing acling;
private Stat storingStat;
private long ttl;

@VisibleForTesting
boolean failNextCreateForTesting = false;
Expand All @@ -74,9 +75,10 @@ public class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<Pat
setDataIfExists = false;
protectedId = null;
storingStat = null;
ttl = -1;
}

public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat)
public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl)
{
this.client = client;
this.createMode = createMode;
Expand All @@ -89,15 +91,23 @@ public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Bac
protectedId = null;
this.acling = new ACLing(client.getAclProvider(), aclList);
this.storingStat = storingStat;
this.ttl = ttl;
}

@Override
public CreateBuilderMain orSetData()
public CreateBuilder2 orSetData()
{
setDataIfExists = true;
return this;
}

@Override
public CreateBuilderMain withTtl(long ttl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer withTTL, but that's just personal preference. I'm not sure what the standard for capitalisation of acronyms is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've started using strict camel case for everything. It's easier to remember. But, it's not a big deal to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to strict camel case. I don't like how it looks either, but I do like that it is more consistent.

{
this.ttl = ttl;
return this;
}

<T> TransactionCreateBuilder<T> asTransactionCreateBuilder(final T context, final CuratorMultiTransactionRecord transaction)
{
return new TransactionCreateBuilder<T>()
Expand Down Expand Up @@ -138,7 +148,7 @@ public T forPath(String path, byte[] data) throws Exception
}

String fixedPath = client.fixForNamespace(path);
transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode), OperationType.CREATE, path);
transaction.add(Op.create(fixedPath, data, acling.getAclList(path), createMode, ttl), OperationType.CREATE, path);
return context;
}
};
Expand All @@ -151,7 +161,8 @@ public CreateBackgroundModeStatACLable compressed()
return new CreateBackgroundModeStatACLable()
{
@Override
public CreateBackgroundModeACLable storingStatIn(Stat stat) {
public CreateBackgroundModeACLable storingStatIn(Stat stat)
{
storingStat = stat;
return asCreateBackgroundModeACLable();
}
Expand Down Expand Up @@ -553,80 +564,49 @@ public void performBackgroundOperation(final OperationAndData<PathAndBytes> oper
final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Background");
final byte[] data = operationAndData.getData().getData();

if(storingStat == null)
{
client.getZooKeeper().create
(
operationAndData.getData().getPath(),
data,
acling.getAclList(operationAndData.getData().getPath()),
createMode,
new AsyncCallback.StringCallback()
{
@Override
public void processResult(int rc, String path, Object ctx, String name)
{
trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();
client.getZooKeeper().create
(
operationAndData.getData().getPath(),
data,
acling.getAclList(operationAndData.getData().getPath()),
createMode,
new AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
trace.setReturnCode(rc).setRequestBytesLength(data).setPath(path).commit();

if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
{
backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
}
else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists )
{
backgroundSetData(client, operationAndData, operationAndData.getData().getPath(), backgrounding);
}
else
{
sendBackgroundResponse(rc, path, ctx, name, null, operationAndData);
}
if ( (stat != null) && (storingStat != null) )
{
storingStat.setAversion(stat.getAversion());
storingStat.setCtime(stat.getCtime());
storingStat.setCversion(stat.getCversion());
storingStat.setCzxid(stat.getCzxid());
storingStat.setDataLength(stat.getDataLength());
storingStat.setEphemeralOwner(stat.getEphemeralOwner());
storingStat.setMtime(stat.getMtime());
storingStat.setMzxid(stat.getMzxid());
storingStat.setNumChildren(stat.getNumChildren());
storingStat.setPzxid(stat.getPzxid());
storingStat.setVersion(stat.getVersion());
}
},
backgrounding.getContext()
);
}
else
{
client.getZooKeeper().create
(
operationAndData.getData().getPath(),
operationAndData.getData().getData(),
acling.getAclList(operationAndData.getData().getPath()),
createMode,
new AsyncCallback.Create2Callback() {

@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
trace.commit();

if ( stat != null )
{
storingStat.setAversion(stat.getAversion());
storingStat.setCtime(stat.getCtime());
storingStat.setCversion(stat.getCversion());
storingStat.setCzxid(stat.getCzxid());
storingStat.setDataLength(stat.getDataLength());
storingStat.setEphemeralOwner(stat.getEphemeralOwner());
storingStat.setMtime(stat.getMtime());
storingStat.setMzxid(stat.getMzxid());
storingStat.setNumChildren(stat.getNumChildren());
storingStat.setPzxid(stat.getPzxid());
storingStat.setVersion(stat.getVersion());
}

if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
{
backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
}
else
{
sendBackgroundResponse(rc, path, ctx, name, stat, operationAndData);
}
if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
{
backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
}
},
backgrounding.getContext()
);
}
else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists )
{
backgroundSetData(client, operationAndData, operationAndData.getData().getPath(), backgrounding);
}
else
{
sendBackgroundResponse(rc, path, ctx, name, stat, operationAndData);
}
}
},
backgrounding.getContext(),
ttl
);
}
catch ( Throwable e )
{
Expand Down Expand Up @@ -1072,14 +1052,14 @@ public String call() throws Exception
{
try
{
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat);
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
}
catch ( KeeperException.NoNodeException e )
{
if ( createParentsIfNeeded )
{
ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider(), createParentsAsContainers);
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat);
createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CountDownLatch;

public class TestTtlNodes extends BaseClassForTests
{
@BeforeMethod
@Override
public void setup() throws Exception
{
System.setProperty("znode.container.checkIntervalMs", "1");
super.setup();
}

@AfterMethod
@Override
public void teardown() throws Exception
{
super.teardown();
System.clearProperty("znode.container.checkIntervalMs");
}

@Test
public void testBasic() throws Exception
{
try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
{
client.start();

client.create().withTtl(10).creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_WITH_TTL).forPath("/a/b/c");
Thread.sleep(20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that client should get a 'removed' event when the node times out and gets removed by ZK? If so, wouldn't it be less error prone to wait on a latch and checking that the node was deleted within 10 +- some fudge factor rather than just sleeping?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look at that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On reflection, I want to verify that the node is deleted within the TTL so the sleep seems appropriate to me. Right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're probably right, my concern is that I don't know how quick ZK responds to these TTL expiry events. You've only got a 10ms window here for ZK to remove the node. Maybe that's enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know exactly how long because I wrote it :D Note at the top of the test class: System.setProperty("znode.container.checkIntervalMs", "1");

Assert.assertNull(client.checkExists().forPath("/a/b/c"));
}
}

@Test
public void testBasicInBackground() throws Exception
{
try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
{
client.start();

final CountDownLatch latch = new CountDownLatch(1);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
latch.countDown();
}
};
client.create().withTtl(10).creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_WITH_TTL).inBackground(callback).forPath("/a/b/c");
Assert.assertTrue(new Timing().awaitLatch(latch));
Thread.sleep(20);
Assert.assertNull(client.checkExists().forPath("/a/b/c"));
}
}
}