Skip to content

Commit

Permalink
Reify publication (elastic#21)
Browse files Browse the repository at this point in the history
Prior to this change, the `publish()` method comprises a deeply nested 
collection of lambdas and anonymous classes which represent the notion of a
single publication attempt. In future we want to treat it as a first-class
concept so we can detect when it fails etc.

This change gives names to the anonymous lambdas and classes as a step towards
this.
  • Loading branch information
DaveCTurner committed Jan 9, 2018
1 parent ef06205 commit 6d84f63
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 61 deletions.
186 changes: 127 additions & 59 deletions core/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java
Expand Up @@ -44,11 +44,12 @@
import org.elasticsearch.discovery.zen2.Messages.StartVoteRequest;
import org.elasticsearch.discovery.zen2.Messages.Vote;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
Expand Down Expand Up @@ -316,79 +317,145 @@ public void handleClientValue(Diff<T> diff) {
}

private void publish(PublishRequest<T> publishRequest) {
CatchupRequest<T> catchUp = new CatchupRequest<>(publishRequest.getTerm(), consensusState.generateCatchup());
AtomicReference<ApplyCommit> applyCommitReference = new AtomicReference<>();
nodeSupplier.get().forEach(n -> transport.sendPublishRequest(n, publishRequest,
new TransportResponseHandler<LegislatorPublishResponse>() {
@Override
public LegislatorPublishResponse read(StreamInput in) throws IOException {
return new LegislatorPublishResponse(in);
}
new Publication(publishRequest).start();
}

@Override
public void handleResponse(LegislatorPublishResponse response) {
if (response.getVote().isPresent()) {
handleVote(n, response.getVote().get());
}
if (response.isNeedsCatchup()) {
logger.debug("handleRequestCatchUp: sending catch-up to {} with slot {}", n, catchUp.getFullState().getSlot());
transport.sendCatchUp(n, catchUp, new TransportResponseHandler<PublishResponse>() {
@Override
public PublishResponse read(StreamInput in) throws IOException {
return new PublishResponse(in);
}
/**
* A single attempt to publish an update
*/
private class Publication {

@Override
public void handleResponse(PublishResponse response) {
handlePublishResponse(n, response);
}
private final CatchupRequest<T> catchUp;
private final AtomicReference<ApplyCommit> applyCommitReference;
private final List<PublicationTarget> publicationTargets;
private final PublishRequest<T> publishRequest;

@Override
public void handleException(TransportException exp) {logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"handleCatchupResponse: failed to get catchup response from [{}]", n), exp);
}
private Publication(PublishRequest<T> publishRequest) {
this.publishRequest = publishRequest;
catchUp = new CatchupRequest<>(publishRequest.getTerm(), consensusState.generateCatchup());
applyCommitReference = new AtomicReference<>();

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
if (response.getPublishResponse().isPresent()) {
handlePublishResponse(n, response.getPublishResponse().get());
}
final List<DiscoveryNode> discoveryNodes = nodeSupplier.get();
publicationTargets = new ArrayList<>(discoveryNodes.size());
discoveryNodes.stream().map(PublicationTarget::new).forEach(publicationTargets::add);
}

private void start() {
publicationTargets.forEach(PublicationTarget::sendPublishRequest);
}

private void onCommitted(final ApplyCommit applyCommit) {
assert applyCommitReference.get() == null;
applyCommitReference.set(applyCommit);
publicationTargets.forEach(PublicationTarget::sendApplyCommit);
}

private class PublicationTarget {
private final DiscoveryNode discoveryNode;

private PublicationTarget(DiscoveryNode discoveryNode) {
this.discoveryNode = discoveryNode;
}

public void sendPublishRequest() {
transport.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
}

void handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, sourceNode);
void handlePublishResponse(PublishResponse publishResponse) {
logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode);
assert consensusState.getCurrentTerm() >= publishResponse.getTerm();
if (applyCommitReference.get() != null) {
transport.sendApplyCommit(n, applyCommitReference.get(),
new EmptyTransportResponseHandler(ThreadPool.Names.SAME));
sendApplyCommit();
} else {
Optional<ApplyCommit> optionalCommit = consensusState.handlePublishResponse(sourceNode, publishResponse);
if (optionalCommit.isPresent()) {
applyCommitReference.set(optionalCommit.get());
nodeSupplier.get().forEach(n -> transport.sendApplyCommit(n, optionalCommit.get(),
new EmptyTransportResponseHandler(ThreadPool.Names.SAME)));
}
Optional<ApplyCommit> optionalCommit = consensusState.handlePublishResponse(discoveryNode, publishResponse);
optionalCommit.ifPresent(Publication.this::onCommitted);
}
// TODO: handle negative votes and move to candidate if leader
}

@Override
public void handleException(TransportException exp) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"handlePublishResponse: failed to get publish response from [{}]", n), exp);
public void sendApplyCommit() {
ApplyCommit applyCommit = applyCommitReference.get();
assert applyCommit != null;
transport.sendApplyCommit(discoveryNode, applyCommit, new ApplyCommitResponseHandler());
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
private class PublishResponseHandler implements TransportResponseHandler<LegislatorPublishResponse> {
@Override
public LegislatorPublishResponse read(StreamInput in) throws IOException {
return new LegislatorPublishResponse(in);
}

@Override
public void handleResponse(LegislatorPublishResponse response) {
if (response.getVote().isPresent()) {
handleVote(discoveryNode, response.getVote().get());
}
if (response.isNeedsCatchup()) {
logger.debug("handleRequestCatchUp: sending catch-up to {} with slot {}",
discoveryNode, catchUp.getFullState().getSlot());
transport.sendCatchUp(discoveryNode, catchUp, new CatchUpResponseHandler());
}
response.getPublishResponse().ifPresent(PublicationTarget.this::handlePublishResponse);
}

@Override
public void handleException(TransportException exp) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"handlePublishResponse: failed to get publish response from [{}]", discoveryNode), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

}
}));

private class CatchUpResponseHandler implements TransportResponseHandler<PublishResponse> {
@Override
public PublishResponse read(StreamInput in) throws IOException {
return new PublishResponse(in);
}

@Override
public void handleResponse(PublishResponse response) {
handlePublishResponse(response);
}

@Override
public void handleException(TransportException exp) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"handleCatchupResponse: failed to get catchup response from [{}]", discoveryNode), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

private class ApplyCommitResponseHandler implements TransportResponseHandler<TransportResponse.Empty> {

@Override
public void handleResponse(TransportResponse.Empty response) {
// TODO record success
}

@Override
public void handleException(TransportException exp) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"ApplyCommitResponseHandler: failed to get response from [{}]", discoveryNode), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}
}

void handleVote(DiscoveryNode sourceNode, Vote vote) {
Expand Down Expand Up @@ -746,7 +813,8 @@ void sendPublishRequest(DiscoveryNode destination, PublishRequest<T> publishRequ
void sendHeartbeatRequest(DiscoveryNode destination, HeartbeatRequest heartbeatRequest,
TransportResponseHandler<HeartbeatResponse> responseHandler);

void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit, EmptyTransportResponseHandler responseHandler);
void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit,
TransportResponseHandler<TransportResponse.Empty> responseHandler);

void sendSeekVotes(DiscoveryNode destination, SeekVotes seekVotes, TransportResponseHandler<OfferVote> responseHandler);

Expand Down
Expand Up @@ -360,7 +360,7 @@ void sendHeartbeatRequestFrom(DiscoveryNode sender, DiscoveryNode destination, H
}

void sendApplyCommitFrom(DiscoveryNode sender, DiscoveryNode destination, ApplyCommit applyCommit,
EmptyTransportResponseHandler responseHandler) {
TransportResponseHandler<TransportResponse.Empty> responseHandler) {
sendTo(destination, e -> {
try {
e.handleApplyCommit(sender, applyCommit);
Expand Down Expand Up @@ -536,7 +536,7 @@ public void sendHeartbeatRequest(DiscoveryNode destination, HeartbeatRequest hea

@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit,
EmptyTransportResponseHandler responseHandler) {
TransportResponseHandler<TransportResponse.Empty> responseHandler) {
if (isConnected) {
sendApplyCommitFrom(localNode, destination, applyCommit, responseHandler);
}
Expand Down

0 comments on commit 6d84f63

Please sign in to comment.