Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.spine3.test.Verify.assertInstanceOf;
import static org.spine3.test.Verify.assertSize;
import static org.spine3.testdata.TestBoundedContextFactory.newBoundedContext;

Expand Down Expand Up @@ -158,6 +159,22 @@ public void subscribe_to_topic() {
assertTrue(observer.isCompleted);
}

@Test
public void handle_subscription_process_exceptions_and_call_observer_error_callback() {
final BoundedContext boundedContext = setupBoundedContextForAggregateRepo();

final SubscriptionService subscriptionService = SubscriptionService.newBuilder()
.addBoundedContext(boundedContext)
.build();
final MemoizeStreamObserver<Subscription> observer = new MemoizeStreamObserver<>();
// Causes NPE
subscriptionService.subscribe(null, observer);
assertNull(observer.streamFlowValue);
assertFalse(observer.isCompleted);
assertNotNull(observer.throwable);
assertInstanceOf(NullPointerException.class, observer.throwable);
}

@Test
public void activate_subscription() {
final BoundedContext boundedContext = setupBoundedContextForAggregateRepo();
Expand All @@ -170,7 +187,6 @@ public void activate_subscription() {
final Topic topic = Topic.newBuilder()
.setTarget(target)
.build();

// Subscribe on the topic
final MemoizeStreamObserver<Subscription> subscriptionObserver = new MemoizeStreamObserver<>();
subscriptionService.subscribe(topic, subscriptionObserver);
Expand All @@ -194,6 +210,22 @@ public void activate_subscription() {
activationObserver.verifyState(false);
}

@Test
public void handle_activation_process_exceptions_and_call_observer_error_callback() {
final BoundedContext boundedContext = setupBoundedContextForAggregateRepo();

final SubscriptionService subscriptionService = SubscriptionService.newBuilder()
.addBoundedContext(boundedContext)
.build();
final MemoizeStreamObserver<SubscriptionUpdate> observer = new MemoizeStreamObserver<>();
// Causes NPE
subscriptionService.activate(null, observer);
assertNull(observer.streamFlowValue);
assertFalse(observer.isCompleted);
assertNotNull(observer.throwable);
assertInstanceOf(NullPointerException.class, observer.throwable);
}

@Test
public void cancel_subscription_on_topic() {
final BoundedContext boundedContext = setupBoundedContextForAggregateRepo();
Expand Down Expand Up @@ -234,6 +266,36 @@ public void cancel_subscription_on_topic() {
verify(activateSubscription, never()).onCompleted();
}

@Test
public void handle_cancellation_process_exceptions_and_call_observer_error_callback() {
final BoundedContext boundedContext = setupBoundedContextForAggregateRepo();

final SubscriptionService subscriptionService = SubscriptionService.newBuilder()
.addBoundedContext(boundedContext)
.build();
final Target target = getProjectQueryTarget();
final Topic topic = Topic.newBuilder()
.setTarget(target)
.build();
final MemoizeStreamObserver<Subscription> subscriptionObserver = new MemoizeStreamObserver<>();
subscriptionService.subscribe(topic, subscriptionObserver);

final String failureMessage = "Execution breaking exception";
final MemoizeStreamObserver<Response> observer = new MemoizeStreamObserver<Response>() {
@Override
public void onNext(Response value) {
super.onNext(value);
throw new RuntimeException(failureMessage);
}
};
subscriptionService.cancel(subscriptionObserver.streamFlowValue, observer);
assertNotNull(observer.streamFlowValue);
assertFalse(observer.isCompleted);
assertNotNull(observer.throwable);
assertInstanceOf(RuntimeException.class, observer.throwable);
assertEquals(observer.throwable.getMessage(), failureMessage);
}

private static BoundedContext setupBoundedContextForAggregateRepo() {
final Stand stand = Stand.newBuilder()
.build();
Expand Down