From 103ed265ae4301d1440dda5879b4ee11384e4f99 Mon Sep 17 00:00:00 2001 From: Yufei Cai Date: Mon, 8 Aug 2022 14:41:02 +0200 Subject: [PATCH] Move creation of single threaded executor back into ThingSearchSubscription for spec conformance (Rule 1.03) Signed-off-by: Yufei Cai --- .../streaming/SpliteratorSubscriber.java | 21 +++---- .../streaming/ThingSearchSubscription.java | 57 +++++++++++-------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/java/src/main/java/org/eclipse/ditto/client/streaming/SpliteratorSubscriber.java b/java/src/main/java/org/eclipse/ditto/client/streaming/SpliteratorSubscriber.java index 6dbe3d2d..dd0e012c 100644 --- a/java/src/main/java/org/eclipse/ditto/client/streaming/SpliteratorSubscriber.java +++ b/java/src/main/java/org/eclipse/ditto/client/streaming/SpliteratorSubscriber.java @@ -16,13 +16,10 @@ import java.time.Duration; import java.util.Collections; -import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -33,8 +30,6 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import javax.annotation.Nullable; - import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.slf4j.Logger; @@ -59,7 +54,6 @@ public final class SpliteratorSubscriber implements Subscriber, Spliterato private final AtomicInteger splits; private final AtomicInteger quota; private final AtomicBoolean cancelled; - @Nullable private volatile ExecutorService subscriptionExecutor; private SpliteratorSubscriber(final long timeoutMillis, final int bufferSize, final int batchSize) { // reserve 2*bufferSize+1 space in buffer for <=bufferSize elements and bufferSize+1 EOS markers @@ -116,10 +110,6 @@ public void onSubscribe(final Subscription s) { previousSubscription = subscription.get(); if (previousSubscription == null) { subscription.set(s); - if (s instanceof ThingSearchSubscription) { - subscriptionExecutor = Executors.newSingleThreadExecutor(); - ((ThingSearchSubscription) s).setSingleThreadedExecutor(subscriptionExecutor); - } } } if (previousSubscription == null) { @@ -127,7 +117,7 @@ public void onSubscribe(final Subscription s) { s.request(capacity); } else { LOGGER.warn("onSubscribe() called a second time; cancelling subscription <{}>.", s); - s.cancel(); + cancelSubscription(s); } } @@ -153,12 +143,15 @@ public void onComplete() { private void goToCancelledState() { if (!cancelled.getAndSet(true)) { - if (subscriptionExecutor != null) { - Objects.requireNonNull(subscriptionExecutor).shutdown(); - } + ThingSearchSubscription.terminate(subscription.get()); } } + private void cancelSubscription(final Subscription s) { + s.cancel(); + ThingSearchSubscription.terminate(s); + } + // always cancel the stream on error thrown, because user code catching the error is outside // the element handling code and should consider this spliterator "used up." // as a precaution, the error is propagated to all threads reading from this spliterator. diff --git a/java/src/main/java/org/eclipse/ditto/client/streaming/ThingSearchSubscription.java b/java/src/main/java/org/eclipse/ditto/client/streaming/ThingSearchSubscription.java index ecfc13ee..193e0f51 100644 --- a/java/src/main/java/org/eclipse/ditto/client/streaming/ThingSearchSubscription.java +++ b/java/src/main/java/org/eclipse/ditto/client/streaming/ThingSearchSubscription.java @@ -13,8 +13,10 @@ package org.eclipse.ditto.client.streaming; import java.time.Duration; -import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -54,7 +56,7 @@ public final class ThingSearchSubscription implements Subscription { private final Subscriber subscriber; private final AtomicBoolean cancelled; private final AtomicReference busSubscription; - @Nullable private volatile ExecutorService singleThreadedExecutor; + private final ExecutorService singleThreadedExecutorService; private ThingSearchSubscription(final String subscriptionId, final ProtocolAdapter protocolAdapter, @@ -66,6 +68,8 @@ private ThingSearchSubscription(final String subscriptionId, this.subscriber = subscriber; cancelled = new AtomicBoolean(false); busSubscription = new AtomicReference<>(); + + singleThreadedExecutorService = Executors.newSingleThreadExecutor(); } /** @@ -89,10 +93,29 @@ public static void start(final SubscriptionCreated event, thingSearchSubscription.startForwarding(); } + /** + * Terminate the executor of any {@code ThingSearchSubscription}, or do nothing if the subscription is not a + * {@code ThingSearchSubscription}. + * After upgrading to Java 9, it is better to replace this method by the {@code Cleaner} interface. + * + * @param subscription the subscription. + * @since 3.0.0 + */ + public static void terminate(@Nullable final Subscription subscription) { + if (subscription instanceof ThingSearchSubscription) { + final ThingSearchSubscription s = (ThingSearchSubscription) subscription; + try { + s.singleThreadedExecutorService.submit(s.singleThreadedExecutorService::shutdown); + } catch (final RejectedExecutionException e) { + // executor already shut down + } + } + } + // called by subscriber @Override public void request(final long n) { - singleThreaded(() -> { + singleThreadedExecutorService.submit(() -> { if (n <= 0) { doCancel(); subscriber.onError(new IllegalArgumentException("Expect positive demand, got: " + n)); @@ -110,26 +133,9 @@ public void request(final long n) { // called by subscriber @Override public void cancel() { - singleThreaded(this::doCancel); - } - - /** - * Set the single-threaded executor of this subscription. - * Subscription methods run in the executor in order to maintain element order. - * Creating the executor within this class is not possible because the garbage collector may not stop the executor. - * - * @param singleThreadedExecutor The single-threaded executor. - */ - public void setSingleThreadedExecutor(final ExecutorService singleThreadedExecutor) { - // TODO: After upgrading to Java 9, consider using the Cleaner interface instead. - this.singleThreadedExecutor = singleThreadedExecutor; - } - - private void singleThreaded(final Runnable runnable) { - if (singleThreadedExecutor != null) { - Objects.requireNonNull(singleThreadedExecutor).submit(runnable); - } else { - runnable.run(); + if (!singleThreadedExecutorService.isShutdown() && !singleThreadedExecutorService.isTerminated()) { + CompletableFuture.runAsync(this::doCancel, singleThreadedExecutorService) + .whenComplete((result, error) -> singleThreadedExecutorService.shutdownNow()); } } @@ -142,18 +148,19 @@ private void doCancel() { // called by bus private void onTimeout(final Throwable timeoutError) { - singleThreaded(() -> { + singleThreadedExecutorService.submit(() -> { if (!cancelled.getAndSet(true)) { // bus subscription already cancelled // trust back-end to free resources on its own subscriber.onError(timeoutError); } }); + singleThreadedExecutorService.shutdown(); } // called by bus private void onNext(final Adaptable adaptable) { - singleThreaded(() -> { + singleThreadedExecutorService.submit(() -> { LOGGER.trace("Received from bus: <{}>", adaptable); handleAdaptable(adaptable); });