Skip to content

Commit

Permalink
Revert concat array changes
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec committed Oct 7, 2020
1 parent 693ad01 commit d3cb64c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 23 deletions.
Expand Up @@ -39,7 +39,7 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
}

static final class ConcatArraySubscriber<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
implements Flow.Subscriber<T> {

private final Flow.Subscriber<? super T> downstream;

Expand Down Expand Up @@ -98,7 +98,6 @@ public void nextSource() {
@Override
public void request(long n) {
if (n <= 0) {
cancel();
downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
super.request(n);
Expand Down
Expand Up @@ -15,14 +15,14 @@
*/
package io.helidon.common.reactive;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;

public class MultiConcatArrayTest {

Expand All @@ -31,26 +31,11 @@ public void errors() {
TestSubscriber<Object> ts = new TestSubscriber<>(Long.MAX_VALUE);

Multi.concatArray(Multi.singleton(1), Multi.error(new IOException()), Multi.singleton(2))
.subscribe(ts);
.subscribe(ts);

ts.assertFailure(IOException.class, 1);
}

@Test
public void cancelOnNegativeNumberTest() throws InterruptedException, TimeoutException, ExecutionException {
TestSubscriber<Integer> ts = new TestSubscriber<>();

CompletableFuture<Void> cancelled = new CompletableFuture<>();

Multi.concatArray(Multi.just(1, 2).onCancel(() -> cancelled.complete(null)), Multi.just(3, 4))
.subscribe(ts);
ts.request(1);
ts.getSubcription().request(-1);

cancelled.get(200, TimeUnit.MILLISECONDS);
ts.assertFailure(IllegalArgumentException.class, 1);
}

@Test
public void millionSources() {
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit d3cb64c

Please sign in to comment.