Skip to content

Commit

Permalink
CAMEL-10807: fixing backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro authored and lburgazzoli committed May 5, 2017
1 parent 1ca43e4 commit 4266cf6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
Expand Up @@ -28,17 +28,15 @@
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ObjectHelper;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink; import reactor.core.publisher.SynchronousSink;
import reactor.util.concurrent.QueueSupplier; import reactor.util.concurrent.QueueSupplier;


final class ReactorCamelProcessor implements Closeable { final class ReactorCamelProcessor implements Closeable {
private final String name; private final String name;
private final FluxProcessor<Exchange, Exchange> processor; private final EmitterProcessor<Exchange> publisher;
private final Flux<Exchange> processorFlux;
private final AtomicReference<FluxSink<Exchange>> camelSink; private final AtomicReference<FluxSink<Exchange>> camelSink;


private final ReactorStreamsService service; private final ReactorStreamsService service;
Expand All @@ -51,19 +49,20 @@ final class ReactorCamelProcessor implements Closeable {
this.camelProducer = null; this.camelProducer = null;
this.camelSink = new AtomicReference<>(); this.camelSink = new AtomicReference<>();


this.processor = DirectProcessor.create();


// As the processor can be shared among a number of subscribers we need // TODO: A emitter processor with buffer-size 0 would be perfect
// to make it shared we can multicast events. // The effect of having a prefetch of 1 element is that the chain buffers at least 2 elements instead of only one
this.processorFlux = this.processor.handle(this::onItemEmitted).share(); // (one in the FluxSink and one in the EmitterProcessor) when using the "latest" or "oldest" strategy.
// This affects slightly the behavior of the backpressure strategy "latest" (but it doesn't change the semantics).
this.publisher = EmitterProcessor.create(1);
} }


@Override @Override
public void close() throws IOException { public void close() throws IOException {
} }


Publisher<Exchange> getPublisher() { Publisher<Exchange> getPublisher() {
return processorFlux; return publisher;
} }


synchronized void attach(ReactiveStreamsProducer producer) { synchronized void attach(ReactiveStreamsProducer producer) {
Expand All @@ -80,15 +79,18 @@ synchronized void attach(ReactiveStreamsProducer producer) {
Flux<Exchange> flux = Flux.create(camelSink::set); Flux<Exchange> flux = Flux.create(camelSink::set);


if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) { if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
flux = flux.onBackpressureDrop(this::onBackPressure); // signal item emitted for non-dropped items only
flux = flux.onBackpressureDrop(this::onBackPressure).handle(this::onItemEmitted);
} else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) { } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
flux = flux.onBackpressureLatest(); // Since there is no callback for dropped elements on backpressure "latest", item emission is signaled before dropping
// No exception is reported back to the exchanges
flux = flux.handle(this::onItemEmitted).onBackpressureLatest();
} else { } else {
// Default strategy is BUFFER // Default strategy is BUFFER
flux = flux.onBackpressureBuffer(QueueSupplier.SMALL_BUFFER_SIZE, this::onBackPressure); flux = flux.onBackpressureBuffer(QueueSupplier.SMALL_BUFFER_SIZE, this::onBackPressure).handle(this::onItemEmitted);
} }


flux.subscribe(this.processor); flux.subscribe(this.publisher);


camelProducer = producer; camelProducer = producer;
} }
Expand Down
Expand Up @@ -25,8 +25,8 @@
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy; import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactor.engine.suport.TestSubscriber; import org.apache.camel.component.reactor.engine.suport.TestSubscriber;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;

import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;


public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsServiceTestSupport { public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsServiceTestSupport {
Expand All @@ -39,9 +39,9 @@ public void testBufferStrategy() throws Exception {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
from("timer:gen?period=20&repeatCount=20") from("timer:gen?period=20&repeatCount=20")
.setBody() .setBody()
.header(Exchange.TIMER_COUNTER) .header(Exchange.TIMER_COUNTER)
.to("reactive-streams:integers"); .to("reactive-streams:integers");
} }
}); });


Expand All @@ -50,10 +50,10 @@ public void configure() throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);


Flux.range(0, 50) Flux.range(0, 50)
.zipWith(integers, (l, i) -> i) .zipWith(integers, (l, i) -> i)
.timeoutMillis(2000, Flux.empty()) .timeoutMillis(2000, Flux.empty())
.doOnComplete(latch::countDown) .doOnComplete(latch::countDown)
.subscribe(queue::add); .subscribe(queue::add);


context.start(); context.start();


Expand All @@ -66,7 +66,7 @@ public void configure() throws Exception {
} }
} }


@Ignore
@Test @Test
public void testDropStrategy() throws Exception { public void testDropStrategy() throws Exception {
getReactiveStreamsComponent().setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.OLDEST); getReactiveStreamsComponent().setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.OLDEST);
Expand All @@ -75,9 +75,9 @@ public void testDropStrategy() throws Exception {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
from("timer:gen?period=20&repeatCount=20") from("timer:gen?period=20&repeatCount=20")
.setBody() .setBody()
.header(Exchange.TIMER_COUNTER) .header(Exchange.TIMER_COUNTER)
.to("reactive-streams:integers"); .to("reactive-streams:integers");
} }
}); });


Expand Down Expand Up @@ -114,7 +114,6 @@ public void onNext(Integer o) {
subscriber.cancel(); subscriber.cancel();
} }


@Ignore
@Test @Test
public void testLatestStrategy() throws Exception { public void testLatestStrategy() throws Exception {
getReactiveStreamsComponent().setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.LATEST); getReactiveStreamsComponent().setBackpressureStrategy(ReactiveStreamsBackpressureStrategy.LATEST);
Expand All @@ -123,9 +122,9 @@ public void testLatestStrategy() throws Exception {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
from("timer:gen?period=20&repeatCount=20") from("timer:gen?period=20&repeatCount=20")
.setBody() .setBody()
.header(Exchange.TIMER_COUNTER) .header(Exchange.TIMER_COUNTER)
.to("reactive-streams:integers"); .to("reactive-streams:integers");
} }
}); });


Expand Down Expand Up @@ -154,10 +153,13 @@ public void onNext(Integer o) {
Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS));


Thread.sleep(200); // add other time to ensure no other items arrive Thread.sleep(200); // add other time to ensure no other items arrive
Assert.assertEquals(2, queue.size()); // TODO the chain caches two elements instead of one: change it if you find an EmitterProcessor without prefetch
// Assert.assertEquals(2, queue.size());
Assert.assertEquals(3, queue.size());


int sum = queue.stream().reduce((i, j) -> i + j).get(); int sum = queue.stream().reduce((i, j) -> i + j).get();
Assert.assertEquals(21, sum); // 1 + 20 = 21 // Assert.assertEquals(21, sum); // 1 + 20 = 21
Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23


subscriber.cancel(); subscriber.cancel();
} }
Expand Down

0 comments on commit 4266cf6

Please sign in to comment.