Skip to content

Commit

Permalink
Add some checks to BufferingFlowableWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Nov 17, 2023
1 parent 75b60b2 commit 18fbcaa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client;

import org.eclipse.ditto.base.model.common.ConditionChecker;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
Expand All @@ -25,6 +27,8 @@
* @param <T> type of items
*/
public class BufferingFlowableWrapper<T> implements Disposable {
private static final String DISPOSED_ERROR_MESSAGE = "The wrapper is disposed.";

private final Flowable<T> originalFlowable;
private final PublishSubject<T> buffered;
private final PublishSubject<T> unbuffered;
Expand All @@ -45,9 +49,22 @@ private BufferingFlowableWrapper(final Flowable<T> flowable) {
.toFlowable(BackpressureStrategy.BUFFER);

this.originalSubscription = flowable.subscribe(
x -> (isBuffering ? buffered : unbuffered).onNext(x),
e -> (isBuffering ? buffered : unbuffered).onError(e),
x -> {
if (isDisposed) {
return;
}
(isBuffering ? buffered : unbuffered).onNext(x);
},
e -> {
if (isDisposed) {
return;
}
(isBuffering ? buffered : unbuffered).onError(e);
},
() -> {
if (isDisposed) {
return;
}
buffered.onComplete();
unbuffered.onComplete();
isBuffering = false;
Expand All @@ -65,13 +82,18 @@ private BufferingFlowableWrapper(final Flowable<T> flowable) {
* @param <T> type of items of the flowable.
*/
public static <T> BufferingFlowableWrapper<T> of(final Flowable<T> flowable) {
ConditionChecker.checkNotNull(flowable, "flowable");
return new BufferingFlowableWrapper<>(flowable);
}

/**
* @return the {@code Flowable} which can be used to consume messages from original flowable.
*/
public Flowable<T> toFlowable() {
if (isDisposed) {
throw new IllegalStateException(DISPOSED_ERROR_MESSAGE);
}

return isBuffering ? this.flowable : this.originalFlowable;
}

Expand All @@ -80,6 +102,10 @@ public Flowable<T> toFlowable() {
* only new items.
*/
public void stopBuffering() {
if (isDisposed) {
throw new IllegalStateException(DISPOSED_ERROR_MESSAGE);
}

isBuffering = false;
buffered.onComplete();
}
Expand All @@ -88,9 +114,9 @@ public void stopBuffering() {

@Override
public void dispose() {
isDisposed = true;
this.originalSubscription.dispose();
this.subscription.dispose();
isDisposed = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.ArrayList;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import io.reactivex.BackpressureStrategy;
Expand All @@ -31,6 +32,14 @@ public class BufferingFlowableWrapperTest {
private final Flowable<Integer> flowable = emitter.toFlowable(BackpressureStrategy.DROP);
private final BufferingFlowableWrapper<Integer> bufferingFlowableWrapper = BufferingFlowableWrapper.of(flowable);

@Test
public void newInstanceWithNullGenericMqttSubscribingClientThrowsException() {
Assertions.assertThatNullPointerException()
.isThrownBy(() -> BufferingFlowableWrapper.of(null))
.withMessage("The flowable must not be null!")
.withNoCause();
}

@Test
public void toFlowableEmitsErrors() {
final var errors = new ArrayList<Throwable>();
Expand Down Expand Up @@ -120,6 +129,24 @@ public void toFlowableEmitsPreviousItemsToAllSubscribers() {
assertThat(received3).containsExactly(1, 2, 3);
}

@Test
public void toFlowableOnDisposedWrapperThrowsException() {
bufferingFlowableWrapper.dispose();
Assertions.assertThatIllegalStateException()
.isThrownBy(bufferingFlowableWrapper::toFlowable)
.withMessage("The wrapper is disposed.")
.withNoCause();
}

@Test
public void stopBufferingOnDisposedWrapperThrowsException() {
bufferingFlowableWrapper.dispose();
Assertions.assertThatIllegalStateException()
.isThrownBy(bufferingFlowableWrapper::stopBuffering)
.withMessage("The wrapper is disposed.")
.withNoCause();
}

@Test
public void toFlowableEmitsAllItemsFromCompletedFlowable() {
final var bufferingFlowableWrapper = BufferingFlowableWrapper.of(Flowable.just(1));
Expand Down

0 comments on commit 18fbcaa

Please sign in to comment.