From 90cf0db27fbd04b68ffe5fd1d55f1080e73387de Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 31 Aug 2022 10:08:23 -0400 Subject: [PATCH 1/8] feat: add possibility to add an exception handler to Informers --- .../informers/InformerExceptionHandler.java | 9 +++++ .../client/informers/SharedIndexInformer.java | 5 ++- .../dsl/internal/AbstractWatchManager.java | 9 +++-- .../impl/DefaultSharedIndexInformer.java | 13 ++++++- .../informers/impl/cache/Reflector.java | 39 ++++++++++++++----- 5 files changed, 61 insertions(+), 14 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java new file mode 100644 index 00000000000..70d026ac6bb --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -0,0 +1,9 @@ +package io.fabric8.kubernetes.client.informers; + +import io.fabric8.kubernetes.client.WatcherException; + +public interface InformerExceptionHandler { + + void onWatchNonrecoverable(WatcherException e); + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index d7f5902fcb7..0919e69def4 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -82,7 +82,8 @@ default SharedIndexInformer removeNamespaceIndex() { * @param handle the event handler * @param resyncPeriod the specific resync period */ - SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, long resyncPeriod); + SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, + long resyncPeriod); /** * Starts the shared informer, which will be stopped when {@link #stop()} is called. @@ -165,4 +166,6 @@ default boolean hasSynced() { */ CompletableFuture start(); + default void setExceptionHandler(InformerExceptionHandler handler) { + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 17caf47d518..ca68b92a34a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -303,11 +303,14 @@ protected void onMessage(String message) { logger.error("Unknown message received: {}", message); } } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); + final String msg = "Received wrong type of object for watch"; + close(new WatcherException(msg, e)); } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); + final String msg = "Invalid event type"; + close(new WatcherException(msg, e)); } catch (Exception e) { - logger.error("Unhandled exception encountered in watcher event handler", e); + final String msg = "Unhandled exception encountered in watcher event handler"; + close(new WatcherException(msg, e)); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 5178879ebd4..1a27f7ece20 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -72,6 +73,8 @@ public class DefaultSharedIndexInformer initialState; + private InformerExceptionHandler exceptionHandler; + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, Executor informerExecutor) { if (resyncPeriod < 0) { @@ -87,7 +90,11 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(informerExecutor, description); processorStore = new ProcessorStore<>(this.indexer, this.processor); - this.reflector = new Reflector<>(listerWatcher, processorStore); + this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler); + } + + public InformerExceptionHandler getExceptionHandler() { + return exceptionHandler; } /** @@ -289,4 +296,8 @@ public String toString() { return this.description; } + @Override + public void setExceptionHandler(InformerExceptionHandler handler) { + this.exceptionHandler = handler; + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 52d9126f423..ab6aa5827c4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; import org.slf4j.Logger; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; public class Reflector> { @@ -49,9 +51,14 @@ public class Reflector reconnectFuture; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { + this(listerWatcher, store, null); + } + + public Reflector(ListerWatcher listerWatcher, SyncableStore store, + Supplier exceptionHandlerSupplier) { this.listerWatcher = listerWatcher; this.store = store; - this.watcher = new ReflectorWatcher(); + this.watcher = new ReflectorWatcher(exceptionHandlerSupplier); } public CompletableFuture start() { @@ -119,7 +126,9 @@ public CompletableFuture listSyncAndWatch() { private CompletableFuture processList(Set nextKeys, String continueVal) { CompletableFuture futureResult = listerWatcher - .submitList(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build()); + .submitList( + new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) + .build()); return futureResult.thenCompose(result -> { result.getItems().forEach(i -> { @@ -147,7 +156,8 @@ private synchronized CompletableFuture startWatcher(final String latestRe } log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes - watchFuture = listerWatcher.submitWatch(new ListOptionsBuilder().withResourceVersion(latestResourceVersion) + watchFuture = listerWatcher.submitWatch( + new ListOptionsBuilder().withResourceVersion(latestResourceVersion) .withTimeoutSeconds(null) .build(), watcher); return watchFuture; @@ -171,6 +181,13 @@ public boolean isWatching() { class ReflectorWatcher implements Watcher { + private final Supplier exceptionHandlerSupplier; + + ReflectorWatcher(Supplier exceptionHandlerSupplier) { + this.exceptionHandlerSupplier = exceptionHandlerSupplier; + } + + @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -180,8 +197,9 @@ public void eventReceived(Action action, T resource) { throw new KubernetesClientException("Unrecognized resource for " + Reflector.this); } if (log.isDebugEnabled()) { - log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), resource.getKind(), - resource.getMetadata().getResourceVersion(), Reflector.this); + log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), + resource.getKind(), + resource.getMetadata().getResourceVersion(), Reflector.this); } switch (action) { case ERROR: @@ -215,13 +233,18 @@ public void onClose(WatcherException exception) { // start a whole new list/watch cycle, can be run in the scheduler thread because // any further operations will happen on the io thread reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); + listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); } }); restarted = true; } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); running = false; // shouldn't happen, but it means the watch won't restart + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + if (handler != null) { + handler.onWatchNonrecoverable(exception); + } else { + log.warn("Watch closing with exception for {}", Reflector.this, exception); + } } } finally { if (!restarted) { @@ -240,7 +263,6 @@ public void onClose() { public boolean reconnecting() { return true; } - } ReflectorWatcher getWatcher() { @@ -251,5 +273,4 @@ ReflectorWatcher getWatcher() { public String toString() { return listerWatcher.getApiEndpointPath(); } - } From 69aa906f68894fed7462757a60ab826d340cee96 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 2 Sep 2022 07:58:01 -0400 Subject: [PATCH 2/8] fix #4369: allowing the handler to determine if the stop or retry --- .../informers/InformerExceptionHandler.java | 25 +++++- .../client/informers/SharedIndexInformer.java | 2 +- .../informers/impl/cache/Reflector.java | 81 ++++++++++--------- 3 files changed, 68 insertions(+), 40 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java index 70d026ac6bb..21fe65ad922 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -1,9 +1,32 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.fabric8.kubernetes.client.informers; import io.fabric8.kubernetes.client.WatcherException; public interface InformerExceptionHandler { - void onWatchNonrecoverable(WatcherException e); + /** + * Provides a callback when the informer could terminated with a non-recoverable exception. + * + * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an + * exception from calling list or watch + * @return true if the informer should retry, false if it should stop + */ + boolean retry(Throwable t); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 0919e69def4..3250cac2ed9 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -83,7 +83,7 @@ default SharedIndexInformer removeNamespaceIndex() { * @param resyncPeriod the specific resync period */ SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, - long resyncPeriod); + long resyncPeriod); /** * Starts the shared informer, which will be stopped when {@link #stop()} is called. diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index ab6aa5827c4..20db9b9a7f4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -48,14 +48,14 @@ public class Reflector watchFuture; - private volatile Future reconnectFuture; + private volatile CompletableFuture reconnectFuture; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this(listerWatcher, store, null); } public Reflector(ListerWatcher listerWatcher, SyncableStore store, - Supplier exceptionHandlerSupplier) { + Supplier exceptionHandlerSupplier) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(exceptionHandlerSupplier); @@ -126,9 +126,9 @@ public CompletableFuture listSyncAndWatch() { private CompletableFuture processList(Set nextKeys, String continueVal) { CompletableFuture futureResult = listerWatcher - .submitList( - new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) - .build()); + .submitList( + new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) + .build()); return futureResult.thenCompose(result -> { result.getItems().forEach(i -> { @@ -157,9 +157,10 @@ private synchronized CompletableFuture startWatcher(final String latestRe log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes watchFuture = listerWatcher.submitWatch( - new ListOptionsBuilder().withResourceVersion(latestResourceVersion) - .withTimeoutSeconds(null) - .build(), watcher); + new ListOptionsBuilder().withResourceVersion(latestResourceVersion) + .withTimeoutSeconds(null) + .build(), + watcher); return watchFuture; } @@ -187,7 +188,6 @@ class ReflectorWatcher implements Watcher { this.exceptionHandlerSupplier = exceptionHandlerSupplier; } - @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -198,8 +198,8 @@ public void eventReceived(Action action, T resource) { } if (log.isDebugEnabled()) { log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), - resource.getKind(), - resource.getMetadata().getResourceVersion(), Reflector.this); + resource.getKind(), + resource.getMetadata().getResourceVersion(), Reflector.this); } switch (action) { case ERROR: @@ -221,38 +221,43 @@ public void eventReceived(Action action, T resource) { public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this - boolean restarted = false; - try { - if (exception.isHttpGone()) { - if (log.isDebugEnabled()) { - log.debug("Watch restarting due to http gone for {}", Reflector.this); - } - listSyncAndWatch().whenComplete((v, t) -> { - if (t != null) { - watchStopped(); - // start a whole new list/watch cycle, can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); - } - }); - restarted = true; - } else { - running = false; // shouldn't happen, but it means the watch won't restart - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - if (handler != null) { - handler.onWatchNonrecoverable(exception); - } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } + watchStopped(); + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + boolean reconnect = false; + if (exception.isHttpGone()) { + if (log.isDebugEnabled()) { + log.debug("Watch restarting due to http gone for {}", Reflector.this); } - } finally { - if (!restarted) { - watchStopped(); // report the watch as stopped after a problem + reconnect = true; + } else if (handler != null) { + reconnect = handler.retry(exception.getCause()); + } + if (reconnect) { + // start a whole new list/watch cycle + reconnect(); + } else { + running = false; // shouldn't happen, but it means the watch won't restart + if (handler == null) { + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } } + private void reconnect() { + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, + listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); + reconnectFuture.whenComplete((v, t) -> { + if (t != null) { + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + if (handler == null || handler.retry(t)) { + reconnect(); + } + } + }); + } + @Override public void onClose() { watchStopped(); From 401b2e3069424b91bed5c52bec456a41ce11ff16 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Tue, 6 Sep 2022 08:59:22 -0400 Subject: [PATCH 3/8] fix #4369: refining the exception handler and adding back backoff --- .../informers/InformerExceptionHandler.java | 41 ++++++++++--- .../dsl/internal/AbstractWatchManager.java | 12 +--- .../client/dsl/internal/OperationSupport.java | 17 ++---- .../dsl/internal/WatchConnectionManager.java | 4 +- .../impl/DefaultSharedIndexInformer.java | 11 +--- .../informers/impl/cache/Reflector.java | 58 +++++++++++-------- .../informers/impl/cache/SharedProcessor.java | 18 +++++- .../ExponentialBackoffIntervalCalculator.java | 23 ++++++++ 8 files changed, 122 insertions(+), 62 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java index 21fe65ad922..7d828744c02 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -16,17 +16,44 @@ package io.fabric8.kubernetes.client.informers; -import io.fabric8.kubernetes.client.WatcherException; - public interface InformerExceptionHandler { + public enum EventType { + /** + * an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception. + */ + LIST_OR_WATCH, + /** + * an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception. + */ + HANDLER + } + /** - * Provides a callback when the informer could terminated with a non-recoverable exception. + * Determine if the informer should stop given from a non-http gone WatchException cause. + *

+ * The default behavior is to terminate as we cannot guarantee if the state is still correct * - * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an - * exception from calling list or watch - * @return true if the informer should retry, false if it should stop + * @param t the non-http gone WatchException cause + * @return true if the informer should stop, false if it should attempt to keep running + */ + default boolean shouldStop(Throwable t) { + return true; + } + + /** + * Override the default handling of exceptions seen while the informer is running. + *

+ * If you want to stop the informer as a side-effect of this call, then construct your implementation + * of this class with a reference to the informer then call the stop method. */ - boolean retry(Throwable t); + void onException(EventType eventType, Throwable t); + + /** + * Called after each time the list, sync and watch operations have been successful. + */ + default void onWatching() { + + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index ca68b92a34a..8c6ae32a6bd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -44,7 +44,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -60,7 +59,6 @@ public abstract class AbstractWatchManager implements Wat final AtomicBoolean forceClosed; private final int reconnectLimit; private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; - final AtomicInteger currentReconnectAttempt; private Future reconnectAttempt; protected final HttpClient client; @@ -77,7 +75,6 @@ public abstract class AbstractWatchManager implements Wat this.reconnectLimit = reconnectLimit; this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent); this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); - this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks()); // opt into bookmarks by default @@ -164,18 +161,15 @@ synchronized void reconnect() { } final boolean cannotReconnect() { - return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0; + return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit && reconnectLimit >= 0; } final long nextReconnectInterval() { - int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); - long ret = retryIntervalCalculator.getInterval(exponentOfTwo); - logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); - return ret; + return retryIntervalCalculator.nextReconnectInterval(); } void resetReconnectAttempts() { - currentReconnectAttempt.set(0); + retryIntervalCalculator.resetReconnectAttempts(); } boolean isForceClosed() { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java index 3a2f322363a..f86e74544f8 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java @@ -60,7 +60,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; public class OperationSupport { @@ -83,7 +82,6 @@ public class OperationSupport { protected String apiGroupName; protected String apiGroupVersion; protected boolean dryRun; - private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; private final int requestRetryBackoffLimit; public OperationSupport(Client client) { @@ -107,16 +105,11 @@ public OperationSupport(OperationContext ctx) { this.apiGroupVersion = "v1"; } - final int requestRetryBackoffInterval; if (ctx.getConfig() != null) { - requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval(); this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit(); } else { - requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL; this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT; } - this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, - MAX_RETRY_INTERVAL_EXPONENT); } public String getAPIGroupName() { @@ -574,7 +567,7 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest VersionUsageUtils.log(this.resourceT, this.apiGroupVersion); HttpRequest request = requestBuilder.build(); CompletableFuture> futureResponse = new CompletableFuture<>(); - retryWithExponentialBackoff(futureResponse, new AtomicInteger(), Utils.getNonNullOrElse(client, httpClient), request); + retryWithExponentialBackoff(futureResponse, new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), Utils.getNonNullOrElse(client, httpClient), request); return futureResponse.thenApply(response -> { try { @@ -593,13 +586,13 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest } protected void retryWithExponentialBackoff(CompletableFuture> result, - AtomicInteger numRetries, + ExponentialBackoffIntervalCalculator retryIntervalCalculator, HttpClient client, HttpRequest request) { client.sendAsync(request, byte[].class) .whenComplete((response, throwable) -> { - int retries = numRetries.getAndIncrement(); + int retries = retryIntervalCalculator.getCurrentReconnectAttempt(); if (retries < requestRetryBackoffLimit) { - long retryInterval = retryIntervalCalculator.getInterval(retries); + long retryInterval = retryIntervalCalculator.nextReconnectInterval(); boolean retry = false; if (response != null && response.code() >= 500) { LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis", @@ -612,7 +605,7 @@ protected void retryWithExponentialBackoff(CompletableFuture retryWithExponentialBackoff(result, numRetries, client, request), retryInterval, TimeUnit.MILLISECONDS); + () -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, TimeUnit.MILLISECONDS); return; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 33c43ee4568..0d9fcf23fae 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -45,6 +45,8 @@ public class WatchConnectionManager> extends AbstractWatchManager { + public static final int BACKOFF_MAX_EXPONENT = 5; + private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); protected WatcherWebSocketListener listener; @@ -78,7 +80,7 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5); + this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, BACKOFF_MAX_EXPONENT); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 1a27f7ece20..19866930309 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -73,8 +73,6 @@ public class DefaultSharedIndexInformer initialState; - private InformerExceptionHandler exceptionHandler; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, Executor informerExecutor) { if (resyncPeriod < 0) { @@ -90,11 +88,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(informerExecutor, description); processorStore = new ProcessorStore<>(this.indexer, this.processor); - this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler); - } - - public InformerExceptionHandler getExceptionHandler() { - return exceptionHandler; + this.reflector = new Reflector<>(listerWatcher, processorStore); } /** @@ -298,6 +292,7 @@ public String toString() { @Override public void setExceptionHandler(InformerExceptionHandler handler) { - this.exceptionHandler = handler; + this.reflector.setExceptionHandler(handler); + this.processor.setExceptionHandler(handler); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 20db9b9a7f4..abfd4b33008 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -22,9 +22,12 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; +import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +38,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; public class Reflector> { @@ -49,16 +51,14 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; + private volatile InformerExceptionHandler handler; + private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { - this(listerWatcher, store, null); - } - - public Reflector(ListerWatcher listerWatcher, SyncableStore store, - Supplier exceptionHandlerSupplier) { this.listerWatcher = listerWatcher; this.store = store; - this.watcher = new ReflectorWatcher(exceptionHandlerSupplier); + this.watcher = new ReflectorWatcher(); + this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), WatchConnectionManager.BACKOFF_MAX_EXPONENT); } public CompletableFuture start() { @@ -114,7 +114,14 @@ public CompletableFuture listSyncAndWatch() { started.whenComplete((w, t) -> { if (w != null) { if (running) { + if (log.isDebugEnabled()) { + log.debug("Watch started for {}", Reflector.this); + } watching = true; + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onWatching(); + } } else { stopWatch(w); } @@ -182,12 +189,6 @@ public boolean isWatching() { class ReflectorWatcher implements Watcher { - private final Supplier exceptionHandlerSupplier; - - ReflectorWatcher(Supplier exceptionHandlerSupplier) { - this.exceptionHandlerSupplier = exceptionHandlerSupplier; - } - @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -222,38 +223,45 @@ public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this watchStopped(); - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + InformerExceptionHandler theHandler = handler; boolean reconnect = false; if (exception.isHttpGone()) { if (log.isDebugEnabled()) { log.debug("Watch restarting due to http gone for {}", Reflector.this); } reconnect = true; - } else if (handler != null) { - reconnect = handler.retry(exception.getCause()); + } else if (theHandler != null) { + reconnect = !theHandler.shouldStop(exception.getCause()); + } else { + log.warn("Watch closing with exception for {}", Reflector.this, exception); } if (reconnect) { // start a whole new list/watch cycle reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart - if (handler == null) { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } } } private void reconnect() { + if (!running) { + return; + } // this can be run in the scheduler thread because // any further operations will happen on the io thread reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); + retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); reconnectFuture.whenComplete((v, t) -> { if (t != null) { - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - if (handler == null || handler.retry(t)) { - reconnect(); + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onException(EventType.LIST_OR_WATCH, t); + } else { + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); } + reconnect(); + } else { + retryIntervalCalculator.resetReconnectAttempts(); } }); } @@ -278,4 +286,8 @@ ReflectorWatcher getWatcher() { public String toString() { return listerWatcher.getApiEndpointPath(); } + + public void setExceptionHandler(InformerExceptionHandler handler) { + this.handler = handler; + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 86e334bb93a..d421c19428d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -15,6 +15,8 @@ */ package io.fabric8.kubernetes.client.informers.impl.cache; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; @@ -51,6 +53,8 @@ public class SharedProcessor { private final SerialExecutor executor; private final String informerDescription; + private volatile InformerExceptionHandler handler; + public SharedProcessor() { this(Runnable::run, "informer"); } @@ -109,8 +113,13 @@ public void distribute(Consumer> operation, boolean isSync) try { operation.accept(listener); } catch (Exception ex) { - log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), - ex); + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onException(EventType.HANDLER, ex); + } else { + log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), + ex); + } } } }); @@ -170,4 +179,9 @@ public ProcessorListener addProcessorListener(ResourceEventHandler lock.writeLock().unlock(); } } + + public void setExceptionHandler(InformerExceptionHandler handler) { + this.handler = handler; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java index 269e2528e22..3e9696c5c5a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java @@ -15,10 +15,18 @@ */ package io.fabric8.kubernetes.client.utils.internal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + public class ExponentialBackoffIntervalCalculator { + private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class); + private final int initialInterval; private final int maxRetryIntervalExponent; + final AtomicInteger currentReconnectAttempt = new AtomicInteger(); public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) { this.initialInterval = initialInterval; @@ -33,4 +41,19 @@ public long getInterval(int retryIndex) { return (long)initialInterval * (1 << exponentOfTwo); } + public void resetReconnectAttempts() { + currentReconnectAttempt.set(0); + } + + public final long nextReconnectInterval() { + int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); + long ret = getInterval(exponentOfTwo); + logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); + return ret; + } + + public int getCurrentReconnectAttempt() { + return currentReconnectAttempt.get(); + } + } From 424ef82a56b59e96cccf752e706fef94c6cfdb9c Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Tue, 6 Sep 2022 12:54:00 -0400 Subject: [PATCH 4/8] fix #4369: narrowing the changes to only a stopped future --- .../informers/InformerExceptionHandler.java | 59 ------------------- .../client/informers/SharedIndexInformer.java | 7 ++- .../dsl/internal/AbstractWatchManager.java | 3 +- .../client/dsl/internal/OperationSupport.java | 7 ++- .../dsl/internal/WatchConnectionManager.java | 3 +- .../impl/DefaultSharedIndexInformer.java | 7 +-- .../informers/impl/cache/Reflector.java | 35 ++++------- .../informers/impl/cache/SharedProcessor.java | 18 +----- .../ExponentialBackoffIntervalCalculator.java | 2 +- 9 files changed, 30 insertions(+), 111 deletions(-) delete mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java deleted file mode 100644 index 7d828744c02..00000000000 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.fabric8.kubernetes.client.informers; - -public interface InformerExceptionHandler { - - public enum EventType { - /** - * an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception. - */ - LIST_OR_WATCH, - /** - * an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception. - */ - HANDLER - } - - /** - * Determine if the informer should stop given from a non-http gone WatchException cause. - *

- * The default behavior is to terminate as we cannot guarantee if the state is still correct - * - * @param t the non-http gone WatchException cause - * @return true if the informer should stop, false if it should attempt to keep running - */ - default boolean shouldStop(Throwable t) { - return true; - } - - /** - * Override the default handling of exceptions seen while the informer is running. - *

- * If you want to stop the informer as a side-effect of this call, then construct your implementation - * of this class with a reference to the informer then call the stop method. - */ - void onException(EventType eventType, Throwable t); - - /** - * Called after each time the list, sync and watch operations have been successful. - */ - default void onWatching() { - - } - -} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 3250cac2ed9..c9082164e55 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -166,6 +166,9 @@ default boolean hasSynced() { */ CompletableFuture start(); - default void setExceptionHandler(InformerExceptionHandler handler) { - } + /** + * Return a future that will allow notification of informer stopping. + * + */ + CompletableFuture stopped(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 8c6ae32a6bd..3a664bdbd99 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -161,7 +161,8 @@ synchronized void reconnect() { } final boolean cannotReconnect() { - return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit && reconnectLimit >= 0; + return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit + && reconnectLimit >= 0; } final long nextReconnectInterval() { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java index f86e74544f8..7c68ff76335 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java @@ -567,7 +567,9 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest VersionUsageUtils.log(this.resourceT, this.apiGroupVersion); HttpRequest request = requestBuilder.build(); CompletableFuture> futureResponse = new CompletableFuture<>(); - retryWithExponentialBackoff(futureResponse, new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), Utils.getNonNullOrElse(client, httpClient), request); + retryWithExponentialBackoff(futureResponse, + new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), + Utils.getNonNullOrElse(client, httpClient), request); return futureResponse.thenApply(response -> { try { @@ -605,7 +607,8 @@ protected void retryWithExponentialBackoff(CompletableFuture retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, TimeUnit.MILLISECONDS); + () -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, + TimeUnit.MILLISECONDS); return; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 0d9fcf23fae..1f1af781053 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -80,7 +80,8 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, BACKOFF_MAX_EXPONENT); + this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, + BACKOFF_MAX_EXPONENT); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 19866930309..74b5cbfff16 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -18,7 +18,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -291,8 +290,8 @@ public String toString() { } @Override - public void setExceptionHandler(InformerExceptionHandler handler) { - this.reflector.setExceptionHandler(handler); - this.processor.setExceptionHandler(handler); + public CompletableFuture stopped() { + return this.reflector.getStopFuture(); } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index abfd4b33008..c6ef5dc6614 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -23,8 +23,6 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; @@ -51,14 +49,15 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; - private volatile InformerExceptionHandler handler; + private volatile CompletableFuture stopFuture = new CompletableFuture(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(); - this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), WatchConnectionManager.BACKOFF_MAX_EXPONENT); + this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), + WatchConnectionManager.BACKOFF_MAX_EXPONENT); } public CompletableFuture start() { @@ -68,6 +67,7 @@ public CompletableFuture start() { public void stop() { running = false; + stopFuture.complete(null); Future future = reconnectFuture; if (future != null) { future.cancel(true); @@ -118,10 +118,6 @@ public CompletableFuture listSyncAndWatch() { log.debug("Watch started for {}", Reflector.this); } watching = true; - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onWatching(); - } } else { stopWatch(w); } @@ -223,23 +219,16 @@ public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this watchStopped(); - InformerExceptionHandler theHandler = handler; - boolean reconnect = false; if (exception.isHttpGone()) { if (log.isDebugEnabled()) { log.debug("Watch restarting due to http gone for {}", Reflector.this); } - reconnect = true; - } else if (theHandler != null) { - reconnect = !theHandler.shouldStop(exception.getCause()); - } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } - if (reconnect) { // start a whole new list/watch cycle reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart + stopFuture.completeExceptionally(exception.getCause()); + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } @@ -253,12 +242,7 @@ private void reconnect() { retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); reconnectFuture.whenComplete((v, t) -> { if (t != null) { - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onException(EventType.LIST_OR_WATCH, t); - } else { - log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); - } + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); reconnect(); } else { retryIntervalCalculator.resetReconnectAttempts(); @@ -287,7 +271,8 @@ public String toString() { return listerWatcher.getApiEndpointPath(); } - public void setExceptionHandler(InformerExceptionHandler handler) { - this.handler = handler; + public CompletableFuture getStopFuture() { + return stopFuture; } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index d421c19428d..86e334bb93a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -15,8 +15,6 @@ */ package io.fabric8.kubernetes.client.informers.impl.cache; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; @@ -53,8 +51,6 @@ public class SharedProcessor { private final SerialExecutor executor; private final String informerDescription; - private volatile InformerExceptionHandler handler; - public SharedProcessor() { this(Runnable::run, "informer"); } @@ -113,13 +109,8 @@ public void distribute(Consumer> operation, boolean isSync) try { operation.accept(listener); } catch (Exception ex) { - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onException(EventType.HANDLER, ex); - } else { - log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), - ex); - } + log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), + ex); } } }); @@ -179,9 +170,4 @@ public ProcessorListener addProcessorListener(ResourceEventHandler lock.writeLock().unlock(); } } - - public void setExceptionHandler(InformerExceptionHandler handler) { - this.handler = handler; - } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java index 3e9696c5c5a..4dc34a5a272 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java @@ -38,7 +38,7 @@ public long getInterval(int retryIndex) { if (exponentOfTwo > maxRetryIntervalExponent) { exponentOfTwo = maxRetryIntervalExponent; } - return (long)initialInterval * (1 << exponentOfTwo); + return (long) initialInterval * (1 << exponentOfTwo); } public void resetReconnectAttempts() { From 2c8b5fa6a12ea6f8b5cf7f906407a43c35309a76 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 7 Sep 2022 11:59:14 +0200 Subject: [PATCH 5/8] feat: pass raw message to WatcherException, easier JSON error detection --- .../kubernetes/client/WatcherException.java | 14 +++- .../dsl/internal/AbstractWatchManager.java | 71 ++++++++++--------- .../informers/impl/cache/Reflector.java | 2 +- 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java index 2803fc9393e..b921849e811 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -16,15 +16,23 @@ package io.fabric8.kubernetes.client; import java.net.HttpURLConnection; +import java.util.Optional; public class WatcherException extends Exception { + private final String rawWatchMessage; public WatcherException(String message, Throwable cause) { - super(message, cause); + this(message, cause, null); } public WatcherException(String message) { super(message); + rawWatchMessage = null; + } + + public WatcherException(String message, Throwable cause, String rawWatchMessage) { + super(message, cause); + this.rawWatchMessage = rawWatchMessage; } public KubernetesClientException asClientException() { @@ -39,4 +47,8 @@ public boolean isHttpGone() { || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); } + @SuppressWarnings("unused") + public Optional getRawWatchMessage() { + return Optional.ofNullable(rawWatchMessage); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 3a664bdbd99..1b343790b9b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -63,8 +63,8 @@ public abstract class AbstractWatchManager implements Wat protected final HttpClient client; protected BaseOperation baseOperation; - private ListOptions listOptions; - private URL requestUrl; + private final ListOptions listOptions; + private final URL requestUrl; private final boolean receiveBookmarks; @@ -187,7 +187,9 @@ void eventReceived(Watcher.Action action, HasMetadata resource) { if (resource != null && !baseOperation.getType().isAssignableFrom(resource.getClass())) { resource = Serialization.jsonMapper().convertValue(resource, baseOperation.getType()); } - watcher.eventReceived(action, (T) resource); + @SuppressWarnings("unchecked") + final T t = (T) resource; + watcher.eventReceived(action, t); } void updateResourceVersion(final String newResourceVersion) { @@ -223,29 +225,26 @@ public void close() { cancelReconnect(); } - private WatchEvent contextAwareWatchEventDeserializer(String messageSource) { + private WatchEvent contextAwareWatchEventDeserializer(String messageSource) + throws JsonProcessingException { try { return Serialization.unmarshal(messageSource, WatchEvent.class); } catch (Exception ex1) { - try { - JsonNode json = Serialization.jsonMapper().readTree(messageSource); - JsonNode objectJson = null; - if (json instanceof ObjectNode && json.has("object")) { - objectJson = ((ObjectNode) json).remove("object"); - } + JsonNode json = Serialization.jsonMapper().readTree(messageSource); + JsonNode objectJson = null; + if (json instanceof ObjectNode && json.has("object")) { + objectJson = ((ObjectNode) json).remove("object"); + } - WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class); - KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType()); + WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class); + KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType()); - watchEvent.setObject(object); - return watchEvent; - } catch (JsonProcessingException ex2) { - throw new IllegalArgumentException("Failed to deserialize WatchEvent", ex2); - } + watchEvent.setObject(object); + return watchEvent; } } - protected WatchEvent readWatchEvent(String messageSource) { + protected WatchEvent readWatchEvent(String messageSource) throws JsonProcessingException { WatchEvent event = contextAwareWatchEventDeserializer(messageSource); KubernetesResource object = null; if (event != null) { @@ -277,32 +276,34 @@ protected void onMessage(String message) { Status status = (Status) object; onStatus(status); - } else if (object instanceof KubernetesResourceList) { - // Dirty cast - should always be valid though - KubernetesResourceList list = (KubernetesResourceList) object; - updateResourceVersion(list.getMetadata().getResourceVersion()); + } else if (object instanceof HasMetadata) { + HasMetadata hasMetadata = (HasMetadata) object; + updateResourceVersion(hasMetadata.getMetadata().getResourceVersion()); Action action = Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - eventReceived(action, item); + + if(object instanceof KubernetesResourceList) { + // Dirty cast - should always be valid though + @SuppressWarnings({"rawtypes"}) + KubernetesResourceList list = (KubernetesResourceList) hasMetadata; + @SuppressWarnings("unchecked") + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + eventReceived(action, item); + } } + } else { + eventReceived(action, hasMetadata); } - } else if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - updateResourceVersion(obj.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - eventReceived(action, obj); } else { logger.error("Unknown message received: {}", message); } } catch (ClassCastException e) { final String msg = "Received wrong type of object for watch"; close(new WatcherException(msg, e)); - } catch (IllegalArgumentException e) { - final String msg = "Invalid event type"; - close(new WatcherException(msg, e)); + } catch (JsonProcessingException e) { + final String msg = "Couldn't deserialize watch event: " + message; + close(new WatcherException(msg, e, message)); } catch (Exception e) { final String msg = "Unhandled exception encountered in watcher event handler"; close(new WatcherException(msg, e)); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index c6ef5dc6614..7945f674d3e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -49,7 +49,7 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; - private volatile CompletableFuture stopFuture = new CompletableFuture(); + private final CompletableFuture stopFuture = new CompletableFuture<>(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { From db5ba0e26b3ef6d33c4c30b8af48443ff5675cf2 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 7 Sep 2022 07:14:09 -0400 Subject: [PATCH 6/8] fix #4369: refining the watch and informer reconnect and adding tests --- CHANGELOG.md | 3 +- .../kubernetes/client/WatcherException.java | 8 +- .../client/informers/SharedIndexInformer.java | 7 +- .../client/WatcherExceptionTest.java | 41 +++++++++++ .../dsl/internal/AbstractWatchManager.java | 30 +++++--- .../informers/impl/cache/Reflector.java | 73 +++++++++---------- .../informers/impl/cache/ReflectorTest.java | 4 +- .../mock/DefaultSharedIndexInformerTest.java | 9 ++- .../kubernetes/client/mock/WatchTest.java | 2 +- 9 files changed, 119 insertions(+), 58 deletions(-) create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f7b01fde5f6..3915e9aa067 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ * Fix #4256: crd-generator-apt pom.xml includes transitive dependencies * Fix #4294: crd-generator respects JsonIgnore annotations on enum properties * Fix #4320: corrected leader transitions field on leader election leases - +* Fix #4369: Informers will retry with a backoff on list/watch failure as they did in 5.12 and prior. #### Improvements * Fix #887: added KubernetesClient.visitResources to search and perform other operations across all resources. @@ -21,6 +21,7 @@ * Fix #4287: added WorkloadGroup for Istio v1alpha3 extension generator * Fix #4318: implemented LeaderElection releaseOnCancel * Fix #3960: adding a KubernetesMockServer.expectCustomResource helper method and additional mock crd support +* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture. #### Dependency Upgrade * Bump Knative model to v0.34.0 diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java index b921849e811..2b119b421f3 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -37,14 +37,14 @@ public WatcherException(String message, Throwable cause, String rawWatchMessage) public KubernetesClientException asClientException() { final Throwable cause = getCause(); - return cause instanceof KubernetesClientException ? - (KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause); + return cause instanceof KubernetesClientException ? (KubernetesClientException) cause + : new KubernetesClientException(getMessage(), cause); } public boolean isHttpGone() { final KubernetesClientException cause = asClientException(); - return cause.getCode() == HttpURLConnection.HTTP_GONE - || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); + return cause != null && (cause.getCode() == HttpURLConnection.HTTP_GONE + || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE)); } @SuppressWarnings("unused") diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index c9082164e55..264b5222dce 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.informers; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ItemStore; @@ -168,7 +169,11 @@ default boolean hasSynced() { /** * Return a future that will allow notification of informer stopping. - * + *

+ * If {@link #stop()} is called, the future will be completed with a null value. + *

+ * If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception + * - typically a {@link WatcherException} */ CompletableFuture stopped(); } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java new file mode 100644 index 00000000000..3d500c3a88c --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client; + +import io.fabric8.kubernetes.api.model.Status; +import org.junit.jupiter.api.Test; + +import java.net.HttpURLConnection; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class WatcherExceptionTest { + + @Test + void testIsHttpGone() { + WatcherException we = new WatcherException("I've failed"); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", new ClassCastException()); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", + new KubernetesClientException("http gone", HttpURLConnection.HTTP_GONE, new Status())); + assertTrue(we.isHttpGone()); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 1b343790b9b..e3c9a5454f5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -226,7 +226,7 @@ public void close() { } private WatchEvent contextAwareWatchEventDeserializer(String messageSource) - throws JsonProcessingException { + throws JsonProcessingException { try { return Serialization.unmarshal(messageSource, WatchEvent.class); } catch (Exception ex1) { @@ -272,18 +272,23 @@ protected void onMessage(String message) { try { WatchEvent event = readWatchEvent(message); Object object = event.getObject(); - if (object instanceof Status) { - Status status = (Status) object; + Action action = Action.valueOf(event.getType()); + if (action == Action.ERROR) { + if (object instanceof Status) { + Status status = (Status) object; - onStatus(status); + onStatus(status); + } else { + logger.error("Error received, but object is not a status - will retry"); + closeRequest(); + } } else if (object instanceof HasMetadata) { HasMetadata hasMetadata = (HasMetadata) object; updateResourceVersion(hasMetadata.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - if(object instanceof KubernetesResourceList) { + if (object instanceof KubernetesResourceList) { // Dirty cast - should always be valid though - @SuppressWarnings({"rawtypes"}) + @SuppressWarnings({ "rawtypes" }) KubernetesResourceList list = (KubernetesResourceList) hasMetadata; @SuppressWarnings("unchecked") List items = list.getItems(); @@ -296,17 +301,18 @@ protected void onMessage(String message) { eventReceived(action, hasMetadata); } } else { - logger.error("Unknown message received: {}", message); + final String msg = String.format("Invalid object received: %s", message); + close(new WatcherException(msg, null, message)); } } catch (ClassCastException e) { final String msg = "Received wrong type of object for watch"; - close(new WatcherException(msg, e)); + close(new WatcherException(msg, e, message)); } catch (JsonProcessingException e) { final String msg = "Couldn't deserialize watch event: " + message; close(new WatcherException(msg, e, message)); } catch (Exception e) { final String msg = "Unhandled exception encountered in watcher event handler"; - close(new WatcherException(msg, e)); + close(new WatcherException(msg, e, message)); } } @@ -317,8 +323,8 @@ protected boolean onStatus(Status status) { return true; } - eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); + logger.error("Error received: {}, will retry", status); + closeRequest(); return false; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 7945f674d3e..ef336ae6db1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -62,7 +62,7 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { public CompletableFuture start() { this.running = true; - return listSyncAndWatch(); + return listSyncAndWatch(false); } public void stop() { @@ -98,33 +98,50 @@ private synchronized void stopWatcher() { * * @return a future that completes when the list and watch are established */ - public CompletableFuture listSyncAndWatch() { + public CompletableFuture listSyncAndWatch(boolean reconnect) { if (!running) { return CompletableFuture.completedFuture(null); } Set nextKeys = new ConcurrentSkipListSet<>(); - return processList(nextKeys, null).thenAccept(result -> { + CompletableFuture theFuture = processList(nextKeys, null).thenCompose(result -> { store.retainAll(nextKeys); final String latestResourceVersion = result.getMetadata().getResourceVersion(); lastSyncResourceVersion = latestResourceVersion; log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion); - CompletableFuture started = startWatcher(latestResourceVersion); - if (started != null) { - // outside of the lock - started.whenComplete((w, t) -> { - if (w != null) { - if (running) { - if (log.isDebugEnabled()) { - log.debug("Watch started for {}", Reflector.this); - } - watching = true; - } else { - stopWatch(w); - } + return startWatcher(latestResourceVersion); + }).thenAccept(w -> { + if (w != null) { + if (running) { + if (log.isDebugEnabled()) { + log.debug("Watch started for {}", Reflector.this); } - }); + watching = true; + } else { + stopWatch(w); + } } }); + if (reconnect) { + theFuture.whenComplete((v, t) -> { + if (t != null) { + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); + reconnect(); + } else { + retryIntervalCalculator.resetReconnectAttempts(); + } + }); + } + return theFuture; + } + + private void reconnect() { + if (!running) { + return; + } + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(true), + retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); } private CompletableFuture processList(Set nextKeys, String continueVal) { @@ -155,7 +172,7 @@ private void stopWatch(Watch w) { private synchronized CompletableFuture startWatcher(final String latestResourceVersion) { if (!running) { - return null; + return CompletableFuture.completedFuture(null); } log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes @@ -227,29 +244,11 @@ public void onClose(WatcherException exception) { reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart - stopFuture.completeExceptionally(exception.getCause()); + stopFuture.completeExceptionally(exception); log.warn("Watch closing with exception for {}", Reflector.this, exception); } } - private void reconnect() { - if (!running) { - return; - } - // this can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); - reconnectFuture.whenComplete((v, t) -> { - if (t != null) { - log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); - reconnect(); - } else { - retryIntervalCalculator.resetReconnectAttempts(); - } - }); - } - @Override public void onClose() { watchStopped(); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index 2d39dd13c26..a8a3d2a82f9 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -59,15 +59,17 @@ void testStateFlags() { assertFalse(reflector.isWatching()); assertTrue(reflector.isRunning()); - reflector.listSyncAndWatch().join(); + reflector.listSyncAndWatch(false).join(); assertTrue(reflector.isWatching()); assertTrue(reflector.isRunning()); + assertFalse(reflector.getStopFuture().isDone()); reflector.stop(); assertFalse(reflector.isWatching()); assertFalse(reflector.isRunning()); + assertTrue(reflector.getStopFuture().isDone()); } @Test diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index a97e69e5280..ea60bdce641 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder; import io.fabric8.kubernetes.client.CustomResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; @@ -83,7 +84,8 @@ class DefaultSharedIndexInformerTest { .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") .build(); - static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build(); + static final WatchEvent outdatedEvent = new WatchEventBuilder().withType(Watcher.Action.ERROR.name()) + .withStatusObject(outdatedStatus).build(); static final Long WATCH_EVENT_EMIT_TIME = 1L; static final Long OUTDATED_WATCH_EVENT_EMIT_TIME = 1L; static final long RESYNC_PERIOD = 5L; @@ -844,6 +846,11 @@ void testReconnectAfterOnCloseException() throws InterruptedException { .andEmit(outdatedEvent) .done().always(); + // re-list errors + server.expect().withPath("/api/v1/pods") + .andReturn(HttpURLConnection.HTTP_FORBIDDEN, new Status()) + .times(2); + // re-list server.expect().withPath("/api/v1/pods") .andReturn(200, diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 1d33082cede..ef69c0c1083 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -315,7 +315,7 @@ public void onClose(WatcherException cause) { } private static WatchEvent outdatedEvent() { - return new WatchEventBuilder().withStatusObject( + return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject( new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") From 64905e1bab7135c6f8ea40b8af1e2960b979cf28 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 7 Sep 2022 09:35:57 -0400 Subject: [PATCH 7/8] cleaning up how the watch is stopped --- .../impl/DefaultSharedIndexInformer.java | 2 +- .../client/informers/impl/cache/Reflector.java | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 74b5cbfff16..c1620330165 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -164,7 +164,7 @@ public CompletableFuture start() { return reflector.start().whenComplete((v, t) -> { // stop called while run is called could be ineffective, check for it afterwards synchronized (this) { - if (stopped) { + if (stopped && reflector.isRunning()) { stop(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index ef336ae6db1..627051a48fb 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -29,10 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -76,18 +75,15 @@ public void stop() { } private synchronized void stopWatcher() { - if (watchFuture != null) { - watchFuture.cancel(true); - try { - Watch w = watchFuture.getNow(null); + Optional.ofNullable(watchFuture).ifPresent(theFuture -> { + watchFuture = null; + theFuture.cancel(true); + theFuture.whenComplete((w, t) -> { if (w != null) { stopWatch(w); } - } catch (CompletionException | CancellationException e) { - // do nothing - } - watchFuture = null; - } + }); + }); } /** From cbbe59f7d542c12391bf350d94df558a621bf6e5 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 7 Sep 2022 21:29:30 -0400 Subject: [PATCH 8/8] just logging resourceeventhandler exceptions also stopping messages when a watch closes --- .../dsl/internal/AbstractWatchManager.java | 10 ++++++++-- .../dsl/internal/WatchConnectionManager.java | 19 ++++++++++++++----- .../client/dsl/internal/WatchHTTPManager.java | 12 +++++++----- .../internal/WatcherWebSocketListener.java | 1 - .../kubernetes/client/mock/ResourceTest.java | 11 ++++++++++- 5 files changed, 39 insertions(+), 14 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index e3c9a5454f5..0500300f42b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -189,7 +189,13 @@ void eventReceived(Watcher.Action action, HasMetadata resource) { } @SuppressWarnings("unchecked") final T t = (T) resource; - watcher.eventReceived(action, t); + try { + watcher.eventReceived(action, t); + } catch (Exception e) { + // for compatibility, this will just log the exception as was done in previous versions + // a case could be made for this to terminate the watch instead + logger.error("Unhandled exception encountered in watcher event handler", e); + } } void updateResourceVersion(final String newResourceVersion) { @@ -311,7 +317,7 @@ protected void onMessage(String message) { final String msg = "Couldn't deserialize watch event: " + message; close(new WatcherException(msg, e, message)); } catch (Exception e) { - final String msg = "Unhandled exception encountered in watcher event handler"; + final String msg = "Unexpected exception processing watch event"; close(new WatcherException(msg, e, message)); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 1f1af781053..3404f802be3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -33,6 +33,7 @@ import java.net.URI; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -50,7 +51,7 @@ public class WatchConnectionManager listener; - private CompletableFuture websocketFuture; + private volatile CompletableFuture websocketFuture; private WebSocket websocket; private volatile boolean ready; @@ -87,14 +88,14 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation { + Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> { + this.websocketFuture = null; + theFuture.whenComplete((w, t) -> { if (w != null) { closeWebSocket(w); } }); - websocketFuture = null; - } + }); } synchronized WatcherWebSocketListener getListener() { @@ -105,6 +106,14 @@ public CompletableFuture getWebsocketFuture() { return websocketFuture; } + @Override + protected void onMessage(String message) { + // for consistency we only want to process the message when we're open + if (this.websocketFuture != null) { + super.onMessage(message); + } + } + @Override protected void start(URL url, Map headers) { this.listener = new WatcherWebSocketListener<>(this); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index eaf94fc2a51..3bfd38c0efc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -29,12 +29,14 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); private CompletableFuture> call; + private volatile AsyncBody body; public WatchHTTPManager(final HttpClient client, final BaseOperation baseOperation, @@ -74,7 +76,7 @@ protected synchronized void start(URL url, Map headers) { scheduleReconnect(); } if (response != null) { - AsyncBody body = response.body(); + body = response.body(); if (!response.isSuccessful()) { body.cancel(); if (onStatus(OperationSupport.createStatus(response.code(), response.message()))) { @@ -101,9 +103,9 @@ protected synchronized void start(URL url, Map headers) { @Override protected synchronized void closeRequest() { - if (call != null) { - call.cancel(true); - call = null; - } + Optional.ofNullable(call).ifPresent(theFuture -> { + theFuture.cancel(true); + }); + Optional.ofNullable(body).ifPresent(AsyncBody::cancel); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 51d99d170b4..29ebee897b4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -57,7 +57,6 @@ public void onMessage(WebSocket webSocket, String text) { } finally { webSocket.request(); } - webSocket.request(); } @Override diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 25d12c76613..e17bcacedf1 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -444,7 +444,7 @@ void testWaitUntilCondition() throws InterruptedException { } @Test - void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { + void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") @@ -470,6 +470,15 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { .open() .waitFor(500) .andEmit(new WatchEvent(status, "ERROR")) + .done() + .once(); + + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() .waitFor(500) .andEmit(new WatchEvent(ready, "MODIFIED")) .done()