diff --git a/server/src/test/java/org/spine3/server/SubscriptionServiceShould.java b/server/src/test/java/org/spine3/server/SubscriptionServiceShould.java index 59420012ab5..0dface7177a 100644 --- a/server/src/test/java/org/spine3/server/SubscriptionServiceShould.java +++ b/server/src/test/java/org/spine3/server/SubscriptionServiceShould.java @@ -45,6 +45,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.spine3.test.Verify.assertInstanceOf; import static org.spine3.test.Verify.assertSize; import static org.spine3.testdata.TestBoundedContextFactory.newBoundedContext; @@ -158,6 +159,22 @@ public void subscribe_to_topic() { assertTrue(observer.isCompleted); } + @Test + public void handle_subscription_process_exceptions_and_call_observer_error_callback() { + final BoundedContext boundedContext = setupBoundedContextForAggregateRepo(); + + final SubscriptionService subscriptionService = SubscriptionService.newBuilder() + .addBoundedContext(boundedContext) + .build(); + final MemoizeStreamObserver observer = new MemoizeStreamObserver<>(); + // Causes NPE + subscriptionService.subscribe(null, observer); + assertNull(observer.streamFlowValue); + assertFalse(observer.isCompleted); + assertNotNull(observer.throwable); + assertInstanceOf(NullPointerException.class, observer.throwable); + } + @Test public void activate_subscription() { final BoundedContext boundedContext = setupBoundedContextForAggregateRepo(); @@ -170,7 +187,6 @@ public void activate_subscription() { final Topic topic = Topic.newBuilder() .setTarget(target) .build(); - // Subscribe on the topic final MemoizeStreamObserver subscriptionObserver = new MemoizeStreamObserver<>(); subscriptionService.subscribe(topic, subscriptionObserver); @@ -194,6 +210,22 @@ public void activate_subscription() { activationObserver.verifyState(false); } + @Test + public void handle_activation_process_exceptions_and_call_observer_error_callback() { + final BoundedContext boundedContext = setupBoundedContextForAggregateRepo(); + + final SubscriptionService subscriptionService = SubscriptionService.newBuilder() + .addBoundedContext(boundedContext) + .build(); + final MemoizeStreamObserver observer = new MemoizeStreamObserver<>(); + // Causes NPE + subscriptionService.activate(null, observer); + assertNull(observer.streamFlowValue); + assertFalse(observer.isCompleted); + assertNotNull(observer.throwable); + assertInstanceOf(NullPointerException.class, observer.throwable); + } + @Test public void cancel_subscription_on_topic() { final BoundedContext boundedContext = setupBoundedContextForAggregateRepo(); @@ -234,6 +266,36 @@ public void cancel_subscription_on_topic() { verify(activateSubscription, never()).onCompleted(); } + @Test + public void handle_cancellation_process_exceptions_and_call_observer_error_callback() { + final BoundedContext boundedContext = setupBoundedContextForAggregateRepo(); + + final SubscriptionService subscriptionService = SubscriptionService.newBuilder() + .addBoundedContext(boundedContext) + .build(); + final Target target = getProjectQueryTarget(); + final Topic topic = Topic.newBuilder() + .setTarget(target) + .build(); + final MemoizeStreamObserver subscriptionObserver = new MemoizeStreamObserver<>(); + subscriptionService.subscribe(topic, subscriptionObserver); + + final String failureMessage = "Execution breaking exception"; + final MemoizeStreamObserver observer = new MemoizeStreamObserver() { + @Override + public void onNext(Response value) { + super.onNext(value); + throw new RuntimeException(failureMessage); + } + }; + subscriptionService.cancel(subscriptionObserver.streamFlowValue, observer); + assertNotNull(observer.streamFlowValue); + assertFalse(observer.isCompleted); + assertNotNull(observer.throwable); + assertInstanceOf(RuntimeException.class, observer.throwable); + assertEquals(observer.throwable.getMessage(), failureMessage); + } + private static BoundedContext setupBoundedContextForAggregateRepo() { final Stand stand = Stand.newBuilder() .build();