Skip to content

Commit

Permalink
fix #4794: clarifying event processing wrt to a manual informer stop
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Feb 1, 2023
1 parent c8266bf commit 237ad00
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.5-SNAPSHOT

#### Bugs
* Fix #4794: improving the semantics of manually calling informer stop

#### Improvements
* Fix #4800: [java-generator] Reflect the `scope` field when implementing the `Namespaced` interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? su

/**
* Stops the shared informer. The informer cannot be started again.
* <p>
* Once this call completes the informer will stop processing events, but the underlying watch closure may not yet be
* completed
*/
void stop();

Expand Down Expand Up @@ -184,7 +187,8 @@ default boolean hasSynced() {
SharedIndexInformer<T> exceptionHandler(ExceptionHandler handler);

/**
* Return a {@link CompletionStage} that will allow notification of the informer stopping.
* Return a {@link CompletionStage} that will allow notification of the informer stopping. This will be completed after
* event processing has stopped.
* <p>
* If {@link #stop()} is called, the CompletionStage will complete normally.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public CompletableFuture<Void> getStartFuture() {
}

public void stop() {
stopFuture.complete(null);
startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting"));
Future<?> future = reconnectFuture;
if (future != null) {
future.cancel(true);
}
stopWatcher();
stopFuture.complete(null);
}

private synchronized void stopWatcher() {
Expand Down Expand Up @@ -220,6 +220,9 @@ class ReflectorWatcher implements Watcher<T> {

@Override
public void eventReceived(Action action, T resource) {
if (stopFuture.isDone()) {
return;
}
if (action == null) {
throw new KubernetesClientException("Unrecognized event for " + Reflector.this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package io.fabric8.kubernetes.client.informers.impl.cache;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import org.junit.jupiter.api.Test;
Expand All @@ -32,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;

class ReflectorTest {

Expand All @@ -41,7 +44,8 @@ void testStateFlags() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class)) {
SyncableStore<Pod> mockStore = Mockito.mock(SyncableStore.class);
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, mockStore) {
@Override
protected void reconnect() {
// do nothing
Expand Down Expand Up @@ -74,8 +78,17 @@ protected void reconnect() {
assertTrue(future.isDone());
assertTrue(!future.isCompletedExceptionally());

reflector.getStopFuture().whenComplete((t, v) -> {
// try to process an event once stopped
reflector.getWatcher().eventReceived(Action.ADDED,
new PodBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build());
});

reflector.stop();

// verify no event processing after stopped
Mockito.verify(mockStore, times(0)).add(Mockito.any());

assertFalse(reflector.isWatching());
assertTrue(reflector.isStopped());
assertTrue(reflector.getStopFuture().isDone());
Expand Down

0 comments on commit 237ad00

Please sign in to comment.