From 7f8b1bea25fbedc1c4d6b5db918578352cd71d0c Mon Sep 17 00:00:00 2001 From: Re'em Bensimhon Date: Mon, 25 Dec 2017 20:51:39 +0200 Subject: [PATCH] Remove I/O pool blocking sniffing call from onFailure callback, add a some logic around host exclusion --- .../elasticsearch/client/sniff/Sniffer.java | 73 +++++++++++++++---- .../client/sniff/SnifferBuilder.java | 15 +++- .../documentation/SnifferDocumentation.java | 3 +- docs/java-rest/low-level/sniffer.asciidoc | 3 +- 4 files changed, 75 insertions(+), 19 deletions(-) diff --git a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java index c655babd9ed3d..c82a906aec03f 100644 --- a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java +++ b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java @@ -29,8 +29,11 @@ import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; @@ -53,15 +56,16 @@ public class Sniffer implements Closeable { private final Task task; - Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) { - this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); + Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay, int maxExcludedRounds) { + this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay, maxExcludedRounds); } /** * Triggers a new sniffing round and explicitly takes out the failed host provided as argument */ public void sniffOnFailure(HttpHost failedHost) { - this.task.sniffOnFailure(failedHost); + this.task.failedHosts.putIfAbsent(failedHost, 0L); + this.task.scheduleNextRun(0); } @Override @@ -75,15 +79,24 @@ private static class Task implements Runnable { private final long sniffIntervalMillis; private final long sniffAfterFailureDelayMillis; + private final int maxExcludedRounds; private final ScheduledExecutorService scheduledExecutorService; private final AtomicBoolean running = new AtomicBoolean(false); private ScheduledFuture scheduledFuture; + private ConcurrentHashMap failedHosts = new ConcurrentHashMap<>(); + + private Task( + HostsSniffer hostsSniffer, + RestClient restClient, + long sniffIntervalMillis, + long sniffAfterFailureDelayMillis, + int maxExcludedRounds) { - private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) { this.hostsSniffer = hostsSniffer; this.restClient = restClient; this.sniffIntervalMillis = sniffIntervalMillis; this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis; + this.maxExcludedRounds = maxExcludedRounds; SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME); this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); scheduleNextRun(0); @@ -106,35 +119,63 @@ synchronized void scheduleNextRun(long delayMillis) { @Override public void run() { - sniff(null, sniffIntervalMillis); - } - - void sniffOnFailure(HttpHost failedHost) { - sniff(failedHost, sniffAfterFailureDelayMillis); + sniff(sniffIntervalMillis); } - void sniff(HttpHost excludeHost, long nextSniffDelayMillis) { + void sniff(long nextSniffDelayMillis) { if (running.compareAndSet(false, true)) { + long nextSniffDelay = nextSniffDelayMillis; try { List sniffedHosts = hostsSniffer.sniffHosts(); logger.debug("sniffed hosts: " + sniffedHosts); - if (excludeHost != null) { - sniffedHosts.remove(excludeHost); - } - if (sniffedHosts.isEmpty()) { + + List hostsFiltered = removeExcludedAndCycle(sniffedHosts); + logger.debug("sniffed hosts after filtering: " + sniffedHosts); + + if (hostsFiltered.isEmpty()) { logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); } else { - this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); + this.restClient.setHosts(hostsFiltered.toArray(new HttpHost[hostsFiltered.size()])); } } catch (Exception e) { logger.error("error while sniffing nodes", e); + nextSniffDelay = sniffAfterFailureDelayMillis; } finally { - scheduleNextRun(nextSniffDelayMillis); + scheduleNextRun(nextSniffDelay); running.set(false); } } } + /** + * Remove excluded hosts from the list of all sniffed hosts, and cycle through the map. Hosts in the map remain + * there for {@link org.elasticsearch.client.sniff.Sniffer.Task#maxExcludedRounds} cycles + * @param allHosts the list of all sniffed hosts + * @return a new list containing the remaining hosts + */ + private List removeExcludedAndCycle(List allHosts) { + final List excluded = Collections.list(failedHosts.keys()); + + if (excluded.isEmpty()) { + return allHosts; + } + + try { + List copy = new ArrayList<>(allHosts); + copy.removeAll(excluded); + return copy; + } finally { + for (HttpHost host : excluded) { + long excludedCycles = failedHosts.get(host) + 1; + if (excludedCycles >= maxExcludedRounds) { + failedHosts.remove(host); + } else { + failedHosts.put(host, excludedCycles); + } + } + } + } + synchronized void shutdown() { scheduledExecutorService.shutdown(); try { diff --git a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SnifferBuilder.java b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SnifferBuilder.java index 010a8a4a78d20..881c09d6762b3 100644 --- a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SnifferBuilder.java +++ b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SnifferBuilder.java @@ -30,10 +30,12 @@ public final class SnifferBuilder { public static final long DEFAULT_SNIFF_INTERVAL = TimeUnit.MINUTES.toMillis(5); public static final long DEFAULT_SNIFF_AFTER_FAILURE_DELAY = TimeUnit.MINUTES.toMillis(1); + public static final int DEFAULT_HOST_EXCLUDED_SNIFF_ROUNDS = 1; private final RestClient restClient; private long sniffIntervalMillis = DEFAULT_SNIFF_INTERVAL; private long sniffAfterFailureDelayMillis = DEFAULT_SNIFF_AFTER_FAILURE_DELAY; + private int maxExcludedRounds = DEFAULT_HOST_EXCLUDED_SNIFF_ROUNDS; private HostsSniffer hostsSniffer; /** @@ -79,6 +81,17 @@ public SnifferBuilder setHostsSniffer(HostsSniffer hostsSniffer) { return this; } + /** + * Sets the amount of future sniffing calls from which a host that had failed a request would be excluded. Will not + * be used if client wasn't built utilizing + * {@link org.elasticsearch.client.RestClientBuilder#setFailureListener(RestClient.FailureListener)} + * @param maxExcludedRounds the number of sniffing calls to exclude a host from + */ + public SnifferBuilder setMaxExcludedRounds(int maxExcludedRounds) { + this.maxExcludedRounds = maxExcludedRounds; + return this; + } + /** * Creates the {@link Sniffer} based on the provided configuration. */ @@ -86,6 +99,6 @@ public Sniffer build() { if (hostsSniffer == null) { this.hostsSniffer = new ElasticsearchHostsSniffer(restClient); } - return new Sniffer(restClient, hostsSniffer, sniffIntervalMillis, sniffAfterFailureDelayMillis); + return new Sniffer(restClient, hostsSniffer, sniffIntervalMillis, sniffAfterFailureDelayMillis, maxExcludedRounds); } } diff --git a/client/sniffer/src/test/java/org/elasticsearch/client/sniff/documentation/SnifferDocumentation.java b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/documentation/SnifferDocumentation.java index 199632d478f81..4a95be5c11607 100644 --- a/client/sniffer/src/test/java/org/elasticsearch/client/sniff/documentation/SnifferDocumentation.java +++ b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/documentation/SnifferDocumentation.java @@ -82,8 +82,9 @@ public void testUsage() throws IOException { .build(); Sniffer sniffer = Sniffer.builder(restClient) .setSniffAfterFailureDelayMillis(30000) // <2> + .setMaxExcludedRounds(3) // <3> .build(); - sniffOnFailureListener.setSniffer(sniffer); // <3> + sniffOnFailureListener.setSniffer(sniffer); // <4> //end::sniff-on-failure } { diff --git a/docs/java-rest/low-level/sniffer.asciidoc b/docs/java-rest/low-level/sniffer.asciidoc index df772643bf4dc..811a3a3671e7c 100644 --- a/docs/java-rest/low-level/sniffer.asciidoc +++ b/docs/java-rest/low-level/sniffer.asciidoc @@ -98,7 +98,8 @@ normal and we want to detect that as soon as possible. Said interval can be customized at `Sniffer` creation time through the `setSniffAfterFailureDelayMillis` method. Note that this last configuration parameter has no effect in case sniffing on failure is not enabled like explained above. -<3> Set the `Sniffer` instance to the failure listener +<3> Set the amount of sniffing rounds to exclude a failed host for +<4> Set the `Sniffer` instance to the failure listener The Elasticsearch Nodes Info api doesn't return the protocol to use when connecting to the nodes but only their `host:port` key-pair, hence `http`