Skip to content

Commit

Permalink
feat: The TopicConfigWatcher polls the topic config and calls a handl…
Browse files Browse the repository at this point in the history
…er whenever it changes (#377)
  • Loading branch information
palmere-google committed Nov 19, 2020
1 parent 8c7ebd0 commit b98c501
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiService;
import java.util.function.Consumer;

public interface PartitionCountWatcher extends ApiService {
interface Factory {
PartitionCountWatcher newWatcher(Consumer<Long> receiver);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.AbstractApiService;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class PartitionCountWatcherImpl extends AbstractApiService implements PartitionCountWatcher {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
private final Duration period;
private final TopicPath topicPath;
private final AdminClient adminClient;
private final ScheduledExecutorService executorService;
private final Consumer<Long> partitionCountReceiver;

private ScheduledFuture<?> partitionCountPoll;
private Optional<Long> currentPartitionCount = Optional.empty();

public static class Factory implements PartitionCountWatcher.Factory {
private final TopicPath topicPath;
private final AdminClient adminClient;
private final Duration period;

public Factory(TopicPath topicPath, AdminClient adminClient, Duration period) {
this.topicPath = topicPath;
this.adminClient = adminClient;
this.period = period;
}

@Override
public PartitionCountWatcher newWatcher(Consumer<Long> receiver) {
return new PartitionCountWatcherImpl(topicPath, adminClient, receiver, period);
}
}

private PartitionCountWatcherImpl(
TopicPath topicPath, AdminClient adminClient, Consumer<Long> receiver, Duration period) {
this.period = period;
this.topicPath = topicPath;
this.adminClient = adminClient;
this.partitionCountReceiver = receiver;
this.executorService = Executors.newSingleThreadScheduledExecutor();
}

private void pollTopicConfig() {
Long partitionCount;
try {
partitionCount = adminClient.getTopicPartitionCount(topicPath).get();
} catch (InterruptedException | ExecutionException e) {
// If we encounter an exception on our first topic config poll, then fail. We need to fetch
// the topic
// config at least once to start up properly.
if (!currentPartitionCount.isPresent()) {
notifyFailed(ExtractStatus.toCanonical(e.getCause()));
stop();
}
log.atWarning().withCause(e).log("Failed to refresh partition count");
return;
}
if (currentPartitionCount.isPresent() && currentPartitionCount.get().equals(partitionCount)) {
return;
}
partitionCountReceiver.accept(partitionCount);
// Notify started after we successfully receive the config once.
if (!currentPartitionCount.isPresent()) {
notifyStarted();
}
currentPartitionCount = Optional.of(partitionCount);
}

private void stop() {
partitionCountPoll.cancel(true);
adminClient.close();
}

@Override
protected void doStart() {
partitionCountPoll =
executorService.scheduleAtFixedRate(
this::pollTopicConfig, 0, period.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
protected void doStop() {
try {
stop();
notifyStopped();
} catch (Exception e) {
notifyFailed(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public boolean matches(Throwable argument) {

public static void assertFutureThrowsCode(Future<?> f, Code code) {
ExecutionException exception = assertThrows(ExecutionException.class, f::get);
Optional<CheckedApiException> statusOr = ExtractStatus.extract(exception.getCause());
assertThrowableMatches(exception.getCause(), code);
}

public static void assertThrowableMatches(Throwable t, Code code) {
Optional<CheckedApiException> statusOr = ExtractStatus.extract(t);
assertThat(statusOr).isPresent();
assertThat(statusOr.get().code()).isEqualTo(code);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.internal.wire;

import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.internal.ApiExceptionMatcher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import java.time.Duration;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class PartitionCountWatcherImplTest {
private static final CloudRegion REGION = CloudRegion.of("us-east1");

private static TopicPath path() {
return TopicPath.newBuilder()
.setName(TopicName.of("a"))
.setProject(ProjectNumber.of(4))
.setLocation(CloudZone.of(REGION, 'a'))
.build();
}

PartitionCountWatcher.Factory watcherFactory;
@Mock AdminClient mockClient;
@Mock Consumer<Long> mockConsumer;

@Before
public void setUp() {
initMocks(this);
watcherFactory =
new PartitionCountWatcherImpl.Factory(path(), mockClient, Duration.ofMillis(10));
}

@Test
public void testFirstCallFails() {
when(mockClient.getTopicPartitionCount(path()))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION).underlying));
PartitionCountWatcher watcher = watcherFactory.newWatcher(mockConsumer);
watcher.startAsync();
assertThrows(IllegalStateException.class, watcher::awaitTerminated);
ApiExceptionMatcher.assertThrowableMatches(
watcher.failureCause(), StatusCode.Code.FAILED_PRECONDITION);
verify(mockClient, times(1)).getTopicPartitionCount(path());
}

@Test
public void testCallsHandlerOnStart() {
when(mockClient.getTopicPartitionCount(path())).thenReturn(ApiFutures.immediateFuture(1L));
PartitionCountWatcher watcher = watcherFactory.newWatcher(mockConsumer);
watcher.startAsync();
verify(mockConsumer, after(1000)).accept(1L);
verifyNoMoreInteractions(mockConsumer);
}

@Test
public void testHandlerCalledOnUpdates() {
when(mockClient.getTopicPartitionCount(path()))
.thenReturn(ApiFutures.immediateFuture(1L))
.thenReturn(ApiFutures.immediateFuture(1L))
.thenReturn(ApiFutures.immediateFuture(2L));
PartitionCountWatcher watcher = watcherFactory.newWatcher(mockConsumer);
watcher.startAsync();
verify(mockClient, after(1000).atLeast(3)).getTopicPartitionCount(path());
verify(mockConsumer, after(1000)).accept(1L);
verify(mockConsumer, after(1000)).accept(2L);
verifyNoMoreInteractions(mockConsumer);
}

@Test
public void testFailuresAfterFirstSuccessIgnored() {
when(mockClient.getTopicPartitionCount(path()))
.thenReturn(ApiFutures.immediateFuture(1L))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException(StatusCode.Code.FAILED_PRECONDITION)))
.thenReturn(ApiFutures.immediateFuture(2L));
PartitionCountWatcher watcher = watcherFactory.newWatcher(mockConsumer);
watcher.startAsync();
verify(mockClient, after(1000).atLeast(3)).getTopicPartitionCount(path());
verify(mockConsumer, after(1000)).accept(1L);
verify(mockConsumer, after(1000)).accept(2L);
verifyNoMoreInteractions(mockConsumer);
}

@Test
public void testStopPreventsFutureCalls() {
when(mockClient.getTopicPartitionCount(path())).thenReturn(ApiFutures.immediateFuture(1L));
PartitionCountWatcher watcher = watcherFactory.newWatcher(mockConsumer);
watcher.startAsync();
watcher.stopAsync();
watcher.awaitTerminated();
verify(mockClient, after(1000).atLeast(1)).getTopicPartitionCount(path());
Mockito.reset(mockClient);
verify(mockClient, after(20).never()).getTopic(any());
}
}

0 comments on commit b98c501

Please sign in to comment.