Skip to content

Commit

Permalink
ZOOKEEPER-2184: Zookeeper Client should re-resolve hosts when connect…
Browse files Browse the repository at this point in the history
…ion attempts fail

This is the master/3.5 port of #451

Author: Andor Molnar <andor@cloudera.com>
Author: Andor Molnar <andor@apache.org>

Reviewers: Michael Han <hanm@apache.org>, Flavio Junqueira <fpj@apache.org>, Edward Ribeiro <edward.ribeiro@gmail.com>, Mark Fenes <mfenes@cloudera.com>, Abraham Fine <afine@apache.org>

Closes #534 from anmolnar/ZOOKEEPER-2184_master

(cherry picked from commit 0a31187)
Signed-off-by: Michael Han <hanm@apache.org>
  • Loading branch information
anmolnar authored and hanm committed Jul 16, 2018
1 parent 4174a0b commit 1e65b9f
Show file tree
Hide file tree
Showing 4 changed files with 527 additions and 80 deletions.
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
<property name="netty.version" value="3.10.6.Final"/>

<property name="junit.version" value="4.12"/>
<property name="mockito.version" value="1.8.2"/>
<property name="mockito.version" value="1.8.5"/>
<property name="checkstyle.version" value="6.13"/>
<property name="commons-collections.version" value="3.2.2"/>

Expand Down
5 changes: 3 additions & 2 deletions src/java/main/org/apache/zookeeper/client/HostProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
*
* * The size() of a HostProvider may never be zero.
*
* A HostProvider must return resolved InetSocketAddress instances on next(),
* but it's up to the HostProvider, when it wants to do the resolving.
* A HostProvider must return resolved InetSocketAddress instances on next() if the next address is resolvable.
* In that case, it's up to the HostProvider, whether it returns the next resolvable address in the list or return
* the next one as UnResolved.
*
* Different HostProvider could be imagined:
*
Expand Down
121 changes: 77 additions & 44 deletions src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -32,11 +33,19 @@
import org.slf4j.LoggerFactory;

/**
* Most simple HostProvider, resolves only on instantiation.
*
* Most simple HostProvider, resolves on every next() call.
*
* Please be aware that although this class doesn't do any DNS caching, there're multiple levels of caching already
* present across the stack like in JVM, OS level, hardware, etc. The best we could do here is to get the most recent
* address from the underlying system which is considered up-to-date.
*
*/
@InterfaceAudience.Public
public final class StaticHostProvider implements HostProvider {
public interface Resolver {
InetAddress[] getAllByName(String name) throws UnknownHostException;
}

private static final Logger LOG = LoggerFactory
.getLogger(StaticHostProvider.class);

Expand Down Expand Up @@ -64,6 +73,8 @@ public final class StaticHostProvider implements HostProvider {

private float pOld, pNew;

private Resolver resolver;

/**
* Constructs a SimpleHostSet.
*
Expand All @@ -73,15 +84,29 @@ public final class StaticHostProvider implements HostProvider {
* if serverAddresses is empty or resolves to an empty list
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode());
init(serverAddresses,
System.currentTimeMillis() ^ this.hashCode(),
new Resolver() {
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
});
}

this.serverAddresses = resolveAndShuffle(serverAddresses);
if (this.serverAddresses.isEmpty()) {
throw new IllegalArgumentException(
"A HostProvider may not be empty!");
}
currentIndex = -1;
lastIndex = -1;
/**
* Constructs a SimpleHostSet.
*
* Introduced for testing purposes. getAllByName() is a static method of InetAddress, therefore cannot be easily mocked.
* By abstraction of Resolver interface we can easily inject a mocked implementation in tests.
*
* @param serverAddresses
* possibly unresolved ZooKeeper server addresses
* @param resolver
* custom resolver implementation
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, Resolver resolver) {
init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), resolver);
}

/**
Expand All @@ -96,36 +121,47 @@ public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
long randomnessSeed) {
sourceOfRandomness = new Random(randomnessSeed);
init(serverAddresses, randomnessSeed, new Resolver() {
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
});
}

this.serverAddresses = resolveAndShuffle(serverAddresses);
if (this.serverAddresses.isEmpty()) {
private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
this.sourceOfRandomness = new Random(randomnessSeed);
this.resolver = resolver;
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException(
"A HostProvider may not be empty!");
}
}
this.serverAddresses = shuffle(serverAddresses);
currentIndex = -1;
lastIndex = -1;
lastIndex = -1;
}

private List<InetSocketAddress> resolveAndShuffle(Collection<InetSocketAddress> serverAddresses) {
List<InetSocketAddress> tmpList = new ArrayList<InetSocketAddress>(serverAddresses.size());
for (InetSocketAddress address : serverAddresses) {
try {
InetAddress ia = address.getAddress();
String addr = (ia != null) ? ia.getHostAddress() : address.getHostString();
InetAddress resolvedAddresses[] = InetAddress.getAllByName(addr);
for (InetAddress resolvedAddress : resolvedAddresses) {
InetAddress taddr = InetAddress.getByAddress(address.getHostString(), resolvedAddress.getAddress());
tmpList.add(new InetSocketAddress(taddr, address.getPort()));
}
} catch (UnknownHostException ex) {
LOG.warn("No IP address found for server: {}", address, ex);
private InetSocketAddress resolve(InetSocketAddress address) {
try {
String curHostString = address.getHostString();
List<InetAddress> resolvedAddresses = new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
if (resolvedAddresses.isEmpty()) {
return address;
}
Collections.shuffle(resolvedAddresses);
return new InetSocketAddress(resolvedAddresses.get(0), address.getPort());
} catch (UnknownHostException e) {
LOG.error("Unable to resolve address: {}", address.toString(), e);
return address;
}
}

private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> serverAddresses) {
List<InetSocketAddress> tmpList = new ArrayList<>(serverAddresses.size());
tmpList.addAll(serverAddresses);
Collections.shuffle(tmpList, sourceOfRandomness);
return tmpList;
}

}

/**
* Update the list of servers. This returns true if changing connections is necessary for load-balancing, false
Expand All @@ -149,15 +185,12 @@ private List<InetSocketAddress> resolveAndShuffle(Collection<InetSocketAddress>
* @param currentHost the host to which this client is currently connected
* @return true if changing connections is necessary for load-balancing, false otherwise
*/


@Override
public synchronized boolean updateServerList(
Collection<InetSocketAddress> serverAddresses,
InetSocketAddress currentHost) {
// Resolve server addresses and shuffle them
List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses);
if (resolvedList.isEmpty()) {
List<InetSocketAddress> shuffledList = shuffle(serverAddresses);
if (shuffledList.isEmpty()) {
throw new IllegalArgumentException(
"A HostProvider may not be empty!");
}
Expand All @@ -183,7 +216,7 @@ public synchronized boolean updateServerList(
}
}

for (InetSocketAddress addr : resolvedList) {
for (InetSocketAddress addr : shuffledList) {
if (addr.getPort() == myServer.getPort()
&& ((addr.getAddress() != null
&& myServer.getAddress() != null && addr
Expand All @@ -200,11 +233,11 @@ public synchronized boolean updateServerList(
oldServers.clear();
// Divide the new servers into oldServers that were in the previous list
// and newServers that were not in the previous list
for (InetSocketAddress resolvedAddress : resolvedList) {
if (this.serverAddresses.contains(resolvedAddress)) {
oldServers.add(resolvedAddress);
for (InetSocketAddress address : shuffledList) {
if (this.serverAddresses.contains(address)) {
oldServers.add(address);
} else {
newServers.add(resolvedAddress);
newServers.add(address);
}
}

Expand Down Expand Up @@ -245,11 +278,11 @@ public synchronized boolean updateServerList(
}

if (!reconfigMode) {
currentIndex = resolvedList.indexOf(getServerAtCurrentIndex());
currentIndex = shuffledList.indexOf(getServerAtCurrentIndex());
} else {
currentIndex = -1;
}
this.serverAddresses = resolvedList;
this.serverAddresses = shuffledList;
currentIndexOld = -1;
currentIndexNew = -1;
lastIndex = currentIndex;
Expand Down Expand Up @@ -314,7 +347,7 @@ public InetSocketAddress next(long spinDelay) {
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return addr;
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
Expand All @@ -339,7 +372,7 @@ public InetSocketAddress next(long spinDelay) {
}
}

return addr;
return resolve(addr);
}

public synchronized void onConnected() {
Expand Down

0 comments on commit 1e65b9f

Please sign in to comment.