Skip to content

Commit

Permalink
fixes salesforce#292 (supposedly)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Rodionov committed Feb 14, 2022
1 parent 4040f60 commit 851f581
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void request(long n) {
private volatile boolean done;
private Throwable error;

private volatile Subscriber<? super T> downstream;
protected volatile Subscriber<? super T> downstream;

private volatile boolean cancelled;

Expand Down Expand Up @@ -226,7 +226,7 @@ private void drainFused(final Subscriber<? super T> subscriber) {

for (;;) {
if (cancelled) {
queue.clear();
discardQueue(queue);
downstream = null;
return;
}
Expand Down Expand Up @@ -283,7 +283,7 @@ private void drain() {

private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
if (cancelled) {
q.clear();
discardQueue(q);
downstream = null;
return true;
}
Expand All @@ -305,6 +305,7 @@ private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T>
@Override
public void onNext(T t) {
if (done || cancelled) {
discardElement(t);
return;
}

Expand Down Expand Up @@ -419,7 +420,7 @@ public void cancel() {

if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
queue.clear();
discardQueue(queue);
downstream = null;
}
}
Expand Down Expand Up @@ -456,4 +457,10 @@ public boolean isEmpty() {
public void clear() {
queue.clear();
}

protected void discardQueue(Queue<T> q) {
q.clear();
}

protected void discardElement(T t) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import com.salesforce.reactivegrpc.common.AbstractClientStreamObserverAndPublisher;
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand Down Expand Up @@ -46,4 +50,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if(downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if(downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import com.salesforce.reactivegrpc.common.Consumer;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

import java.util.Queue;

/**
* TODO: Explain what this class does.
* @param <T> T
Expand All @@ -38,4 +42,20 @@ public int requestFusion(int requestedMode) {
}
return Fuseable.NONE;
}

@Override
protected void discardQueue(Queue<T> q) {
if(downstream instanceof CoreSubscriber) {
Operators.onDiscardQueueWithClear(q, ((CoreSubscriber) downstream).currentContext(), null);
} else {
q.clear();
}
}

@Override
protected void discardElement(T t) {
if(downstream instanceof CoreSubscriber) {
Operators.onDiscard(t, ((CoreSubscriber) downstream).currentContext());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

package com.salesforce.reactorgrpc.stub;

import java.util.concurrent.ForkJoinPool;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;

import static org.assertj.core.api.Assertions.assertThat;


public class ReactorClientStreamObserverAndPublisherTest {

Expand All @@ -34,7 +37,7 @@ public void multiThreadedProducerTest() {
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
Expand All @@ -49,6 +52,32 @@ public void producerFusedTest() {
.expectNextCount(countPerThread)
.verifyComplete();

Assertions.assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
assertThat(observer.requestsQueue.size()).isBetween((countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 1, (countPerThread - DEFAULT_CHUNK_SIZE) / PART_OF_CHUNK + 3);
}

@Test
public void discardQueueTest() {
ReactorClientStreamObserverAndPublisher<Integer> processor =
new ReactorClientStreamObserverAndPublisher<>(null);
int countPerThread = 5;
TestCallStreamObserverProducer observer = new TestCallStreamObserverProducer(ForkJoinPool.commonPool(), processor, countPerThread);
processor.beforeStart(observer);

ConcurrentLinkedQueue<Integer> discarded = new ConcurrentLinkedQueue<>();

Flux<Integer> consumer =
Flux.from(processor)
.log("processor")
.delayElements(Duration.ofMillis(100))
.take(1)
.log("handled")
.doOnDiscard(Integer.class, discarded::add);

StepVerifier.create(consumer)
.expectNext(0)
.verifyComplete();


assertThat(discarded).containsExactly(1, 2, 3, 4);
}
}

0 comments on commit 851f581

Please sign in to comment.