Skip to content
Permalink
Browse files
CURATOR-584: Added fault tolerant idempotent Create, SetData and Delete operations
  • Loading branch information
Josh Slocum authored and eolivelli committed Feb 18, 2021
1 parent 39ed945 commit d5666ab9ca22f45b905d6ffbf2b33fa30b38e15e
Showing 15 changed files with 1,451 additions and 174 deletions.
@@ -62,6 +62,16 @@ public static String createEphemeralSequential(CuratorFramework client, Strin
return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
}

/**
* Example: creates a znode holding {@code payload} using the builder's idempotent() option.
*
* @param client the Curator client used to issue the create
* @param path path of the znode to create
* @param payload data to store in the new znode
* @throws Exception any error raised by the underlying create call
*/
public static void createIdempotent(CuratorFramework client, String path, byte[] payload) throws Exception
{
/*
* This will create the given ZNode with the given data idempotently, meaning that if the initial create
* failed transiently, it will be retried and behave as if the first create never happened, even if the
* first create actually succeeded on the server but the client didn't know it.
* When the retried create finds the node already present, the existing node is accepted as the result
* of this create only if its data version is 0 and its data equals the payload; otherwise the
* NodeExists error is propagated.
*/
client.create().idempotent().forPath(path, payload);
}

public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception
{
// set data for the given node
@@ -92,6 +102,22 @@ public static void setDataAsyncWithCallback(CuratorFramework client, Backgr
client.setData().inBackground(callback).forPath(path, payload);
}

/**
* Example: writes {@code payload} to a znode idempotently, first with a version check and then without one.
*
* @param client the Curator client used to issue the writes
* @param path path of the znode to update
* @param payload data to store in the znode
* @param currentVersion version the znode is expected to have before the versioned write
* @throws Exception any error raised by the underlying setData calls
*/
public static void setDataIdempotent(CuratorFramework client, String path, byte[] payload, int currentVersion) throws Exception
{
/*
* This will set the given ZNode with the given data idempotently, meaning that if the initial setData
* failed transiently, it will be retried and behave as if the first setData never happened, even if the
* first setData actually succeeded on the server but the client didn't know it.
* In other words, if currentVersion == X and payload == P, this will return success if the znode ends
* up in the state (version == X+1 && data == P).
* If withVersion is not specified, it will succeed so long as data == P, no matter the znode version.
*/
// versioned form: success means the znode ended up at (currentVersion + 1) holding the payload
client.setData().idempotent().withVersion(currentVersion).forPath(path, payload);
// unversioned form: success only requires that the znode ends up holding the payload
client.setData().idempotent().forPath(path, payload);
}



public static void delete(CuratorFramework client, String path) throws Exception
{
// delete the given node
@@ -119,6 +145,24 @@ public static void guaranteedDelete(CuratorFramework client, String path) t
client.delete().guaranteed().forPath(path);
}

/**
* Example: shows the idempotent delete variants (versioned and unversioned) next to their
* quietly() equivalents.
*
* @param client the Curator client used to issue the deletes
* @param path path of the znode to delete
* @param currentVersion version the znode is expected to have for the versioned deletes
* @throws Exception any error raised by the underlying delete calls
*/
public static void deleteIdempotent(CuratorFramework client, String path, int currentVersion) throws Exception
{
/*
* This will delete the given ZNode idempotently, meaning that if the initial delete
* failed transiently, it will be retried and behave as if the first delete never happened, even if the
* first delete actually succeeded on the server but the client didn't know it.
* In other words, if currentVersion == X, this will return success if the znode ends up deleted, and will retry after
* connection loss if the znode's version is still X.
* If withVersion is not specified, it will end up successful so long as the node is deleted eventually.
* Kind of like guaranteed but not in the background.
* For deletes this is equivalent to the older quietly() behavior, but it is also provided under idempotent() for compatibility with Create/SetData.
*/
// idempotent delete that is tied to the expected version
client.delete().idempotent().withVersion(currentVersion).forPath(path);
// idempotent delete with no version check
client.delete().idempotent().forPath(path);
// the same two deletes written with quietly(), which the note above describes as equivalent for deletes
client.delete().quietly().withVersion(currentVersion).forPath(path);
client.delete().quietly().forPath(path);
}

public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception
{
/**
@@ -18,7 +18,7 @@
*/
package org.apache.curator.framework.api;

public interface CreateBuilder extends CreateBuilderMain
public interface CreateBuilder extends CreateBuilderMain, Idempotentable<CreateBuilder2>
{
/**
* Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or
@@ -20,6 +20,7 @@

public interface DeleteBuilder extends
Quietly<DeleteBuilderMain>,
DeleteBuilderMain
DeleteBuilderMain,
Idempotentable<DeleteBuilderMain>
{
}
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.api;

/**
 * Mixin for operation builders that can be switched into idempotent mode.
 *
 * @param <T> the builder type handed back so the fluent call chain can continue
 */
public interface Idempotentable<T>
{
    /**
     * Requests idempotent handling for the operation being built. Should the first
     * attempt hit a transient error, Curator retries the operation and reports
     * success provided the znode ends up in the same state it would have had if
     * that first attempt had completed without error.
     *
     * @return this
     */
    T idempotent();
}
@@ -23,6 +23,7 @@
public interface SetDataBuilder extends
BackgroundPathAndBytesable<Stat>,
Versionable<BackgroundPathAndBytesable<Stat>>,
Compressible<SetDataBackgroundVersionable>
Compressible<SetDataBackgroundVersionable>,
Idempotentable<SetDataBuilder>
{
}
@@ -57,12 +57,17 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
private boolean compress;
private boolean setDataIfExists;
private int setDataIfExistsVersion = -1;
private boolean idempotent = false;
private ACLing acling;
private Stat storingStat;
private long ttl;

@VisibleForTesting
boolean failNextCreateForTesting = false;
@VisibleForTesting
boolean failBeforeNextCreateForTesting = false;
@VisibleForTesting
boolean failNextIdempotentCheckForTesting = false;

CreateBuilderImpl(CuratorFrameworkImpl client)
{
@@ -115,6 +120,13 @@ public CreateBuilder2 orSetData(int version)
return this;
}

/**
 * Switches this create builder into idempotent mode. The flag is consulted when a
 * create comes back with NodeExists, to decide whether the existing node should be
 * checked against the requested data instead of failing immediately.
 *
 * @return this builder, for further chaining
 */
@Override
public CreateBuilder2 idempotent()
{
    idempotent = true;
    return this;
}

@Override
public CreateBuilderMain withTtl(long ttl)
{
@@ -642,6 +654,10 @@ else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists
{
backgroundSetData(client, operationAndData, operationAndData.getData().getPath(), backgrounding);
}
else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && idempotent )
{
backgroundCheckIdempotent(client, operationAndData, operationAndData.getData().getPath(), backgrounding);
}
else
{
sendBackgroundResponse(rc, path, ctx, name, stat, operationAndData);
@@ -658,6 +674,7 @@ else if ( (rc == KeeperException.Code.NODEEXISTS.intValue()) && setDataIfExists
backgrounding.getContext(),
ttl
);

}
catch ( Throwable e )
{
@@ -807,6 +824,54 @@ public void performBackgroundOperation(OperationAndData<PathAndBytes> op) throws
client.queueOperation(new OperationAndData<>(operation, null, null, null, null, null));
}

/**
 * Background follow-up for an idempotent create that came back with NODEEXISTS.
 * Queues an asynchronous read of the existing node and then either re-queues the
 * original create (node no longer exists), or hands a result code to
 * {@code sendBackgroundResponse}: the read's own code, CONNECTIONLOSS when the
 * test-only failure flag is set, or NODEEXISTS when the existing node does not
 * match what a fresh create of the requested data would have produced.
 *
 * @param client               framework used to queue operations and reach ZooKeeper
 * @param mainOperationAndData the original create operation, re-queued or answered here
 * @param path                 path of the node to read back
 * @param backgrounding        supplies the context object passed to the async read
 */
private void backgroundCheckIdempotent(final CuratorFrameworkImpl client, final OperationAndData<PathAndBytes> mainOperationAndData, final String path, final Backgrounding backgrounding)
{
    final AsyncCallback.DataCallback existingDataCallback = new AsyncCallback.DataCallback()
    {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
        {
            if ( rc == KeeperException.Code.NONODE.intValue() )
            {
                // nothing is there any more, so run the original create again
                client.queueOperation(mainOperationAndData);
                return;
            }

            int resultCode = rc;
            if ( rc == KeeperException.Code.OK.intValue() )
            {
                if ( failNextIdempotentCheckForTesting )
                {
                    // one-shot injected failure used by tests
                    failNextIdempotentCheckForTesting = false;
                    resultCode = KeeperException.Code.CONNECTIONLOSS.intValue();
                }
                else
                {
                    // accept the existing node only if it is at data version 0 and holds the requested data
                    boolean sameAsFreshCreate = IdempotentUtils.matches(0, mainOperationAndData.getData().getData(), stat.getVersion(), data);
                    if ( !sameAsFreshCreate )
                    {
                        resultCode = KeeperException.Code.NODEEXISTS.intValue();
                    }
                }
            }
            sendBackgroundResponse(resultCode, path, ctx, path, stat, mainOperationAndData);
        }
    };

    final BackgroundOperation<PathAndBytes> checkOperation = new BackgroundOperation<PathAndBytes>()
    {
        @Override
        public void performBackgroundOperation(OperationAndData<PathAndBytes> unusedOperationAndData) throws Exception
        {
            try
            {
                client.getZooKeeper().getData(path, false, existingDataCallback, backgrounding.getContext());
            }
            catch ( KeeperException e )
            {
                // logged and otherwise ignored
                client.logError("Unexpected exception in async idempotent check for, ignoring: " + path, e);
            }
        }
    };

    client.queueOperation(new OperationAndData<>(checkOperation, null, null, null, null, null));
}

private void sendBackgroundResponse(int rc, String path, Object ctx, String name, Stat stat, OperationAndData<PathAndBytes> operationAndData)
{
path = client.unfixForNamespace(path);
@@ -1085,10 +1150,25 @@ void callPerformBackgroundOperation() throws Exception
}
}

if ( failBeforeNextCreateForTesting )
{
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}

if ( failNextCreateForTesting )
{
failNextCreateForTesting = false;
pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client
try
{
// simulate success on server without notification to client
// if another error occurs in pathInForeground that isn't NodeExists, this hangs instead of fully propagating the error. Likely not worth fixing though.
pathInForeground(path, data, acling.getAclList(path));
}
catch ( KeeperException.NodeExistsException e )
{
client.logError("NodeExists while injecting failure after create, ignoring: " + givenPath, e);
}
throw new KeeperException.ConnectionLossException();
}

@@ -1128,6 +1208,11 @@ public String call() throws Exception
{
try
{
if ( failBeforeNextCreateForTesting )
{
failBeforeNextCreateForTesting = false;
throw new KeeperException.ConnectionLossException();
}
createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
}
catch ( KeeperException.NoNodeException e )
@@ -1147,12 +1232,35 @@ public String call() throws Exception
if ( setDataIfExists )
{
Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
if(storingStat != null)
if ( storingStat != null )
{
DataTree.copyStat(setStat, storingStat);
}
createdPath = path;
}
else if ( idempotent )
{
if ( failNextIdempotentCheckForTesting )
{
failNextIdempotentCheckForTesting = false;
throw new KeeperException.ConnectionLossException();
}
Stat getStat = new Stat();
byte[] existingData = client.getZooKeeper().getData(path, false, getStat);
// check to see if data version == 0 and data matches the idempotent case
if ( IdempotentUtils.matches(0, data, getStat.getVersion(), existingData) )
{
if ( storingStat != null )
{
DataTree.copyStat(getStat, storingStat);
}
createdPath = path;
}
else
{
throw e;
}
}
else
{
throw e;

0 comments on commit d5666ab

Please sign in to comment.