Skip to content
Permalink
Browse files
[CURATOR-558] - Updates for ZooKeeper 3.6.0 (#350)
* CURATOR-558

Bring Curator up to ZooKeeper 3.5.6 in preparation for supporting persistent recursive watchers while maintaining background compatability with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x. ZooKeeper 3.6.0 has some significant changes from previous versions. The reconfig APIs have moved into a new class, ZooKeeperAdmin. This class existed in 3.5.x but wasn't required. Now it is. A bunch of little things changed in the ZK server code which affected Curator's test classes. I moved it all into reflection based calls in Compatibility.java in the test module. We now have modules that test ZK 3.4, 3.5 and 3.6 so we're safe with compatibility. ZooKeeper's MultiTransactionRecord has been removed it seems. That forced CuratorMultiTransactionRecord to be re-written. It's not a public class so hopefully it won't affect anyone.

There is a new module, curator-test-zk35. It forces ZooKeeper 3.5.6 and performs selected tests from the other modules to ensure compatibility. Tests annotated with TestNG groups zk35 and zk35Compatibility are tested. Group zk36 is excluded. Note: these tests will only run from Maven. I don't think IntelliJ/Eclipse support the Maven syntax I used.
Support persistent watchers in ZK 3.6+ while maintaining background compatability with previous versions of ZK. Added a new module to make sure we maintain comaptibility with ZK 3.5.x

* CURATOR-558 - change to version 5.0.0-SNAPSHOT

Co-authored-by: randgalt <randgalt@apache.org>
  • Loading branch information
Randgalt and randgalt committed Mar 16, 2020
1 parent d1a9234 commit 7bbb7196173b88baac0e102bdff265dc7f2ee77e
Showing 47 changed files with 781 additions and 87 deletions.
@@ -23,11 +23,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
<version>4.2.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
</parent>

<artifactId>curator-client</artifactId>
<version>4.2.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>

<name>Curator Client</name>
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.utils;

import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;

/**
* Utils to help with ZK version compatibility
*/
public class Compatibility
{
private static final Method getReachableOrOneMethod;
private static final Field addrField;

private static final Logger log = LoggerFactory.getLogger(Compatibility.class);

static
{
Method localGetReachableOrOneMethod;
try
{
Class<?> multipleAddressesClass = Class.forName("org.apache.zookeeper.server.quorum.MultipleAddresses");
localGetReachableOrOneMethod = multipleAddressesClass.getMethod("getReachableOrOne");
log.info("Using org.apache.zookeeper.server.quorum.MultipleAddresses");
}
catch ( ReflectiveOperationException ignore )
{
localGetReachableOrOneMethod = null;
}
getReachableOrOneMethod = localGetReachableOrOneMethod;

Field localAddrField;
try
{
localAddrField = QuorumPeer.QuorumServer.class.getField("addr");
}
catch ( NoSuchFieldException e )
{
localAddrField = null;
log.error("Could not get addr field! Reconfiguration fail!");
}
addrField = localAddrField;
}

public static boolean hasGetReachableOrOneMethod()
{
return (getReachableOrOneMethod != null);
}

public static boolean hasAddrField()
{
return (addrField != null);
}

public static String getHostAddress(QuorumPeer.QuorumServer server)
{
InetSocketAddress address = null;
if ( getReachableOrOneMethod != null )
{
try
{
address = (InetSocketAddress)getReachableOrOneMethod.invoke(server.addr);
}
catch ( Exception e )
{
log.error("Could not call getReachableOrOneMethod.invoke({})", server.addr, e);
}
}
else if (addrField != null)
{
try
{
address = (InetSocketAddress)addrField.get(server);
}
catch ( Exception e )
{
log.error("Could not call addrField.get({})", server, e);
}
}
return (address != null) ? address.getAddress().getHostAddress() : "unknown";
}
}
@@ -20,12 +20,13 @@

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;

public class DefaultZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.utils;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class NonAdminZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator;

import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.Compatibility;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestIs36 extends CuratorTestBase
{
@Test(groups = zk36Group)
public void testIsZk36()
{
Assert.assertTrue(Compatibility.hasGetReachableOrOneMethod());
Assert.assertTrue(Compatibility.hasAddrField());
}

@Override
protected void createServer()
{
// NOP
}
}
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
<version>4.2.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
</parent>

<artifactId>curator-examples</artifactId>
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
<version>4.2.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
</parent>

<artifactId>curator-framework</artifactId>
<version>4.2.1-SNAPSHOT</version>
<version>5.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>

<name>Curator Framework</name>
@@ -194,6 +194,7 @@ public interface CuratorFramework extends Closeable

/**
* Start a remove watches builder.
*
* @return builder object
*/
public RemoveWatchesBuilder watches();
@@ -96,5 +96,10 @@
/**
* Event sent when client is being closed
*/
CLOSING
CLOSING,

/**
* Corresponds to {@link CuratorFramework#watches()}
*/
ADD_WATCH
}
@@ -16,49 +16,57 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.imps;

import com.google.common.collect.Lists;
import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.MultiTransactionRecord;
import org.apache.zookeeper.Op;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CuratorMultiTransactionRecord extends MultiTransactionRecord
class CuratorMultiTransactionRecord implements Iterable<Op>
{
private final List<TypeAndPath> metadata = Lists.newArrayList();

@Override
public final void add(Op op)
{
throw new UnsupportedOperationException();
}
private final List<TypeAndPath> metadata = Lists.newArrayList();
private final List<Op> ops = new ArrayList<>();

void add(Op op, OperationType type, String forPath)
{
super.add(op);
ops.add(op);
metadata.add(new TypeAndPath(type, forPath));
}

TypeAndPath getMetadata(int index)
TypeAndPath getMetadata(int index)
{
return metadata.get(index);
}

int metadataSize()
int metadataSize()
{
return metadata.size();
}

void addToDigest(MessageDigest digest)
{
for ( Op op : this )
for ( Op op : ops )
{
digest.update(op.getPath().getBytes());
digest.update(Integer.toString(op.getType()).getBytes());
digest.update(op.toRequestRecord().toString().getBytes());
}
}

@Override
public Iterator<Op> iterator()
{
return ops.iterator();
}

int size()
{
return ops.size();
}
}
@@ -31,6 +31,7 @@
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.Compatibility;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -39,7 +40,6 @@
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.util.Properties;
@@ -182,7 +182,7 @@ public static String configToConnectionString(QuorumVerifier data) throws Except
String hostAddress;
if ( server.clientAddr.getAddress().isAnyLocalAddress() )
{
hostAddress = server.addr.getAddress().getHostAddress();
hostAddress = Compatibility.getHostAddress(server);
}
else
{
@@ -24,6 +24,7 @@
import org.apache.curator.TimeTrace;
import org.apache.curator.framework.api.*;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import java.util.Arrays;
@@ -268,7 +269,7 @@ public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat st
client.processBackgroundOperation(data, event);
}
};
client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext());
}
catch ( Throwable e )
{
@@ -287,7 +288,7 @@ private byte[] ensembleInForeground() throws Exception
@Override
public byte[] call() throws Exception
{
return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat);
return ((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
}
}
);
@@ -201,8 +201,13 @@ public Void forPath(String path) throws Exception
}

return null;
}

}

protected CuratorFrameworkImpl getClient()
{
return client;
}

private void pathInBackground(final String path)
{
OperationAndData.ErrorCallback<String> errorCallback = null;

0 comments on commit 7bbb719

Please sign in to comment.