Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseProcessor back-pressure fix backport from #1282 #1343

Merged
merged 1 commit into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,22 +34,28 @@ abstract class BaseProcessor<T, U> implements Processor<T, U>, Subscription {
private Subscription subscription;
private final SingleSubscriberHolder<U> subscriber;
private final RequestedCounter requested;
private final RequestedCounter ableToSubmit;
private final AtomicBoolean ready;
private final AtomicBoolean subscribed;
private volatile boolean done;
private Throwable error;

BaseProcessor() {
requested = new RequestedCounter();
ableToSubmit = new RequestedCounter();
ready = new AtomicBoolean();
subscribed = new AtomicBoolean();
subscriber = new SingleSubscriberHolder<>();
}

@Override
public final void request(long n) {
requested.increment(n, ex -> onError(ex));
tryRequest(subscription);
ableToSubmit.increment(n, this::onError);
if (subscription != null && !subscriber.isClosed()) {
subscription.request(n);
} else {
requested.increment(n, this::onError);
}
if (done) {
tryComplete();
}
Expand Down Expand Up @@ -108,10 +114,11 @@ public void subscribe(Subscriber<? super U> s) {

/**
* Submit an item to the subscriber.
*
* @param item item to be submitted
*/
protected void submit(U item) {
if (requested.tryDecrement()) {
if (ableToSubmit.tryDecrement()) {
try {
subscriber.get().onNext(item);
} catch (InterruptedException ex) {
Expand Down Expand Up @@ -156,7 +163,12 @@ protected final void doSubscribe(Publisher<U> delegate) {
delegate.subscribe(new Subscriber<U>() {
@Override
public void onSubscribe(Subscription subscription) {
tryRequest(subscription);
if (subscription != null && !subscriber.isClosed()) {
long n = ableToSubmit.get();
if (n > 0) {
subscription.request(n);
}
}
}

@Override
Expand Down Expand Up @@ -200,7 +212,7 @@ private void tryComplete() {

private void tryRequest(Subscription s) {
if (s != null && !subscriber.isClosed()) {
long n = requested.get();
long n = requested.getAndReset();
if (n > 0) {
s.request(n);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,4 +84,13 @@ public boolean tryDecrement() {
public long get() {
return requested.get();
}

/**
* Gets the current requested event counter value and resets it to 0.
*
* @return current value of the requested event counter.
*/
public long getAndReset() {
return requested.getAndSet(0);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -86,6 +88,37 @@ public void testOnNextAfterOnComplete() {
assertThat(subscriber.getItems(), is(empty()));
}

@Test
public void testCumulatedBackpressure() {
AtomicLong publisherReceivedRequestSum = new AtomicLong();
Flow.Publisher<String> publisher = subscriber -> {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
publisherReceivedRequestSum.addAndGet(n);
}

@Override
public void cancel() {
//noop
}
});
};
TestProcessor<String> processor = new TestProcessor<>();
TestSubscriber<String> subscriber = new TestSubscriber<String>() {
@Override
public void onNext(String item) {
//noop
}
};
publisher.subscribe(processor);
processor.subscribe(subscriber);
subscriber.request(2);
subscriber.request(2);
subscriber.request(2);
assertThat(publisherReceivedRequestSum.get(), is(equalTo(6L)));
}

@Test
public void testDoOnNextError() {
TestProcessor<String> processor = new TestProcessor<String>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,11 @@ public void onSubscribe(Flow.Subscription subscription) {
* Request one item.
*/
public void request1() {
this.subcription.request(1);
this.request(1);
}

public void request(long n) {
this.subcription.request(n);
}

/**
Expand All @@ -62,6 +66,7 @@ public void onError(Throwable throwable) {

/**
* Indicates completeness.
*
* @return {@code true} if complete, {@code false} otherwise
*/
public boolean isComplete() {
Expand All @@ -75,6 +80,7 @@ public void onComplete() {

/**
* Get the items accumulated by this subscriber.
*
* @return list of items
*/
public List<T> getItems() {
Expand All @@ -83,6 +89,7 @@ public List<T> getItems() {

/**
* Get the last item accumulated by this subscriber.
*
* @return last item, or {@code null} or this subscriber has not
* received any items yet
*/
Expand All @@ -92,6 +99,7 @@ public T getLastItem() {

/**
* Get the last error received by this subscriber.
*
* @return a {@code Throwable} or {@code null} or this subscriber has not
* received any
*/
Expand All @@ -101,6 +109,7 @@ public Throwable getLastError() {

/**
* Get the subscription set on this subscriber.
*
* @return a {@code Flow.Subscription} or {@code null} if onSubcribe has not
* been called
*/
Expand Down