Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi discrepancies #2413

Merged
merged 3 commits into from Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,7 @@

/**
* Buffers a single item and emits it to the downstream when it requests.
*
* @param <T> the item type buffered
*/
class DeferredScalarSubscription<T> extends AtomicInteger implements Flow.Subscription {
Expand All @@ -31,91 +32,66 @@ class DeferredScalarSubscription<T> extends AtomicInteger implements Flow.Subscr

private T value;

static final int NO_VALUE_NO_REQUEST = 0;
static final int NO_VALUE_HAS_REQUEST = 1;
static final int HAS_VALUE_NO_REQUEST = 2;
static final int HAS_VALUE_HAS_REQUEST = 3;
static final int COMPLETE = 4;
static final int CANCELED = 5;
static final int REQUEST_ARRIVED = 1;
static final int VALUE_ARRIVED = 2;
static final int DONE = REQUEST_ARRIVED | VALUE_ARRIVED;


DeferredScalarSubscription(Flow.Subscriber<? super T> downstream) {
this.downstream = downstream;
}

@Override
public void cancel() {
if (getAndSet(CANCELED) != CANCELED) {
if (getAndSet(DONE) != DONE) {
value = null;
}
}

@Override
public final void request(long n) {
if (n <= 0L) {
if (getAndSet(CANCELED) != CANCELED) {
if (getAndSet(DONE) != DONE) {
value = null;
downstream.onError(
new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
}
} else {
for (;;) {
int state = get();
if (state == HAS_VALUE_NO_REQUEST) {
T v = value;
value = null;
if (compareAndSet(HAS_VALUE_NO_REQUEST, HAS_VALUE_HAS_REQUEST)) {
downstream.onNext(v);
if (compareAndSet(HAS_VALUE_HAS_REQUEST, COMPLETE)) {
downstream.onComplete();
}
break;
}
} else if (state == NO_VALUE_NO_REQUEST) {
if (compareAndSet(NO_VALUE_NO_REQUEST, NO_VALUE_HAS_REQUEST)) {
break;
}
} else {
// state == COMPLETE
// state == HAS_VALUE_HAS_REQUEST
// state == NO_VALUE_HAS_REQUEST
// state == CANCELED
break;
}
}
return;
}

int state;
T v;
do {
state = get();
v = value;
} while (!compareAndSet(state, state | REQUEST_ARRIVED));

if (state == VALUE_ARRIVED) {
value = null;
downstream.onNext(v);
downstream.onComplete();
}
}

/**
* Signal the only item if possible or save it for later when there
* is a request for it.
* <p>
* This method should be called at most once and from only one thread.
* This method should be called at most once and from only one thread.
* </p>
*
* @param item the item to signal and then complete the downstream
*/
public final void complete(T item) {
for (;;) {
int state = get();
if (state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(NO_VALUE_HAS_REQUEST, HAS_VALUE_HAS_REQUEST)) {
downstream.onNext(item);
if (compareAndSet(HAS_VALUE_HAS_REQUEST, COMPLETE)) {
downstream.onComplete();
}
break;
}
} else if (state == NO_VALUE_NO_REQUEST) {
value = item;
if (compareAndSet(NO_VALUE_NO_REQUEST, HAS_VALUE_NO_REQUEST)) {
break;
}
value = null;
} else {
// state == COMPLETE
// state == HAS_VALUE_NO_REQUEST
// state == HAS_VALUE_HAS_REQUEST
// state == CANCELED
break;
}
value = item; // assert: even if the race occurs, we will deliver one of the items with which complete()
// has been invoked - we support only the case with a single invocation of complete()
int state = getAndUpdate(n -> n | VALUE_ARRIVED);
if ((state & REQUEST_ARRIVED) == REQUEST_ARRIVED) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why you added the masking of the value. getAndUpdate returns the value that existed before - so comparing to just REQUEST_ARRIVED in the POC was intentional.

a. in the case that we support masking makes no difference.
b. if the method is abused, and does get called more than once, the POC would still deliver only one value, and masking will deliver more than one value, and break the spec by delivering onNext after onComplete.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the masking in line 89 is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha I see, but it makes sense in the other complete() right?

    public final void complete() {
        int state = get();
        if (((state & VALUE_ARRIVED) != VALUE_ARRIVED) && compareAndSet(state, DONE)){
            downstream.onComplete();
        }
    }

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the other complete it reflects the logic of the POC correctly: basically, in that complete it is "if value has not arrived yet" (was worded with different conditions with OR).

But in this complete the logic must be "if request has been seen, but the value still not seen" - only in that case we are allowed to deliver the signals to downstream.

value = null;
downstream.onNext(item);
downstream.onComplete();
} else if (state == DONE) {
value = null;
}
}

Expand All @@ -128,6 +104,7 @@ protected final void subscribeSelf() {

/**
* Returns the downstream reference.
*
* @return the downstream reference
*/
protected final Flow.Subscriber<? super T> downstream() {
Expand All @@ -138,34 +115,22 @@ protected final Flow.Subscriber<? super T> downstream() {
* Complete the downstream without emitting any items.
*/
public final void complete() {
for (;;) {
int state = get();
if (state == NO_VALUE_NO_REQUEST || state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(state, COMPLETE)) {
downstream.onComplete();
return;
}
} else {
break;
}
int state = get();
if (((state & VALUE_ARRIVED) != VALUE_ARRIVED) && compareAndSet(state, DONE)){
downstream.onComplete();
}
}

/**
* Signal error to the downstream without emitting any items.
*
* @param throwable the error to signal
*/
public final void error(Throwable throwable) {
for (;;) {
int state = get();
if (state == NO_VALUE_NO_REQUEST || state == NO_VALUE_HAS_REQUEST) {
if (compareAndSet(state, COMPLETE)) {
downstream.onError(throwable);
return;
}
} else {
break;
}
int state = getAndSet(DONE);
if (state != DONE) {
value = null;
downstream.onError(throwable);
}
}

Expand Down
Expand Up @@ -39,7 +39,7 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
}

static final class ConcatArraySubscriber<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
implements Flow.Subscriber<T> {

private final Flow.Subscriber<? super T> downstream;

Expand Down Expand Up @@ -98,6 +98,7 @@ public void nextSource() {
@Override
public void request(long n) {
if (n <= 0) {
cancel();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not sufficient for the problem that has been discovered. We'll have to address it better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olotenko To be clear, your point is that the sequence onError onNext is still possible as cancellation is only a best-effort task.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can view this as the best effort so far. But like I mentioned offline, there is another issue that needs addressing, so we may just as well postpone touching this class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, sounds like for a future PR.

downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
super.request(n);
Expand Down
Expand Up @@ -15,14 +15,14 @@
*/
package io.helidon.common.reactive;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import org.junit.jupiter.api.Test;

public class MultiConcatArrayTest {

Expand All @@ -31,11 +31,26 @@ public void errors() {
TestSubscriber<Object> ts = new TestSubscriber<>(Long.MAX_VALUE);

Multi.concatArray(Multi.singleton(1), Multi.error(new IOException()), Multi.singleton(2))
.subscribe(ts);
.subscribe(ts);

ts.assertFailure(IOException.class, 1);
}

@Test
public void cancelOnNegativeNumberTest() throws InterruptedException, TimeoutException, ExecutionException {
TestSubscriber<Integer> ts = new TestSubscriber<>();

CompletableFuture<Void> cancelled = new CompletableFuture<>();

Multi.concatArray(Multi.just(1, 2).onCancel(() -> cancelled.complete(null)), Multi.just(3, 4))
.subscribe(ts);
ts.request(1);
ts.getSubcription().request(-1);

cancelled.get(200, TimeUnit.MILLISECONDS);
ts.assertFailure(IllegalArgumentException.class, 1);
}

@Test
public void millionSources() {
@SuppressWarnings("unchecked")
Expand Down