From 6d84f63c5f332ac2659e49d12a979bea6297b6be Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Jan 2018 22:55:07 +0000 Subject: [PATCH] Reify publication (#21) Prior to this change, the `publish()` method comprises a deeply nested collection of lambdas and anonymous classes which represent the notion of a single publication attempt. In future we want to treat it as a first-class concept so we can detect when it fails etc. This change gives names to the anonymous lambdas and classes as a step towards this. --- .../discovery/zen2/Legislator.java | 186 ++++++++++++------ .../discovery/zen2/LegislatorTests.java | 4 +- 2 files changed, 129 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java b/core/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java index d0aae2f50c12c..da841fcdb2b14 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java @@ -44,11 +44,12 @@ import org.elasticsearch.discovery.zen2.Messages.StartVoteRequest; import org.elasticsearch.discovery.zen2.Messages.Vote; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Random; @@ -316,79 +317,145 @@ public void handleClientValue(Diff diff) { } private void publish(PublishRequest publishRequest) { - CatchupRequest catchUp = new CatchupRequest<>(publishRequest.getTerm(), consensusState.generateCatchup()); - AtomicReference applyCommitReference = new AtomicReference<>(); - nodeSupplier.get().forEach(n -> transport.sendPublishRequest(n, publishRequest, - new TransportResponseHandler() { - @Override - public LegislatorPublishResponse read(StreamInput in) throws IOException { - return new LegislatorPublishResponse(in); - } + new Publication(publishRequest).start(); + } - @Override - public void handleResponse(LegislatorPublishResponse response) { - if (response.getVote().isPresent()) { - handleVote(n, response.getVote().get()); - } - if (response.isNeedsCatchup()) { - logger.debug("handleRequestCatchUp: sending catch-up to {} with slot {}", n, catchUp.getFullState().getSlot()); - transport.sendCatchUp(n, catchUp, new TransportResponseHandler() { - @Override - public PublishResponse read(StreamInput in) throws IOException { - return new PublishResponse(in); - } + /** + * A single attempt to publish an update + */ + private class Publication { - @Override - public void handleResponse(PublishResponse response) { - handlePublishResponse(n, response); - } + private final CatchupRequest catchUp; + private final AtomicReference applyCommitReference; + private final List publicationTargets; + private final PublishRequest publishRequest; - @Override - public void handleException(TransportException exp) {logger.trace( - (Supplier) () -> new ParameterizedMessage( - "handleCatchupResponse: failed to get catchup response from [{}]", n), exp); - } + private Publication(PublishRequest publishRequest) { + this.publishRequest = publishRequest; + catchUp = new CatchupRequest<>(publishRequest.getTerm(), consensusState.generateCatchup()); + applyCommitReference = new AtomicReference<>(); - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - if (response.getPublishResponse().isPresent()) { - handlePublishResponse(n, response.getPublishResponse().get()); - } + final List discoveryNodes = nodeSupplier.get(); + publicationTargets = new ArrayList<>(discoveryNodes.size()); + discoveryNodes.stream().map(PublicationTarget::new).forEach(publicationTargets::add); + } + + private void start() { + publicationTargets.forEach(PublicationTarget::sendPublishRequest); + } + + private void onCommitted(final ApplyCommit applyCommit) { + assert applyCommitReference.get() == null; + applyCommitReference.set(applyCommit); + publicationTargets.forEach(PublicationTarget::sendApplyCommit); + } + + private class PublicationTarget { + private final DiscoveryNode discoveryNode; + + private PublicationTarget(DiscoveryNode discoveryNode) { + this.discoveryNode = discoveryNode; + } + + public void sendPublishRequest() { + transport.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); } - void handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { - logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, sourceNode); + void handlePublishResponse(PublishResponse publishResponse) { + logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); assert consensusState.getCurrentTerm() >= publishResponse.getTerm(); if (applyCommitReference.get() != null) { - transport.sendApplyCommit(n, applyCommitReference.get(), - new EmptyTransportResponseHandler(ThreadPool.Names.SAME)); + sendApplyCommit(); } else { - Optional optionalCommit = consensusState.handlePublishResponse(sourceNode, publishResponse); - if (optionalCommit.isPresent()) { - applyCommitReference.set(optionalCommit.get()); - nodeSupplier.get().forEach(n -> transport.sendApplyCommit(n, optionalCommit.get(), - new EmptyTransportResponseHandler(ThreadPool.Names.SAME))); - } + Optional optionalCommit = consensusState.handlePublishResponse(discoveryNode, publishResponse); + optionalCommit.ifPresent(Publication.this::onCommitted); } // TODO: handle negative votes and move to candidate if leader } - @Override - public void handleException(TransportException exp) { - logger.trace( - (Supplier) () -> new ParameterizedMessage( - "handlePublishResponse: failed to get publish response from [{}]", n), exp); + public void sendApplyCommit() { + ApplyCommit applyCommit = applyCommitReference.get(); + assert applyCommit != null; + transport.sendApplyCommit(discoveryNode, applyCommit, new ApplyCommitResponseHandler()); } - @Override - public String executor() { - return ThreadPool.Names.SAME; + private class PublishResponseHandler implements TransportResponseHandler { + @Override + public LegislatorPublishResponse read(StreamInput in) throws IOException { + return new LegislatorPublishResponse(in); + } + + @Override + public void handleResponse(LegislatorPublishResponse response) { + if (response.getVote().isPresent()) { + handleVote(discoveryNode, response.getVote().get()); + } + if (response.isNeedsCatchup()) { + logger.debug("handleRequestCatchUp: sending catch-up to {} with slot {}", + discoveryNode, catchUp.getFullState().getSlot()); + transport.sendCatchUp(discoveryNode, catchUp, new CatchUpResponseHandler()); + } + response.getPublishResponse().ifPresent(PublicationTarget.this::handlePublishResponse); + } + + @Override + public void handleException(TransportException exp) { + logger.trace( + (Supplier) () -> new ParameterizedMessage( + "handlePublishResponse: failed to get publish response from [{}]", discoveryNode), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } - })); + + private class CatchUpResponseHandler implements TransportResponseHandler { + @Override + public PublishResponse read(StreamInput in) throws IOException { + return new PublishResponse(in); + } + + @Override + public void handleResponse(PublishResponse response) { + handlePublishResponse(response); + } + + @Override + public void handleException(TransportException exp) { + logger.trace( + (Supplier) () -> new ParameterizedMessage( + "handleCatchupResponse: failed to get catchup response from [{}]", discoveryNode), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + + private class ApplyCommitResponseHandler implements TransportResponseHandler { + + @Override + public void handleResponse(TransportResponse.Empty response) { + // TODO record success + } + + @Override + public void handleException(TransportException exp) { + logger.trace( + (Supplier) () -> new ParameterizedMessage( + "ApplyCommitResponseHandler: failed to get response from [{}]", discoveryNode), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + } } void handleVote(DiscoveryNode sourceNode, Vote vote) { @@ -746,7 +813,8 @@ void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequ void sendHeartbeatRequest(DiscoveryNode destination, HeartbeatRequest heartbeatRequest, TransportResponseHandler responseHandler); - void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit, EmptyTransportResponseHandler responseHandler); + void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit, + TransportResponseHandler responseHandler); void sendSeekVotes(DiscoveryNode destination, SeekVotes seekVotes, TransportResponseHandler responseHandler); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java b/core/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java index d58d417098a4b..f8f7193c34cfa 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java @@ -360,7 +360,7 @@ void sendHeartbeatRequestFrom(DiscoveryNode sender, DiscoveryNode destination, H } void sendApplyCommitFrom(DiscoveryNode sender, DiscoveryNode destination, ApplyCommit applyCommit, - EmptyTransportResponseHandler responseHandler) { + TransportResponseHandler responseHandler) { sendTo(destination, e -> { try { e.handleApplyCommit(sender, applyCommit); @@ -536,7 +536,7 @@ public void sendHeartbeatRequest(DiscoveryNode destination, HeartbeatRequest hea @Override public void sendApplyCommit(DiscoveryNode destination, ApplyCommit applyCommit, - EmptyTransportResponseHandler responseHandler) { + TransportResponseHandler responseHandler) { if (isConnected) { sendApplyCommitFrom(localNode, destination, applyCommit, responseHandler); }