Skip to content

Commit

Permalink
rename proxyActor to commandForwarder
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 15, 2022
1 parent 780e68a commit 015b2bc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ public final class MqttClientActor extends BaseClientActor {

@SuppressWarnings("java:S1144") // called by reflection
private MqttClientActor(final Connection connection,
final ActorRef proxyActor,
final ActorRef commandForwarder,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {

super(connection, proxyActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
super(connection, commandForwarder, connectionActor, dittoHeaders, connectivityConfigOverwrites);

final var connectivityConfig = connectivityConfig();
final var connectionConfig = connectivityConfig.getConnectionConfig();
Expand All @@ -107,22 +107,22 @@ private MqttClientActor(final Connection connection,
* Returns the {@code Props} for creating a {@code GenericMqttClientActor} with the specified arguments.
*
* @param mqttConnection the MQTT connection.
* @param proxyActor the actor used to send signals into the Ditto cluster.
* @param commandForwarder the actor used to send signals into the Ditto cluster.
* @param connectionActor the connection persistence actor which creates the returned client actor.
* @param dittoHeaders headers of the command that caused the returned client actor to be created.
* @param connectivityConfigOverwrites the overwrites of the connectivity config for the given connection.
* @return the Props.
* @throws NullPointerException if any argument is {@code null}.
*/
public static Props props(final Connection mqttConnection,
final ActorRef proxyActor,
final ActorRef commandForwarder,
final ActorRef connectionActor,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {

return Props.create(MqttClientActor.class,
ConditionChecker.checkNotNull(mqttConnection, "mqttConnection"),
ConditionChecker.checkNotNull(proxyActor, "proxyActor"),
ConditionChecker.checkNotNull(commandForwarder, "commandForwarder"),
ConditionChecker.checkNotNull(connectionActor, "connectionActor"),
ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders"),
ConditionChecker.checkNotNull(connectivityConfigOverwrites, "connectivityConfigOverwrites"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -89,7 +90,7 @@
import akka.actor.Status;
import akka.http.javadsl.model.Uri;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import akka.testkit.TestProbe;
import io.reactivex.Flowable;
import io.reactivex.Single;

Expand Down Expand Up @@ -137,8 +138,8 @@ public final class MqttClientActorTest extends AbstractBaseClientActorTest {
@Mock private GenericMqttClientFactory genericMqttClientFactory;
@Mock private GenericMqttClient genericMqttClient;

private TestKit proxyActor;
private TestKit connectionActor;
private TestProbe commandForwarder;
private TestProbe connectionActor;

@BeforeClass
public static void beforeClass() {
Expand All @@ -152,8 +153,8 @@ public static void beforeClass() {

@Before
public void before() {
proxyActor = actorSystemResource.newTestKit();
connectionActor = actorSystemResource.newTestKit();
commandForwarder = actorSystemResource.newTestProbe();
connectionActor = actorSystemResource.newTestProbe();

enableGenericMqttClientMethodStubbing();
enableGenericMqttClientFactoryMethodStubbing();
Expand All @@ -169,8 +170,8 @@ private void enableGenericMqttClientMethodStubbing() {
Mockito.when(genericMqttClient.publish(Mockito.any()))
.thenAnswer(invocation -> {
final GenericMqttPublish genericMqttPublish = invocation.getArgument(0);
final var proxyActorRef = proxyActor.getRef();
proxyActorRef.tell(genericMqttPublish, ActorRef.noSender());
final var commandForwarderRef = commandForwarder.ref();
commandForwarderRef.tell(genericMqttPublish, ActorRef.noSender());
return CompletableFuture.completedFuture(GenericMqttPublishResult.success(genericMqttPublish));
});
}
Expand All @@ -197,10 +198,10 @@ protected Connection getConnection(final boolean isSecure) {
}

@Override
protected Props createClientActor(final ActorRef proxyActor, final Connection connection) {
protected Props createClientActor(final ActorRef commandForwarder, final Connection connection) {
return MqttClientActor.props(connection,
proxyActor,
connectionActor.getRef(),
commandForwarder,
connectionActor.ref(),
DittoHeaders.empty(),
ConfigFactory.empty());
}
Expand All @@ -212,7 +213,7 @@ protected ActorSystem getActorSystem() {

@Test
public void openAndCloseConnection() {
final var underTest = TestActorRef.apply(createClientActor(proxyActor.getRef(),
final var underTest = TestActorRef.apply(createClientActor(commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.connectionStatus(ConnectivityStatus.CLOSED)
.specificConfig(Map.of("separatePublisherClient", "false"))
Expand Down Expand Up @@ -240,7 +241,7 @@ public void subscribeFails() {
final var mqttSubscribeException = new MqttSubscribeException("Quisquam omnis in quia hic et libero.", null);
Mockito.when(genericMqttClient.subscribe(Mockito.any())).thenReturn(Single.error(mqttSubscribeException));
final var underTest = TestActorRef.apply(
createClientActor(proxyActor.getRef(),
createClientActor(commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.connectionStatus(ConnectivityStatus.CLOSED)
.build()),
Expand All @@ -261,7 +262,7 @@ public void consumeFromTopicAndRetrieveConnectionMetrics() {
enableSubscribingAndConsumingMethodStubbing(getMqttPublish(SOURCE_ADDRESS, getSerializedModifyThingCommand()));
final var underTest = TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.sources(List.of(ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT)
Expand All @@ -279,7 +280,7 @@ public void consumeFromTopicAndRetrieveConnectionMetrics() {
final var testKit = actorSystemResource.newTestKit();

testKit.expectNoMessage();
final var modifyThing = proxyActor.expectMsgClass(ModifyThing.class);
final var modifyThing = commandForwarder.expectMsgClass(ModifyThing.class);
assertThat(modifyThing.getDittoHeaders())
.doesNotContainKeys(MqttHeader.getHeaderNames().toArray(String[]::new));

Expand All @@ -296,7 +297,7 @@ public void testConnectionIsSuccessful() {
Mockito.when(genericMqttClient.connect(Mockito.any())).thenReturn(CompletableFuture.completedStage(null));
final var testKit = actorSystemResource.newTestKit();
final var underTest = testKit.watch(TestActorRef.apply(
createClientActor(proxyActor.getRef(), connection),
createClientActor(commandForwarder.ref(), connection),
actorSystemResource.getActorSystem()
));

Expand All @@ -317,7 +318,7 @@ public void testConnectionFails() {
.build();
final var testKit = actorSystemResource.newTestKit();
final var underTest = testKit.watch(TestActorRef.apply(
createClientActor(proxyActor.getRef(), connection),
createClientActor(commandForwarder.ref(), connection),
actorSystemResource.getActorSystem()
));

Expand Down Expand Up @@ -391,7 +392,7 @@ public void consumeFromTopicWithSourceHeaderMapping() {
));
TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(connection)
.setSources(connection.getSources()
.stream()
Expand All @@ -403,7 +404,7 @@ public void consumeFromTopicWithSourceHeaderMapping() {
actorSystemResource.getActorSystem()
);

final var modifyThing = proxyActor.expectMsgClass(ModifyThing.class);
final var modifyThing = commandForwarder.expectMsgClass(ModifyThing.class);
assertThat(modifyThing.getDittoHeaders())
.contains(
Map.entry(MqttHeader.MQTT_TOPIC.getName(), SOURCE_ADDRESS),
Expand All @@ -425,7 +426,7 @@ public void consumeFromTopicWithIdEnforcement() {
getSerializedModifyThingCommand()));
TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.setSources(List.of(ConnectivityModelFactory.newSourceBuilder()
.addresses(Set.of("eclipse/+/+"))
Expand All @@ -441,7 +442,7 @@ public void consumeFromTopicWithIdEnforcement() {
),
actorSystemResource.getActorSystem());

proxyActor.expectMsgClass(ModifyThing.class);
commandForwarder.expectMsgClass(ModifyThing.class);
}

@Test
Expand All @@ -450,7 +451,7 @@ public void consumeFromTopicWithIdEnforcementExpectErrorResponse() {
getSerializedModifyThingCommand()));
TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.setSources(List.of(ConnectivityModelFactory.newSourceBuilder()
.addresses(Set.of("eclipse/+/+"))
Expand All @@ -467,7 +468,7 @@ public void consumeFromTopicWithIdEnforcementExpectErrorResponse() {
actorSystemResource.getActorSystem()
);

final var genericMqttPublish = proxyActor.expectMsgClass(GenericMqttPublish.class);
final var genericMqttPublish = commandForwarder.expectMsgClass(GenericMqttPublish.class);
assertThat(genericMqttPublish.getPayloadAsHumanReadable())
.hasValueSatisfying(payload -> assertThat(payload)
.contains(ConnectionSignalIdEnforcementFailedException.ERROR_CODE));
Expand Down Expand Up @@ -508,7 +509,7 @@ public void consumeMultipleSources() {
));
TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.sources(List.of(
ConnectivityModelFactory.newSourceBuilder()
Expand Down Expand Up @@ -542,7 +543,7 @@ public void consumeMultipleSources() {
);

final var receivedTopicsCounts = IntStream.range(0, genericMqttPublishesForRelevantTopics.size())
.mapToObj(i -> proxyActor.expectMsgClass(ModifyThing.class))
.mapToObj(i -> commandForwarder.expectMsgClass(ModifyThing.class))
.map(ModifyThing::getDittoHeaders)
.map(dittoHeaders -> dittoHeaders.getOrDefault("custom.topic", "n/a"))
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
Expand All @@ -553,12 +554,12 @@ public void consumeMultipleSources() {
@Test
public void reconnectAndConsumeFromTopic() {
enableSubscribingAndConsumingMethodStubbing(getMqttPublish(SOURCE_ADDRESS, getSerializedModifyThingCommand()));
final var underTest = TestActorRef.apply(createClientActor(proxyActor.getRef(), getConnection(false)),
final var underTest = TestActorRef.apply(createClientActor(commandForwarder.ref(), getConnection(false)),
actorSystemResource.getActorSystem());
final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId();
final var testKit = actorSystemResource.newTestKit();

proxyActor.expectMsgClass(ModifyThing.class);
commandForwarder.expectMsgClass(ModifyThing.class);

underTest.tell(CloseConnection.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef());

Expand All @@ -569,7 +570,7 @@ public void reconnectAndConsumeFromTopic() {
testKit.expectMsg(CONNECTED_SUCCESS);

// ModifyThing automatically published by mock connection.
proxyActor.expectMsgClass(ModifyThing.class);
commandForwarder.expectMsgClass(ModifyThing.class);

underTest.tell(CloseConnection.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef());

Expand All @@ -582,7 +583,7 @@ public void publishToTopic() {
final var thingModifiedEvent = TestConstants.thingModified(authorizationContext.getAuthorizationSubjects());
final var underTest = TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.connectionStatus(ConnectivityStatus.CLOSED)
.build()
Expand All @@ -598,21 +599,22 @@ public void publishToTopic() {

underTest.tell(thingModifiedEvent, testKit.getRef());

assertThat(proxyActor.expectMsgClass(GenericMqttPublish.class))
assertThat(commandForwarder.expectMsgClass(GenericMqttPublish.class))
.satisfies(genericMqttPublish -> {
assertThat(genericMqttPublish.getTopic()).isEqualTo(MqttTopic.of(MQTT_TARGET.getAddress()));
assertThat(genericMqttPublish.getPayload())
.hasValue(ByteBufferUtils.fromUtf8String(
assertThat(genericMqttPublish.getPayload()
.map(byteBuffer -> StandardCharsets.UTF_8.decode(byteBuffer).toString()))
.hasValue(
TestConstants.signalToDittoProtocolJsonString(thingModifiedEvent)
));
);
});
}

@Test
public void publishToReplyTarget() {
final var underTest = TestActorRef.apply(
createClientActor(
proxyActor.getRef(),
commandForwarder.ref(),
ConnectivityModelFactory.newConnectionBuilder(getConnection(false))
.connectionStatus(ConnectivityStatus.CLOSED)
.setSources(TestConstants.Sources.SOURCES_WITH_AUTH_CONTEXT)
Expand All @@ -639,7 +641,7 @@ public void publishToReplyTarget() {
testKit.getRef()
);

assertThat(proxyActor.expectMsgClass(GenericMqttPublish.class))
assertThat(commandForwarder.expectMsgClass(GenericMqttPublish.class))
.satisfies(genericMqttPublish ->
assertThat(genericMqttPublish.getTopic()).isEqualTo(MqttTopic.of("replyTarget/" + thingId)));
}
Expand Down

0 comments on commit 015b2bc

Please sign in to comment.