Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-3188: Improve resilience to network #1048

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7bfbe7e
ZOOKEEPER-3188: Improve resilience to network
symat Aug 9, 2019
5b22432
ZOOKEEPER-3188: fix LeaderElection to work with multiple election add…
symat Aug 12, 2019
6c4220a
ZOOKEEPER-3188: fix SendWorker.asyncValidateIfSocketIsStillReachable
symat Aug 12, 2019
42a52a6
ZOOKEEPER-3188: improve based on code review comments
symat Aug 14, 2019
5bd1f4e
ZOOKEEPER-3188: supress spotbugs warning
symat Aug 14, 2019
da98a8d
ZOOKEEPER-3188: fix JDK-13 warning
symat Aug 14, 2019
de7bad2
Merge remote-tracking branch 'origin/master' into ZOOKEEPER-3188
symat Aug 23, 2019
e823af4
Merge remote-tracking branch 'origin/master' into ZOOKEEPER-3188
symat Aug 26, 2019
05eae83
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Oct 4, 2019
8713a5b
ZOOKEEPER-3188: add fixes for PR comments
Oct 4, 2019
ed31d2c
ZOOKEEPER-3188: better shutdown for executors (following PR comments)
Oct 4, 2019
a5d6bcb
ZOOKEEPER-3188: support for dynamic reconfig + add more unit tests
Oct 8, 2019
483d2fc
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Oct 8, 2019
2eedf26
ZOOKEEPER-3188: fix PR commits; handle case when Leader can not bind …
Oct 9, 2019
6f2ab75
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Nov 2, 2019
e892d8d
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Nov 4, 2019
e232c55
ZOOKEEPER-3188: fix flaky unit MultiAddress unit test
symat Nov 4, 2019
0f95678
ZOOKEEPER-3188: skip unreachable addresses when Learner connects to L…
symat Nov 13, 2019
31805e7
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Nov 13, 2019
f875f5c
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
symat Nov 13, 2019
40bc44c
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
symat Nov 16, 2019
4b6bcea
ZOOKEEPER-3188: MultiAddress unit tests for Quorum TLS and Kerberos/D…
symat Nov 17, 2019
45b6c0f
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
symat Nov 19, 2019
356882d
ZOOKEEPER-3188: document new configuration format for using multiple …
symat Nov 19, 2019
3c6fc52
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
symat Nov 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,6 +18,7 @@

package org.apache.zookeeper.server;

import java.net.InetSocketAddress;
import org.apache.zookeeper.server.quorum.Observer;
import org.apache.zookeeper.server.quorum.ObserverMXBean;
import org.apache.zookeeper.server.quorum.QuorumPeer;
Expand Down Expand Up @@ -49,10 +50,11 @@ public String getQuorumAddress() {

public String getLearnerMaster() {
QuorumPeer.QuorumServer learnerMaster = observer.getCurrentLearnerMaster();
if (learnerMaster == null || learnerMaster.addr == null) {
if (learnerMaster == null || learnerMaster.addr.isEmpty()) {
return "Unknown";
}
return learnerMaster.addr.getAddress().getHostAddress() + ":" + learnerMaster.addr.getPort();
InetSocketAddress address = learnerMaster.addr.getReachableOrOne();
return address.getAddress().getHostAddress() + ":" + address.getPort();
}

public void setLearnerMaster(String learnerMaster) {
Expand Down
Expand Up @@ -18,7 +18,9 @@

package org.apache.zookeeper.server.admin;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -43,7 +45,9 @@
import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.MultipleAddresses;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
Expand Down Expand Up @@ -672,53 +676,62 @@ public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs)
CommandResponse response = initializeResponse();
if (zkServer instanceof QuorumZooKeeperServer) {
QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
VotingView votingView = new VotingView(peer.getVotingView());
Map<Long, QuorumServerView> votingView = peer.getVotingView().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new QuorumServerView(e.getValue())));
response.put("current_config", votingView);
} else {
response.put("current_config", Collections.emptyMap());
}
return response;
}

private static class VotingView {

private final Map<Long, String> view;

VotingView(Map<Long, QuorumPeer.QuorumServer> view) {
this.view = view.entrySet()
.stream()
.filter(e -> e.getValue().addr != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> String.format(
"%s:%d%s:%s%s",
QuorumPeer.QuorumServer.delimitedHostString(e.getValue().addr),
e.getValue().addr.getPort(),
e.getValue().electionAddr == null ? "" : ":" + e.getValue().electionAddr.getPort(),
e.getValue().type.equals(QuorumPeer.LearnerType.PARTICIPANT) ? "participant" : "observer",
e.getValue().clientAddr == null || e.getValue().isClientAddrFromStatic
? ""
: String.format(
";%s:%d",
QuorumPeer.QuorumServer.delimitedHostString(e.getValue().clientAddr),
e.getValue().clientAddr.getPort())),
(v1, v2) -> v1, // cannot get duplicates as this straight draws from the other map
TreeMap::new));
@SuppressFBWarnings(value = "URF_UNREAD_FIELD", justification = "class is used only for JSON serialization")
private static class QuorumServerView {

@JsonProperty
private List<String> serverAddresses;

@JsonProperty
private List<String> electionAddresses;

@JsonProperty
private String clientAddress;

@JsonProperty
private String learnerType;

public QuorumServerView(QuorumPeer.QuorumServer quorumServer) {
this.serverAddresses = getMultiAddressString(quorumServer.addr);
this.electionAddresses = getMultiAddressString(quorumServer.electionAddr);
this.learnerType = quorumServer.type.equals(LearnerType.PARTICIPANT) ? "participant" : "observer";
this.clientAddress = getAddressString(quorumServer.clientAddr);
}

@JsonAnyGetter
public Map<Long, String> getView() {
return view;
private static List<String> getMultiAddressString(MultipleAddresses multipleAddresses) {
if (multipleAddresses == null) {
return Collections.emptyList();
}

return multipleAddresses.getAllAddresses().stream()
.map(QuorumServerView::getAddressString)
.collect(Collectors.toList());
}

}
private static String getAddressString(InetSocketAddress address) {
if (address == null) {
return "";
}
return String.format("%s:%d", QuorumPeer.QuorumServer.delimitedHostString(address), address.getPort());
}
}

}

/**
* Watch information aggregated by session. Returned Map contains:
* - "session_id_to_watched_paths": Map&lt;Long, Set&lt;String&gt;&gt; session ID -&gt; watched paths
* @see DataTree#getWatches()
* @see DataTree#getWatches()
*/
public static class WatchCommand extends CommandBase {

Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -713,7 +714,8 @@ private void process(ToSend m) {
}

for (QuorumServer server : self.getVotingView().values()) {
InetSocketAddress saddr = new InetSocketAddress(server.addr.getAddress(), port);
InetAddress address = server.addr.getReachableOrOne().getAddress();
InetSocketAddress saddr = new InetSocketAddress(address, port);
addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
}

Expand Down Expand Up @@ -741,7 +743,7 @@ public AuthFastLeaderElection(QuorumPeer self) {

private void starter(QuorumPeer self) {
this.self = self;
port = self.getVotingView().get(self.getId()).electionAddr.getPort();
port = self.getVotingView().get(self.getId()).electionAddr.getAllPorts().get(0);
proposedLeader = -1;
proposedZxid = -1;

Expand All @@ -764,14 +766,15 @@ private void leaveInstance() {
private void sendNotifications() {
for (QuorumServer server : self.getView().values()) {

InetSocketAddress address = self.getView().get(server.id).electionAddr.getReachableOrOne();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
AuthFastLeaderElection.sequencer++,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
self.getView().get(server.id).electionAddr);
address);

sendqueue.offer(notmsg);
}
Expand Down