Skip to content

Commit

Permalink
fix: catch more possible errors in AssigningSubscriber and don't have…
Browse files Browse the repository at this point in the history
… hard errors on stopping subscribers (#880)

Possible improvement/fix for #871, unclear as I've been unable to reproduce.
  • Loading branch information
dpcollins-google committed Sep 15, 2021
1 parent 23a655b commit 22db237
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ private void handleAssignment(Set<Partition> assignment) {
if (!liveSubscriberMap.containsKey(partition)) startSubscriber(partition);
}
}
} catch (CheckedApiException e) {
onPermanentError(e);
} catch (Throwable t) {
onPermanentError(toCanonical(t));
}
}

Expand All @@ -106,6 +106,7 @@ private void startSubscriber(Partition partition) throws CheckedApiException {
new Listener() {
@Override
public void failed(State from, Throwable failure) {
if (State.STOPPING.equals(from)) return;
onPermanentError(toCanonical(failure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenTerminated;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -79,6 +80,15 @@ public void startStop() {
verify(assigner).stopAsync();
}

@Test
public void failedCreate() throws CheckedApiException {
when(subscriberFactory.newSubscriber(Partition.of(1)))
.thenThrow(new RuntimeException("Arbitrary error."));
leakedReceiver.handleAssignment(ImmutableSet.of(Partition.of(1)));
verify(subscriberFactory).newSubscriber(Partition.of(1));
assertThrows(IllegalStateException.class, assigningSubscriber::awaitTerminated);
}

@Test
public void createSubscribers() throws CheckedApiException {
Subscriber sub1 = spy(FakeSubscriber.class);
Expand Down

0 comments on commit 22db237

Please sign in to comment.