Skip to content

Commit

Permalink
Move creation of single threaded executor back into ThingSearchSubscr…
Browse files Browse the repository at this point in the history
…iption for spec conformance (Rule 1.03)

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 8, 2022
1 parent 2e9bb2a commit 103ed26
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -33,8 +30,6 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
Expand All @@ -59,7 +54,6 @@ public final class SpliteratorSubscriber<T> implements Subscriber<T>, Spliterato
private final AtomicInteger splits;
private final AtomicInteger quota;
private final AtomicBoolean cancelled;
@Nullable private volatile ExecutorService subscriptionExecutor;

private SpliteratorSubscriber(final long timeoutMillis, final int bufferSize, final int batchSize) {
// reserve 2*bufferSize+1 space in buffer for <=bufferSize elements and bufferSize+1 EOS markers
Expand Down Expand Up @@ -116,18 +110,14 @@ public void onSubscribe(final Subscription s) {
previousSubscription = subscription.get();
if (previousSubscription == null) {
subscription.set(s);
if (s instanceof ThingSearchSubscription) {
subscriptionExecutor = Executors.newSingleThreadExecutor();
((ThingSearchSubscription) s).setSingleThreadedExecutor(subscriptionExecutor);
}
}
}
if (previousSubscription == null) {
LOGGER.trace("Initial request: <{}>", capacity);
s.request(capacity);
} else {
LOGGER.warn("onSubscribe() called a second time; cancelling subscription <{}>.", s);
s.cancel();
cancelSubscription(s);
}
}

Expand All @@ -153,12 +143,15 @@ public void onComplete() {

private void goToCancelledState() {
if (!cancelled.getAndSet(true)) {
if (subscriptionExecutor != null) {
Objects.requireNonNull(subscriptionExecutor).shutdown();
}
ThingSearchSubscription.terminate(subscription.get());
}
}

private void cancelSubscription(final Subscription s) {
s.cancel();
ThingSearchSubscription.terminate(s);
}

// always cancel the stream on error thrown, because user code catching the error is outside
// the element handling code and should consider this spliterator "used up."
// as a precaution, the error is propagated to all threads reading from this spliterator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
package org.eclipse.ditto.client.streaming;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -54,7 +56,7 @@ public final class ThingSearchSubscription implements Subscription {
private final Subscriber<? super SubscriptionHasNextPage> subscriber;
private final AtomicBoolean cancelled;
private final AtomicReference<AdaptableBus.SubscriptionId> busSubscription;
@Nullable private volatile ExecutorService singleThreadedExecutor;
private final ExecutorService singleThreadedExecutorService;

private ThingSearchSubscription(final String subscriptionId,
final ProtocolAdapter protocolAdapter,
Expand All @@ -66,6 +68,8 @@ private ThingSearchSubscription(final String subscriptionId,
this.subscriber = subscriber;
cancelled = new AtomicBoolean(false);
busSubscription = new AtomicReference<>();

singleThreadedExecutorService = Executors.newSingleThreadExecutor();
}

/**
Expand All @@ -89,10 +93,29 @@ public static void start(final SubscriptionCreated event,
thingSearchSubscription.startForwarding();
}

/**
* Terminate the executor of any {@code ThingSearchSubscription}, or do nothing if the subscription is not a
* {@code ThingSearchSubscription}.
* After upgrading to Java 9, it is better to replace this method by the {@code Cleaner} interface.
*
* @param subscription the subscription.
* @since 3.0.0
*/
public static void terminate(@Nullable final Subscription subscription) {
if (subscription instanceof ThingSearchSubscription) {
final ThingSearchSubscription s = (ThingSearchSubscription) subscription;
try {
s.singleThreadedExecutorService.submit(s.singleThreadedExecutorService::shutdown);
} catch (final RejectedExecutionException e) {
// executor already shut down
}
}
}

// called by subscriber
@Override
public void request(final long n) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
if (n <= 0) {
doCancel();
subscriber.onError(new IllegalArgumentException("Expect positive demand, got: " + n));
Expand All @@ -110,26 +133,9 @@ public void request(final long n) {
// called by subscriber
@Override
public void cancel() {
singleThreaded(this::doCancel);
}

/**
* Set the single-threaded executor of this subscription.
* Subscription methods run in the executor in order to maintain element order.
* Creating the executor within this class is not possible because the garbage collector may not stop the executor.
*
* @param singleThreadedExecutor The single-threaded executor.
*/
public void setSingleThreadedExecutor(final ExecutorService singleThreadedExecutor) {
// TODO: After upgrading to Java 9, consider using the Cleaner interface instead.
this.singleThreadedExecutor = singleThreadedExecutor;
}

private void singleThreaded(final Runnable runnable) {
if (singleThreadedExecutor != null) {
Objects.requireNonNull(singleThreadedExecutor).submit(runnable);
} else {
runnable.run();
if (!singleThreadedExecutorService.isShutdown() && !singleThreadedExecutorService.isTerminated()) {
CompletableFuture.runAsync(this::doCancel, singleThreadedExecutorService)
.whenComplete((result, error) -> singleThreadedExecutorService.shutdownNow());
}
}

Expand All @@ -142,18 +148,19 @@ private void doCancel() {

// called by bus
private void onTimeout(final Throwable timeoutError) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
if (!cancelled.getAndSet(true)) {
// bus subscription already cancelled
// trust back-end to free resources on its own
subscriber.onError(timeoutError);
}
});
singleThreadedExecutorService.shutdown();
}

// called by bus
private void onNext(final Adaptable adaptable) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
LOGGER.trace("Received from bus: <{}>", adaptable);
handleAdaptable(adaptable);
});
Expand Down

0 comments on commit 103ed26

Please sign in to comment.