Skip to content

Commit

Permalink
Remove unnecessary graceful stop signal in order to stabilize unit test
Browse files Browse the repository at this point in the history
* With GracefulStop for example the successfullyAcknowledgeMqttPublish test
  was instable because sometimes the actor terminated before the acknowledgement
  message could be sent

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 9, 2022
1 parent d2efbdd commit edd3d1d
Showing 1 changed file with 20 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

Expand Down Expand Up @@ -48,14 +47,11 @@
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.ManualAcknowledgementDisabledException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.MessageAlreadyAcknowledgedException;
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
Expand Down Expand Up @@ -297,7 +293,7 @@ private static ConnectivityConfig getConnectivityConfigWithCustomMqttConfig(fina
}

@Test
public void mqttPublishSourceThrottlingWorksAsExpected() {
public void mqttPublishSourceThrottlingWorksAsExpected() throws InterruptedException {
final var processedMessagesCount = new LongAdder();
final var inboundMappingSink = Sink.foreach(msg -> processedMessagesCount.increment());

Expand All @@ -311,30 +307,27 @@ public void mqttPublishSourceThrottlingWorksAsExpected() {

final var runtimeIntervalAmount = 2;

final var testKit = ACTOR_SYSTEM_RESOURCE.newTestKit();
final var underTest = testKit.watch(ACTOR_SYSTEM_RESOURCE.newActor(
ACTOR_SYSTEM_RESOURCE.newActor(
MqttConsumerActor.propsProcessing(connection,
inboundMappingSink,
connectionSource,
connectivityStatusResolver,
connectivityConfig,
Source.repeat(GenericMqttPublish.ofMqtt5Publish(MQTT_5_PUBLISH))),
testName.getMethodName()
));
);

final var scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> underTest.tell(GracefulStop.INSTANCE, testKit.getRef()),
throttlingInterval.toMillis() * runtimeIntervalAmount,
TimeUnit.MILLISECONDS);
// Wait some time
Thread.sleep(throttlingInterval.toMillis() * runtimeIntervalAmount);

testKit.expectTerminated(underTest);
// Assert that the expected amount of messages was processed during the waited time
assertThat(processedMessagesCount.sum())
.isCloseTo(throttlingLimitPerInterval * runtimeIntervalAmount, Offset.offset(1L));
}

@Test
public void mqttPublishesAreDroppedIfConsumerActorOperatesInDryRunMode() {
final var receivedMqttPublishMessages = new ArrayList<>();
public void mqttPublishesAreDroppedIfConsumerActorOperatesInDryRunMode() throws InterruptedException {
final var receivedMqttPublishMessages = new ArrayList<>();

final var underTest = ACTOR_SYSTEM_RESOURCE.newActor(
MqttConsumerActor.propsDryRun(connection,
Expand All @@ -346,7 +339,8 @@ public void mqttPublishesAreDroppedIfConsumerActorOperatesInDryRunMode() {
testName.getMethodName()
);

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());
// Wait some tome to make sure there was enough time to potentially process messages (which should not happen!)
Thread.sleep(1_000L);

// 'inbound mapping sink' should receive nothing because should have been dropped.
assertThat(receivedMqttPublishMessages).isEmpty();
Expand Down Expand Up @@ -387,25 +381,25 @@ public void mqttPublishesAreMappedToExternalMessagesAndSentToInboundMappingSink(
}

@Test
@Ignore("TODO fix flaky GH actions test")
public void successfullyAcknowledgeMqttPublish() {
final var mqtt5Publish = Mockito.spy(MQTT_5_PUBLISH);
Mockito.when(mqtt5Publish.isRetain()).thenReturn(false);

final var inboundMappingSinkElementReceiver = ACTOR_SYSTEM_RESOURCE.newTestKit();
final var onCompleteMessage = new Object();
final var underTestWatcher = ACTOR_SYSTEM_RESOURCE.newTestKit();

final var underTest = underTestWatcher.watch(ACTOR_SYSTEM_RESOURCE.newActor(
//When
ACTOR_SYSTEM_RESOURCE.newActor(
MqttConsumerActor.propsProcessing(connection,
Sink.actorRef(inboundMappingSinkElementReceiver.getRef(), onCompleteMessage),
connectionSource,
connectivityStatusResolver,
TestConstants.CONNECTIVITY_CONFIG,
Source.single(GenericMqttPublish.ofMqtt5Publish(mqtt5Publish))),
testName.getMethodName()
));
);

//Then
final var externalMessageWithSender =
inboundMappingSinkElementReceiver.expectMsgClass(ExternalMessageWithSender.class);

Expand All @@ -419,11 +413,8 @@ public void successfullyAcknowledgeMqttPublish() {
ActorRef.noSender()
);

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());

inboundMappingSinkElementReceiver.expectMsg(onCompleteMessage);
underTestWatcher.expectTerminated(underTest);
Mockito.verify(mqtt5Publish, Mockito.timeout(5_000L)).acknowledge();
Mockito.verify(mqtt5Publish, Mockito.timeout(1_000L)).acknowledge();
}

@Test
Expand All @@ -435,7 +426,7 @@ public void reconnectConsumerClientForRedeliveryIfInboundMessageIsRejected() {
final var onCompleteMessage = new Object();
final var fakeMqttClientActor = ACTOR_SYSTEM_RESOURCE.newTestKit();

final var underTest = fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
MqttConsumerActor.propsProcessing(connection,
Sink.actorRef(inboundMappingSinkElementReceiver.getRef(), onCompleteMessage),
connectionSource,
Expand All @@ -459,17 +450,12 @@ public void reconnectConsumerClientForRedeliveryIfInboundMessageIsRejected() {
);

fakeMqttClientActor.expectMsgClass(ReconnectConsumerClient.class);

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());

inboundMappingSinkElementReceiver.expectMsg(onCompleteMessage);
fakeMqttClientActor.expectTerminated(underTest);
Mockito.verify(mqtt5Publish, Mockito.never()).acknowledge();
}

@Test
public void doNotRedeliverRejectedMessageIfShouldDeliverButReconnectForRedeliveryIsDisabled()
throws MessageAlreadyAcknowledgedException, ManualAcknowledgementDisabledException {
public void doNotRedeliverRejectedMessageIfShouldDeliverButReconnectForRedeliveryIsDisabled() {

final var mqttConfig = Mockito.mock(MqttConfig.class);
Mockito.when(mqttConfig.shouldReconnectForRedelivery()).thenReturn(false);
Expand All @@ -483,7 +469,7 @@ public void doNotRedeliverRejectedMessageIfShouldDeliverButReconnectForRedeliver
final var onCompleteMessage = new Object();
final var fakeMqttClientActor = ACTOR_SYSTEM_RESOURCE.newTestKit();

final var underTest = fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
MqttConsumerActor.propsProcessing(connection,
Sink.actorRef(inboundMappingSinkElementReceiver.getRef(), onCompleteMessage),
connectionSource,
Expand All @@ -507,11 +493,7 @@ public void doNotRedeliverRejectedMessageIfShouldDeliverButReconnectForRedeliver
);

fakeMqttClientActor.expectNoMessage();

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());

inboundMappingSinkElementReceiver.expectMsg(onCompleteMessage);
fakeMqttClientActor.expectTerminated(underTest);
Mockito.verify(mqtt5Publish).acknowledge();
}

Expand All @@ -524,7 +506,7 @@ public void acknowledgeRejectedIncomingMessageIfShouldNotRedeliver() {
final var onCompleteMessage = new Object();
final var fakeMqttClientActor = ACTOR_SYSTEM_RESOURCE.newTestKit();

final var underTest = fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
MqttConsumerActor.propsProcessing(connection,
Sink.actorRef(inboundMappingSinkElementReceiver.getRef(), onCompleteMessage),
connectionSource,
Expand All @@ -548,11 +530,7 @@ public void acknowledgeRejectedIncomingMessageIfShouldNotRedeliver() {
);

fakeMqttClientActor.expectNoMessage();

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());

inboundMappingSinkElementReceiver.expectMsg(onCompleteMessage);
fakeMqttClientActor.expectTerminated(underTest);
Mockito.verify(mqtt5Publish).acknowledge();
}

Expand All @@ -565,7 +543,7 @@ public void rejectIncomingMessageDueToDittoRuntimeExceptionInsteadOfAcknowledgem
final var onCompleteMessage = new Object();
final var fakeMqttClientActor = ACTOR_SYSTEM_RESOURCE.newTestKit();

final var underTest = fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
fakeMqttClientActor.watch(fakeMqttClientActor.childActorOf(
MqttConsumerActor.propsProcessing(connection,
Sink.actorRef(inboundMappingSinkElementReceiver.getRef(), onCompleteMessage),
connectionSource,
Expand All @@ -588,11 +566,7 @@ public void rejectIncomingMessageDueToDittoRuntimeExceptionInsteadOfAcknowledgem
);

fakeMqttClientActor.expectNoMessage();

underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender());

inboundMappingSinkElementReceiver.expectMsg(onCompleteMessage);
fakeMqttClientActor.expectTerminated(underTest);
Mockito.verify(mqtt5Publish).acknowledge();
}

Expand Down

0 comments on commit edd3d1d

Please sign in to comment.