Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow joining node to trigger term bump #53338

Merged
merged 2 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ public void onFailure(Exception e) {
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
updateMaxTermSeen(joinRequest.getTerm());

final CoordinationState coordState = coordinationState.get();
final boolean prevElectionWon = coordState.electionWon();

Expand Down Expand Up @@ -1115,7 +1117,7 @@ private class CoordinatorPeerFinder extends PeerFinder {
protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
synchronized (mutex) {
ensureTermAtLeast(masterNode, term);
joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term));
joinHelper.sendJoinRequest(masterNode, getCurrentTerm(), joinWithDestination(lastJoin, masterNode, term));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
StartJoinRequest::new,
(request, channel, task) -> {
final DiscoveryNode destination = request.getSourceNode();
sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request)));
channel.sendResponse(Empty.INSTANCE);
});

Expand Down Expand Up @@ -230,9 +230,9 @@ void logLastFailedJoinAttempt() {
}
}

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,81 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

public class JoinRequest extends TransportRequest {

/**
* The sending (i.e. joining) node.
*/
private final DiscoveryNode sourceNode;

/**
* The minimum term for which the joining node will accept any cluster state publications. If the joining node is in a strictly greater
* term than the master it wants to join then the master must enter a new term and hold another election. Doesn't necessarily match
* {@link JoinRequest#optionalJoin} and may be zero in join requests sent prior to {@link Version#V_8_0_0}.
*/
private final long minimumTerm;

/**
* A vote for the receiving node. This vote is optional since the sending node may have voted for a different master in this term.
* That's ok, the sender likely discovered that the master we voted for lost the election and now we're trying to join the winner. Once
* the sender has successfully joined the master, the lack of a vote in its term causes another election (see
* {@link Publication#onMissingJoin(DiscoveryNode)}).
*/
private final Optional<Join> optionalJoin;

public JoinRequest(DiscoveryNode sourceNode, Optional<Join> optionalJoin) {
public JoinRequest(DiscoveryNode sourceNode, long minimumTerm, Optional<Join> optionalJoin) {
assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode);
this.sourceNode = sourceNode;
this.minimumTerm = minimumTerm;
this.optionalJoin = optionalJoin;
}

public JoinRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
minimumTerm = in.readLong();
} else {
minimumTerm = 0L;
}
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeLong(minimumTerm);
}
out.writeOptionalWriteable(optionalJoin.orElse(null));
}

public DiscoveryNode getSourceNode() {
return sourceNode;
}

public long getMinimumTerm() {
return minimumTerm;
}

public long getTerm() {
// If the join is also present then its term will normally equal the corresponding term, but we do not require callers to
// obtain the term and the join in a synchronized fashion so it's possible that they disagree. Also older nodes do not share the
// minimum term, so for BWC we can take it from the join if present.
return Math.max(minimumTerm, optionalJoin.map(Join::getTerm).orElse(0L));
}

public Optional<Join> getOptionalJoin() {
return optionalJoin;
}
Expand All @@ -66,21 +104,21 @@ public boolean equals(Object o) {

JoinRequest that = (JoinRequest) o;

if (minimumTerm != that.minimumTerm) return false;
if (!sourceNode.equals(that.sourceNode)) return false;
return optionalJoin.equals(that.optionalJoin);
}

@Override
public int hashCode() {
int result = sourceNode.hashCode();
result = 31 * result + optionalJoin.hashCode();
return result;
return Objects.hash(sourceNode, minimumTerm, optionalJoin);
}

@Override
public String toString() {
return "JoinRequest{" +
"sourceNode=" + sourceNode +
", minimumTerm=" + minimumTerm +
", optionalJoin=" + optionalJoin +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testJoinDeduplication() {
// check that sending a join to node1 works
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1.length, equalTo(1));
CapturedRequest capturedRequest1 = capturedRequests1[0];
Expand All @@ -79,14 +79,14 @@ public void testJoinDeduplication() {
// check that sending a join to node2 works
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node2, optionalJoin2);
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2);
CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests2.length, equalTo(1));
CapturedRequest capturedRequest2 = capturedRequests2[0];
assertEquals(node2, capturedRequest2.node);

// check that sending another join to node1 is a noop as the previous join is still in progress
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0));

// complete the previous join to node1
Expand All @@ -97,7 +97,7 @@ public void testJoinDeduplication() {
}

// check that sending another join to node1 now works again
joinHelper.sendJoinRequest(node1, optionalJoin1);
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1a.length, equalTo(1));
CapturedRequest capturedRequest1a = capturedRequests1a[0];
Expand All @@ -106,7 +106,7 @@ public void testJoinDeduplication() {
// check that sending another join to node2 works if the optionalJoin is different
Optional<Join> optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node2, optionalJoin2a);
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2a);
CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests2a.length, equalTo(1));
CapturedRequest capturedRequest2a = capturedRequests2a[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,18 @@ public void testJoinRequestEqualsHashCodeSerialization() {
Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong());
JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(),
randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
// Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest,
(CopyFunction<JoinRequest>) joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new),
joinRequest -> {
if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) {
return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin());
return new JoinRequest(createNode(randomAlphaOfLength(10)),
joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin());
} else if (randomBoolean()) {
return new JoinRequest(joinRequest.getSourceNode(),
randomValueOtherThan(joinRequest.getMinimumTerm(), ESTestCase::randomNonNegativeLong),
joinRequest.getOptionalJoin());
} else {
// change OptionalJoin
final Optional<Join> newOptionalJoin;
Expand All @@ -195,7 +200,7 @@ public void testJoinRequestEqualsHashCodeSerialization() {
newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
}
return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin);
return new JoinRequest(joinRequest.getSourceNode(), joinRequest.getMinimumTerm(), newOptionalJoin);
}
});
}
Expand Down
Loading