Skip to content

Commit

Permalink
Manually shutdown the executor of ThingSearchSubscription.
Browse files Browse the repository at this point in the history
Some JVMs do not shutdown single threaded executors when they
are garbage collected.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 5, 2022
1 parent 35d06f1 commit 5bcb8ff
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,30 +130,34 @@ public void onNext(final T t) {
@Override
public void onError(final Throwable t) {
LOGGER.trace("onError", t);
cancelled.set(true);
cancel();
addErrors(t);
}

@Override
public void onComplete() {
LOGGER.trace("onComplete");
cancelled.set(true);
cancel();
addEos();
}

private void cancel() {
cancelled.set(true);
final Subscription s = subscription.get();
if (s != null) {
s.cancel();
}
}

// always cancel the stream on error thrown, because user code catching the error is outside
// the element handling code and should consider this spliterator "used up."
// as a precaution, the error is propagated to all threads reading from this spliterator.
private void cancelOnError(final Consumer<? super T> consumer, final T element) {
try {
consumer.accept(element);
} catch (final RuntimeException e) {
cancelled.set(true);
cancel();
addErrors(e);
final Subscription s = subscription.get();
if (s != null) {
s.cancel();
}
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ private ThingSearchSubscription(final String subscriptionId,
cancelled = new AtomicBoolean(false);
busSubscription = new AtomicReference<>();

// not shutdown to handle queued messages; will be shutdown by garbage collector
singleThreadedExecutorService = Executors.newSingleThreadExecutor();
}

Expand Down Expand Up @@ -134,6 +133,7 @@ private void onTimeout(final Throwable timeoutError) {
subscriber.onError(timeoutError);
}
});
singleThreadedExecutorService.shutdown();
}

// called by bus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionComplete;
Expand All @@ -43,9 +44,9 @@ public void run() throws Exception {
final Publisher<SubscriptionHasNextPage> underTest =
ThingSearchPublisher.of(CreateSubscription.of(DittoHeaders.empty()), PROTOCOL_ADAPTER, messaging);
final SpliteratorSubscriber<SubscriptionHasNextPage> subscriber = SpliteratorSubscriber.of();
final ExecutorService executor = Executors.newSingleThreadExecutor();
final CompletableFuture<List<SubscriptionHasNextPage>> subscriberFuture =
CompletableFuture.supplyAsync(() -> subscriber.asStream().collect(Collectors.toList()),
Executors.newSingleThreadExecutor());
CompletableFuture.supplyAsync(() -> subscriber.asStream().collect(Collectors.toList()), executor);
underTest.subscribe(subscriber);
final CreateSubscription createSubscription = expectMsgClass(CreateSubscription.class);
final String subscriptionId = "subscription1234";
Expand All @@ -62,6 +63,7 @@ public void run() throws Exception {
final RequestFromSubscription futileRequest = expectMsgClass(RequestFromSubscription.class);
reply(SubscriptionComplete.of(subscriptionId, futileRequest.getDittoHeaders()));
subscriberFuture.get(1L, TimeUnit.SECONDS);
executor.shutdown();
assertThat(subscriberFuture).isCompletedWithValue(expectedResult);
}
}

0 comments on commit 5bcb8ff

Please sign in to comment.