Skip to content

Commit

Permalink
Reset known leader when command fails due to ConnectException to ensure new leaders can be found quickly.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 2, 2017
1 parent a6ff559 commit 3b8a95d
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 58 deletions.
Expand Up @@ -36,11 +36,20 @@ public interface RaftMetadataClient {
MemberId getLeader(); MemberId getLeader();


/** /**
* Returns the set of known servers in the cluster. * Returns the set of known members in the cluster.
* *
* @return The set of known servers in the cluster. * @return The set of known members in the cluster.
*/ */
Collection<MemberId> getServers(); default Collection<MemberId> getServers() {
return getMembers();
}

/**
* Returns the set of known members in the cluster.
*
* @return The set of known members in the cluster.
*/
Collection<MemberId> getMembers();


/** /**
* Returns a list of open sessions. * Returns a list of open sessions.
Expand Down
Expand Up @@ -61,8 +61,8 @@ public MemberId getLeader() {
} }


@Override @Override
public Collection<MemberId> getServers() { public Collection<MemberId> getMembers() {
return selectorManager.servers(); return selectorManager.members();
} }


/** /**
Expand Down
Expand Up @@ -38,9 +38,9 @@ public enum CommunicationStrategy {
*/ */
ANY { ANY {
@Override @Override
public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers) { public List<MemberId> selectConnections(MemberId leader, List<MemberId> members) {
Collections.shuffle(servers); Collections.shuffle(members);
return servers; return members;
} }
}, },


Expand All @@ -55,12 +55,12 @@ public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers)
*/ */
LEADER { LEADER {
@Override @Override
public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers) { public List<MemberId> selectConnections(MemberId leader, List<MemberId> members) {
if (leader != null) { if (leader != null) {
return Collections.singletonList(leader); return Collections.singletonList(leader);
} }
Collections.shuffle(servers); Collections.shuffle(members);
return servers; return members;
} }
}, },


Expand All @@ -73,18 +73,18 @@ public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers)
*/ */
FOLLOWERS { FOLLOWERS {
@Override @Override
public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers) { public List<MemberId> selectConnections(MemberId leader, List<MemberId> members) {
Collections.shuffle(servers); Collections.shuffle(members);
if (leader != null && servers.size() > 1) { if (leader != null && members.size() > 1) {
List<MemberId> results = new ArrayList<>(servers.size()); List<MemberId> results = new ArrayList<>(members.size());
for (MemberId memberId : servers) { for (MemberId memberId : members) {
if (!memberId.equals(leader)) { if (!memberId.equals(leader)) {
results.add(memberId); results.add(memberId);
} }
} }
return results; return results;
} }
return servers; return members;
} }
}; };


Expand All @@ -99,11 +99,11 @@ public List<MemberId> selectConnections(MemberId leader, List<MemberId> servers)
* *
* @param leader The current cluster leader. The {@code leader} may be {@code null} if no current * @param leader The current cluster leader. The {@code leader} may be {@code null} if no current
* leader exists. * leader exists.
* @param servers The full list of available servers. The provided server list is representative * @param members The full list of available servers. The provided server list is representative
* of the most recent membership update received by the client. The server list * of the most recent membership update received by the client. The server list
* may evolve over time as the structure of the cluster changes. * may evolve over time as the structure of the cluster changes.
* @return A collection of servers to which the client can connect. * @return A collection of servers to which the client can connect.
*/ */
public abstract List<MemberId> selectConnections(MemberId leader, List<MemberId> servers); public abstract List<MemberId> selectConnections(MemberId leader, List<MemberId> members);


} }
Expand Up @@ -57,18 +57,18 @@ public enum State {


private final MemberSelectorManager selectors; private final MemberSelectorManager selectors;
private MemberId leader; private MemberId leader;
private Collection<MemberId> servers = new LinkedList<>(); private Collection<MemberId> members = new LinkedList<>();
private volatile MemberId selection; private volatile MemberId selection;
private final CommunicationStrategy strategy; private final CommunicationStrategy strategy;
private Collection<MemberId> selections = new LinkedList<>(); private Collection<MemberId> selections = new LinkedList<>();
private Iterator<MemberId> selectionsIterator; private Iterator<MemberId> selectionsIterator;


public MemberSelector(MemberId leader, Collection<MemberId> servers, CommunicationStrategy strategy, MemberSelectorManager selectors) { public MemberSelector(MemberId leader, Collection<MemberId> members, CommunicationStrategy strategy, MemberSelectorManager selectors) {
this.leader = leader; this.leader = leader;
this.servers = checkNotNull(servers, "servers cannot be null"); this.members = checkNotNull(members, "servers cannot be null");
this.strategy = checkNotNull(strategy, "strategy cannot be null"); this.strategy = checkNotNull(strategy, "strategy cannot be null");
this.selectors = checkNotNull(selectors, "selectors cannot be null"); this.selectors = checkNotNull(selectors, "selectors cannot be null");
this.selections = strategy.selectConnections(leader, new ArrayList<>(servers)); this.selections = strategy.selectConnections(leader, new ArrayList<>(members));
} }


/** /**
Expand Down Expand Up @@ -109,34 +109,34 @@ public MemberId leader() {
* *
* @return The current set of servers. * @return The current set of servers.
*/ */
public Collection<MemberId> servers() { public Collection<MemberId> members() {
return servers; return members;
} }


/** /**
* Resets the addresses. * Resets the member iterator.
* *
* @return The address selector. * @return The member selector.
*/ */
public MemberSelector reset() { public MemberSelector reset() {
if (selectionsIterator != null) { if (selectionsIterator != null) {
this.selections = strategy.selectConnections(leader, new ArrayList<>(servers)); this.selections = strategy.selectConnections(leader, new ArrayList<>(members));
this.selectionsIterator = null; this.selectionsIterator = null;
} }
return this; return this;
} }


/** /**
* Resets the connection addresses. * Resets the connection leader and members.
* *
* @param servers The collection of server addresses. * @param members The collection of members.
* @return The address selector. * @return The member selector.
*/ */
public MemberSelector reset(MemberId leader, Collection<MemberId> servers) { public MemberSelector reset(MemberId leader, Collection<MemberId> members) {
if (changed(leader, servers)) { if (changed(leader, members)) {
this.leader = leader; this.leader = leader;
this.servers = servers; this.members = members;
this.selections = strategy.selectConnections(leader, new ArrayList<>(servers)); this.selections = strategy.selectConnections(leader, new ArrayList<>(members));
this.selectionsIterator = null; this.selectionsIterator = null;
} }
return this; return this;
Expand All @@ -156,7 +156,7 @@ private boolean changed(MemberId leader, Collection<MemberId> servers) {
} else if (this.leader != null && !this.leader.equals(leader)) { } else if (this.leader != null && !this.leader.equals(leader)) {
checkArgument(servers.contains(leader), "leader must be present in the servers list"); checkArgument(servers.contains(leader), "leader must be present in the servers list");
return true; return true;
} else if (!matches(this.servers, servers)) { } else if (!matches(this.members, servers)) {
return true; return true;
} }
return false; return false;
Expand Down
Expand Up @@ -30,7 +30,7 @@
public final class MemberSelectorManager { public final class MemberSelectorManager {
private final Set<MemberSelector> selectors = new CopyOnWriteArraySet<>(); private final Set<MemberSelector> selectors = new CopyOnWriteArraySet<>();
private volatile MemberId leader; private volatile MemberId leader;
private volatile Collection<MemberId> servers = Collections.emptyList(); private volatile Collection<MemberId> members = Collections.emptyList();


/** /**
* Returns the current cluster leader. * Returns the current cluster leader.
Expand All @@ -42,12 +42,12 @@ public MemberId leader() {
} }


/** /**
* Returns the set of servers in the cluster. * Returns the set of members in the cluster.
* *
* @return The set of servers in the cluster. * @return The set of members in the cluster.
*/ */
public Collection<MemberId> servers() { public Collection<MemberId> members() {
return servers; return members;
} }


/** /**
Expand All @@ -57,7 +57,7 @@ public Collection<MemberId> servers() {
* @return A new address selector. * @return A new address selector.
*/ */
public MemberSelector createSelector(CommunicationStrategy selectionStrategy) { public MemberSelector createSelector(CommunicationStrategy selectionStrategy) {
MemberSelector selector = new MemberSelector(leader, servers, selectionStrategy, this); MemberSelector selector = new MemberSelector(leader, members, selectionStrategy, this);
selectors.add(selector); selectors.add(selector);
return selector; return selector;
} }
Expand All @@ -73,18 +73,18 @@ public void resetAll() {
* Resets all child selectors. * Resets all child selectors.
* *
* @param leader The current cluster leader. * @param leader The current cluster leader.
* @param servers The collection of all active servers. * @param members The collection of all active members.
*/ */
public void resetAll(MemberId leader, Collection<MemberId> servers) { public void resetAll(MemberId leader, Collection<MemberId> members) {
this.leader = leader; this.leader = leader;
this.servers = new LinkedList<>(servers); this.members = new LinkedList<>(members);
selectors.forEach(s -> s.reset(leader, servers)); selectors.forEach(s -> s.reset(leader, members));
} }


/** /**
* Removes the given selector. * Removes the given selector.
* *
* @param selector The address selector to remove. * @param selector The member selector to remove.
*/ */
void remove(MemberSelector selector) { void remove(MemberSelector selector) {
selectors.remove(selector); selectors.remove(selector);
Expand Down
Expand Up @@ -75,6 +75,23 @@ public RaftProxyConnection(RaftClientProtocol protocol, MemberSelector selector,
this.log = ContextualLoggerFactory.getLogger(getClass(), loggerContext); this.log = ContextualLoggerFactory.getLogger(getClass(), loggerContext);
} }


/**
* Resets the member selector.
*/
public void reset() {
selector.reset();
}

/**
* Resets the member selector.
*
* @param leader the selector leader
* @param servers the selector servers
*/
public void reset(MemberId leader, Collection<MemberId> servers) {
selector.reset(leader, servers);
}

/** /**
* Returns the current selector leader. * Returns the current selector leader.
* *
Expand All @@ -85,12 +102,12 @@ public MemberId leader() {
} }


/** /**
* Returns the current set of servers. * Returns the current set of members.
* *
* @return The current set of servers. * @return The current set of members.
*/ */
public Collection<MemberId> servers() { public Collection<MemberId> members() {
return selector.servers(); return selector.members();
} }


/** /**
Expand Down
Expand Up @@ -361,6 +361,9 @@ else if (response.error().type() == RaftError.Type.UNKNOWN_CLIENT
retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt - 1, FIBONACCI.length - 1)])); retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt - 1, FIBONACCI.length - 1)]));
} }
} else if (EXCEPTION_PREDICATE.test(error) || (error instanceof CompletionException && EXCEPTION_PREDICATE.test(error.getCause()))) { } else if (EXCEPTION_PREDICATE.test(error) || (error instanceof CompletionException && EXCEPTION_PREDICATE.test(error.getCause()))) {
if (error instanceof ConnectException || error.getCause() instanceof ConnectException) {
leaderConnection.reset(null, leaderConnection.members());
}
retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt - 1, FIBONACCI.length - 1)])); retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt - 1, FIBONACCI.length - 1)]));
} else { } else {
fail(error); fail(error);
Expand Down
Expand Up @@ -111,7 +111,7 @@ private void handlePublish(PublishRequest request) {
.withIndex(eventIndex) .withIndex(eventIndex)
.build(); .build();
log.trace("Sending {}", resetRequest); log.trace("Sending {}", resetRequest);
protocol.reset(memberSelector.servers(), resetRequest); protocol.reset(memberSelector.members(), resetRequest);
return; return;
} }


Expand Down
Expand Up @@ -317,7 +317,7 @@ private synchronized void keepAliveSessions(long lastKeepAliveTime, long session
// If the timeout has not been passed, attempt to keep the session alive again with no delay. // If the timeout has not been passed, attempt to keep the session alive again with no delay.
// We will continue to retry until the session expiration has passed. // We will continue to retry until the session expiration has passed.
else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout) { else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout) {
selectorManager.resetAll(null, connection.servers()); selectorManager.resetAll(null, connection.members());
keepAliveSessions(lastKeepAliveTime, sessionTimeout); keepAliveSessions(lastKeepAliveTime, sessionTimeout);
} }
// If no leader was set, set the session state to unstable and schedule another keep-alive. // If no leader was set, set the session state to unstable and schedule another keep-alive.
Expand All @@ -330,7 +330,7 @@ else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout) {
// If the timeout has not been passed, reset the connection and attempt to keep the session alive // If the timeout has not been passed, reset the connection and attempt to keep the session alive
// again with no delay. // again with no delay.
else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout && connection.leader() != null) { else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout && connection.leader() != null) {
selectorManager.resetAll(null, connection.servers()); selectorManager.resetAll(null, connection.members());
keepAliveSessions(lastKeepAliveTime, sessionTimeout); keepAliveSessions(lastKeepAliveTime, sessionTimeout);
} }
// If no leader was set, set the session state to unstable and schedule another keep-alive. // If no leader was set, set the session state to unstable and schedule another keep-alive.
Expand Down
Expand Up @@ -35,13 +35,13 @@ public class MemberSelectorManagerTest {
public void testMemberSelectorManager() throws Exception { public void testMemberSelectorManager() throws Exception {
MemberSelectorManager selectorManager = new MemberSelectorManager(); MemberSelectorManager selectorManager = new MemberSelectorManager();
assertNull(selectorManager.leader()); assertNull(selectorManager.leader());
assertEquals(0, selectorManager.servers().size()); assertEquals(0, selectorManager.members().size());
selectorManager.resetAll(); selectorManager.resetAll();
assertNull(selectorManager.leader()); assertNull(selectorManager.leader());
assertEquals(0, selectorManager.servers().size()); assertEquals(0, selectorManager.members().size());
selectorManager.resetAll(MemberId.from("a"), Arrays.asList(MemberId.from("a"), MemberId.from("b"), MemberId.from("c"))); selectorManager.resetAll(MemberId.from("a"), Arrays.asList(MemberId.from("a"), MemberId.from("b"), MemberId.from("c")));
assertEquals(MemberId.from("a"), selectorManager.leader()); assertEquals(MemberId.from("a"), selectorManager.leader());
assertEquals(3, selectorManager.servers().size()); assertEquals(3, selectorManager.members().size());
} }


} }
Expand Up @@ -133,7 +133,7 @@ public void testSelectLeader() throws Exception {


selectorManager.resetAll(MemberId.from("a"), Arrays.asList(MemberId.from("a"), MemberId.from("b"), MemberId.from("c"))); selectorManager.resetAll(MemberId.from("a"), Arrays.asList(MemberId.from("a"), MemberId.from("b"), MemberId.from("c")));
assertEquals(MemberId.from("a"), selector.leader()); assertEquals(MemberId.from("a"), selector.leader());
assertEquals(3, selector.servers().size()); assertEquals(3, selector.members().size());
assertTrue(selector.hasNext()); assertTrue(selector.hasNext());
assertNotNull(selector.next()); assertNotNull(selector.next());
assertFalse(selector.hasNext()); assertFalse(selector.hasNext());
Expand Down

0 comments on commit 3b8a95d

Please sign in to comment.