Skip to content

Commit

Permalink
[Reactive] FIXMEs and signature corrections (#1579)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 25, 2020
1 parent 72e97aa commit 2696085
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 143 deletions.
21 changes: 10 additions & 11 deletions common/reactive/src/main/java/io/helidon/common/reactive/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static <T> Multi<T> defer(Supplier<? extends Flow.Publisher<? extends T>> suppli
* @return Multi
* @throws NullPointerException if mapper is {@code null}
*/
default <U> Multi<U> map(Mapper<T, U> mapper) {
default <U> Multi<U> map(Mapper<? super T, ? extends U> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return new MultiMapperPublisher<>(this, mapper);
}
Expand Down Expand Up @@ -99,7 +99,7 @@ default Multi<T> switchIfEmpty(Flow.Publisher<T> other) {
* @param consumer consumer to be invoked
* @return Multi
*/
default Multi<T> peek(Consumer<T> consumer) {
default Multi<T> peek(Consumer<? super T> consumer) {
return new MultiTappedPublisher<>(this, null, consumer,
null, null, null, null);
}
Expand All @@ -119,7 +119,7 @@ default Multi<T> distinct() {
* @param predicate predicate to filter stream with
* @return Multi
*/
default Multi<T> filter(Predicate<T> predicate) {
default Multi<T> filter(Predicate<? super T> predicate) {
return new MultiFilterPublisher<>(this, predicate);
}

Expand All @@ -131,7 +131,7 @@ default Multi<T> filter(Predicate<T> predicate) {
* @param predicate predicate to filter stream with
* @return Multi
*/
default Multi<T> takeWhile(Predicate<T> predicate) {
default Multi<T> takeWhile(Predicate<? super T> predicate) {
return new MultiTakeWhilePublisher<>(this, predicate);
}

Expand Down Expand Up @@ -175,7 +175,7 @@ default Multi<T> skip(long skip) {
* @param <U> output item type
* @return Multi
*/
default <U> Multi<U> flatMap(Function<T, Flow.Publisher<U>> publisherMapper) {
default <U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> publisherMapper) {
return new MultiFlatMapPublisher<>(this, publisherMapper, 32, 32, false);
}
/**
Expand All @@ -192,7 +192,8 @@ default <U> Multi<U> flatMap(Function<T, Flow.Publisher<U>> publisherMapper) {
* has been delivered
* @return Multi
*/
default <U> Multi<U> flatMap(Function<T, Flow.Publisher<U>> mapper, long maxConcurrency, boolean delayErrors, long prefetch) {
default <U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper,
long maxConcurrency, boolean delayErrors, long prefetch) {
return new MultiFlatMapPublisher<>(this, mapper, maxConcurrency, prefetch, delayErrors);
}

Expand Down Expand Up @@ -256,7 +257,7 @@ default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? ex
*
* @param consumer consumer to be invoked for each item
*/
default void forEach(Consumer<T> consumer) {
default void forEach(Consumer<? super T> consumer) {
FunctionalSubscriber<T> subscriber = new FunctionalSubscriber<>(consumer, null, null, null);
this.subscribe(subscriber);
}
Expand Down Expand Up @@ -292,7 +293,7 @@ default <U> Single<U> collect(Collector<T, U> collector) {
* @return Single
* @throws NullPointerException if {@code collectionSupplier} or {@code combiner} is {@code null}
*/
default <U> Single<U> collect(Supplier<U> collectionSupplier, BiConsumer<U, T> accumulator) {
default <U> Single<U> collect(Supplier<? extends U> collectionSupplier, BiConsumer<U, T> accumulator) {
Objects.requireNonNull(collectionSupplier, "collectionSupplier is null");
Objects.requireNonNull(accumulator, "combiner is null");
return new MultiCollectPublisher<>(this, collectionSupplier, accumulator);
Expand Down Expand Up @@ -392,7 +393,6 @@ static <T> Multi<T> from(CompletionStage<T> completionStage, boolean nullMeansEm
* @return Multi
* @throws NullPointerException if source is {@code null}
*/
@SuppressWarnings("unchecked")
static <T> Multi<T> from(Publisher<T> source) {
if (source instanceof Multi) {
return (Multi<T>) source;
Expand Down Expand Up @@ -564,7 +564,7 @@ default Multi<T> onComplete(Runnable onComplete) {
* @param onErrorConsumer {@link java.util.function.Consumer} to be executed.
* @return Multi
*/
default Multi<T> onError(Consumer<Throwable> onErrorConsumer) {
default Multi<T> onError(Consumer<? super Throwable> onErrorConsumer) {
return new MultiTappedPublisher<>(this,
null,
null,
Expand Down Expand Up @@ -709,7 +709,6 @@ default Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService e
return new MultiTimeout<>(this, timeout, unit, executor, null);
}


/**
* Switches to a fallback single if the upstream doesn't signal the next item, error
* or completion within the specified time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ final class MultiCollectPublisher<T, U> implements Single<U> {

private final Multi<T> source;

private final Supplier<U> collectionSupplier;
private final Supplier<? extends U> collectionSupplier;

private final BiConsumer<U, T> accumulator;

MultiCollectPublisher(Multi<T> source, Supplier<U> collectionSupplier, BiConsumer<U, T> combiner) {
MultiCollectPublisher(Multi<T> source, Supplier<? extends U> collectionSupplier, BiConsumer<U, T> combiner) {
this.source = source;
this.collectionSupplier = collectionSupplier;
this.accumulator = combiner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.helidon.common.reactive;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Function;
Expand Down Expand Up @@ -58,11 +57,7 @@ static final class DistinctSubscriber<T, K> implements Flow.Subscriber<T>, Flow.

@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription, "subscription is null");
if (upstream != null) {
subscription.cancel();
throw new IllegalStateException("Subscription already set");
}
SubscriptionHelper.validate(upstream, subscription);
this.upstream = subscription;
downstream.onSubscribe(this);
}
Expand All @@ -71,7 +66,7 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(T item) {
Flow.Subscription s = upstream;
Set<K> m = memory;
if (s != null && m != null) {
if (s != SubscriptionHelper.CANCELED && m != null) {
boolean pass;
try {
pass = memory.add(keySelector.apply(item));
Expand All @@ -91,41 +86,32 @@ public void onNext(T item) {

@Override
public void onError(Throwable throwable) {
if (upstream != null) {
// FIXME better to set it to SubscriptionHelper.CANCELED
upstream = null;
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
memory = null;
downstream.onError(throwable);
}
}

@Override
public void onComplete() {
if (upstream != null) {
// FIXME better to set it to SubscriptionHelper.CANCELED
upstream = null;
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
memory = null;
downstream.onComplete();
}
}

@Override
public void request(long n) {
Flow.Subscription s = upstream;
if (s != null) {
s.request(n);
}
upstream.request(n);
}

@Override
public void cancel() {
Flow.Subscription s = upstream;
// FIXME better to set it to SubscriptionHelper.CANCELED
upstream = null;
memory = null;
if (s != null) {
s.cancel();
}
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,10 @@ void doError(Throwable throwable) {
@Override
public void request(long n) {
if (n <= 0L) {
doError(new IllegalArgumentException("Rule §3.9 violated: non-positive request amount is forbidded"));
doError(new IllegalArgumentException("Rule §3.9 violated: non-positive request amount is forbidden"));
} else {
// FIXME replace with SubscriptionHelper.addRequest
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return;
}
long update = current + n;
if (update < 0L) {
update = Long.MAX_VALUE;
}
if (requested.compareAndSet(current, update)) {
drain();
return;
}
}
SubscriptionHelper.addRequest(requested, n);
drain();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void request(long n) {
iterator = null;
if (isCanceled == BAD_REQUEST) {
downstream.onError(new IllegalArgumentException(
"Rule §3.9 violated: non-positive request amount is forbidded"));
"Rule §3.9 violated: non-positive request amount is forbidden"));
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void request(long n) {
iterator = null;
if (isCanceled == BAD_REQUEST) {
downstream.onError(new IllegalArgumentException(
"Rule §3.9 violated: non-positive request amount is forbidded"));
"Rule §3.9 violated: non-positive request amount is forbidden"));
}
close();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,10 @@ static final class LimitSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscr

@Override
public void onSubscribe(Flow.Subscription subscription) {
Objects.requireNonNull(subscription, "subscription is null");
if (upstream != null) {
subscription.cancel();
throw new IllegalStateException("Subscription already set");
}
SubscriptionHelper.validate(upstream, subscription);
if (remaining == 0L) {
subscription.cancel();
// FIXME use SubscriptionHelper.CANCELED instead for clarity
upstream = null;
upstream = SubscriptionHelper.CANCELED;
downstream.onSubscribe(EmptySubscription.INSTANCE);
downstream.onComplete();
} else {
Expand All @@ -79,14 +74,13 @@ public void onSubscribe(Flow.Subscription subscription) {
@Override
public void onNext(T item) {
Flow.Subscription s = upstream;
if (s != null) {
if (s != SubscriptionHelper.CANCELED) {
long r = remaining - 1;
remaining = r;
downstream.onNext(item);
if (r == 0L) {
s.cancel();
// FIXME use SubscriptionHelper.CANCELED instead for clarity
upstream = null;
upstream = SubscriptionHelper.CANCELED;
downstream.onComplete();
}
}
Expand All @@ -95,38 +89,30 @@ public void onNext(T item) {
@Override
public void onError(Throwable throwable) {
Flow.Subscription s = upstream;
if (s != null) {
// FIXME use SubscriptionHelper.CANCELED instead for clarity
upstream = null;
if (s != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
downstream.onError(throwable);
}
}

@Override
public void onComplete() {
Flow.Subscription s = upstream;
if (s != null) {
// FIXME use SubscriptionHelper.CANCELED instead for clarity
upstream = null;
if (s != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
downstream.onComplete();
}
}

@Override
public void request(long n) {
Flow.Subscription s = upstream;
if (s != null) {
s.request(n);
}
upstream.request(n);
}

@Override
public void cancel() {
Flow.Subscription s = upstream;
upstream = null;
if (s != null) {
s.cancel();
}
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ final class MultiMapperPublisher<T, R> implements Multi<R> {

private final Flow.Publisher<T> source;

private final Mapper<T, R> mapper;
private final Mapper<? super T, ? extends R> mapper;

MultiMapperPublisher(Flow.Publisher<T> source, Mapper<T, R> mapper) {
MultiMapperPublisher(Flow.Publisher<T> source, Mapper<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
Expand All @@ -45,11 +45,11 @@ static final class MapperSubscriber<T, R> implements Flow.Subscriber<T>, Flow.Su

private final Flow.Subscriber<? super R> downstream;

private final Mapper<T, R> mapper;
private final Mapper<? super T, ? extends R> mapper;

private Flow.Subscription upstream;

MapperSubscriber(Flow.Subscriber<? super R> downstream, Mapper<T, R> mapper) {
MapperSubscriber(Flow.Subscriber<? super R> downstream, Mapper<? super T, ? extends R> mapper) {
this.downstream = downstream;
this.mapper = mapper;
}
Expand All @@ -65,7 +65,7 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(T item) {
// in case the upstream doesn't stop immediately after a failed mapping
Flow.Subscription s = upstream;
if (s != null) {
if (s != SubscriptionHelper.CANCELED) {
R result;

try {
Expand All @@ -83,36 +83,30 @@ public void onNext(T item) {
@Override
public void onError(Throwable throwable) {
// if mapper.map fails above, the upstream may still emit an onError without request
if (upstream != null) {
upstream = null;
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
downstream.onError(throwable);
}
}

@Override
public void onComplete() {
// if mapper.map fails above, the upstream may still emit an onComplete without request
if (upstream != null) {
upstream = null;
if (upstream != SubscriptionHelper.CANCELED) {
upstream = SubscriptionHelper.CANCELED;
downstream.onComplete();
}
}

@Override
public void request(long n) {
Flow.Subscription s = upstream;
if (s != null) {
s.request(n);
}
upstream.request(n);
}

@Override
public void cancel() {
Flow.Subscription s = upstream;
upstream = null;
if (s != null) {
s.cancel();
}
upstream.cancel();
upstream = SubscriptionHelper.CANCELED;
}
}
}
Loading

0 comments on commit 2696085

Please sign in to comment.