Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi discrepancies #2413

Merged
merged 3 commits into from Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,7 @@

/**
* Buffers a single item and emits it to the downstream when it requests.
*
* @param <T> the item type buffered
*/
class DeferredScalarSubscription<T> extends AtomicInteger implements Flow.Subscription {
Expand All @@ -31,91 +32,66 @@ class DeferredScalarSubscription<T> extends AtomicInteger implements Flow.Subscr

private T value;

static final int NO_VALUE_NO_REQUEST = 0;
static final int NO_VALUE_HAS_REQUEST = 1;
static final int HAS_VALUE_NO_REQUEST = 2;
static final int HAS_VALUE_HAS_REQUEST = 3;
static final int COMPLETE = 4;
static final int CANCELED = 5;
static final int REQUEST_ARRIVED = 1;
static final int VALUE_ARRIVED = 2;
static final int DONE = REQUEST_ARRIVED | VALUE_ARRIVED;


DeferredScalarSubscription(Flow.Subscriber<? super T> downstream) {
this.downstream = downstream;
}

@Override
public void cancel() {
if (getAndSet(CANCELED) != CANCELED) {
if (getAndSet(DONE) != DONE) {
value = null;
}
}

@Override
public final void request(long n) {
if (n <= 0L) {
if (getAndSet(CANCELED) != CANCELED) {
if (getAndSet(DONE) != DONE) {
value = null;
downstream.onError(
new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
}
} else {
for (;;) {
int state = get();
if (state == HAS_VALUE_NO_REQUEST) {
T v = value;
value = null;
if (compareAndSet(HAS_VALUE_NO_REQUEST, HAS_VALUE_HAS_REQUEST)) {
downstream.onNext(v);
if (compareAndSet(HAS_VALUE_HAS_REQUEST, COMPLETE)) {
downstream.onComplete();
}
break;
}
} else if (state == NO_VALUE_NO_REQUEST) {
if (compareAndSet(NO_VALUE_NO_REQUEST, NO_VALUE_HAS_REQUEST)) {
break;
}
} else {
// state == COMPLETE
// state == HAS_VALUE_HAS_REQUEST
// state == NO_VALUE_HAS_REQUEST
// state == CANCELED
break;
}
}
return;
}

int state;
T v;
do {
state = get();
v = value;
} while (!compareAndSet(state, state | REQUEST_ARRIVED));

if (state == VALUE_ARRIVED) {
value = null;
downstream.onNext(v);
downstream.onComplete();
}
}

/**
* Signal the only item if possible or save it for later when there
* is a request for it.
* <p>
* This method should be called at most once and from only one thread.
* This method should be called at most once and from only one thread.
* </p>
*
* @param item the item to signal and then complete the downstream
*/
public final void complete(T item) {
for (;;) {
int state = get();
if (state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(NO_VALUE_HAS_REQUEST, HAS_VALUE_HAS_REQUEST)) {
downstream.onNext(item);
if (compareAndSet(HAS_VALUE_HAS_REQUEST, COMPLETE)) {
downstream.onComplete();
}
break;
}
} else if (state == NO_VALUE_NO_REQUEST) {
value = item;
if (compareAndSet(NO_VALUE_NO_REQUEST, HAS_VALUE_NO_REQUEST)) {
break;
}
value = null;
} else {
// state == COMPLETE
// state == HAS_VALUE_NO_REQUEST
// state == HAS_VALUE_HAS_REQUEST
// state == CANCELED
break;
}
value = item; // assert: even if the race occurs, we will deliver one of the items with which complete()
// has been invoked - we support only the case with a single invocation of complete()
int state = getAndUpdate(n -> n | VALUE_ARRIVED);
if (state == REQUEST_ARRIVED) {
value = null;
downstream.onNext(item);
downstream.onComplete();
} else if (state == DONE) {
value = null;
}
}

Expand All @@ -128,6 +104,7 @@ protected final void subscribeSelf() {

/**
* Returns the downstream reference.
*
* @return the downstream reference
*/
protected final Flow.Subscriber<? super T> downstream() {
Expand All @@ -138,34 +115,21 @@ protected final Flow.Subscriber<? super T> downstream() {
* Complete the downstream without emitting any items.
*/
public final void complete() {
for (;;) {
int state = get();
if (state == NO_VALUE_NO_REQUEST || state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(state, COMPLETE)) {
downstream.onComplete();
return;
}
} else {
break;
}
int state = get();
if (((state & VALUE_ARRIVED) != VALUE_ARRIVED) && compareAndSet(state, DONE)){
downstream.onComplete();
}
}

/**
* Signal error to the downstream without emitting any items.
*
* @param throwable the error to signal
*/
public final void error(Throwable throwable) {
for (;;) {
int state = get();
if (state == NO_VALUE_NO_REQUEST || state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(state, COMPLETE)) {
downstream.onError(throwable);
return;
}
} else {
break;
}
if (getAndSet(DONE) != DONE) {
value = null;
downstream.onError(throwable);
}
}

Expand Down
Expand Up @@ -39,7 +39,7 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
}

static final class ConcatArraySubscriber<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
implements Flow.Subscriber<T> {

private final Flow.Subscriber<? super T> downstream;

Expand Down Expand Up @@ -98,6 +98,7 @@ public void nextSource() {
@Override
public void request(long n) {
if (n <= 0) {
cancel();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not sufficient for the problem that has been discovered. We'll have to address it better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olotenko To be clear, your point is that the sequence onError onNext is still possible as cancellation is only a best-effort task.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can view this as the best effort so far. But like I mentioned offline, there is another issue that needs addressing, so we may just as well postpone touching this class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, sounds like for a future PR.

downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
super.request(n);
Expand Down
Expand Up @@ -15,14 +15,14 @@
*/
package io.helidon.common.reactive;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import org.junit.jupiter.api.Test;

public class MultiConcatArrayTest {

Expand All @@ -31,11 +31,26 @@ public void errors() {
TestSubscriber<Object> ts = new TestSubscriber<>(Long.MAX_VALUE);

Multi.concatArray(Multi.singleton(1), Multi.error(new IOException()), Multi.singleton(2))
.subscribe(ts);
.subscribe(ts);

ts.assertFailure(IOException.class, 1);
}

@Test
public void cancelOnNegativeNumberTest() throws InterruptedException, TimeoutException, ExecutionException {
TestSubscriber<Integer> ts = new TestSubscriber<>();

CompletableFuture<Void> cancelled = new CompletableFuture<>();

Multi.concatArray(Multi.just(1, 2).onCancel(() -> cancelled.complete(null)), Multi.just(3, 4))
.subscribe(ts);
ts.request(1);
ts.getSubcription().request(-1);

cancelled.get(200, TimeUnit.MILLISECONDS);
ts.assertFailure(IllegalArgumentException.class, 1);
}

@Test
public void millionSources() {
@SuppressWarnings("unchecked")
Expand Down