Skip to content

Commit

Permalink
Maintain HiveMqtt5ClientActor.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Apr 16, 2020
1 parent c83cfbe commit 9f49557
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
Expand Up @@ -63,7 +63,7 @@ public Props getActorPropsForType(final Connection connection, final ActorRef co
result = HiveMqtt3ClientActor.props(connection, conciergeForwarder, connectionActor);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, conciergeForwarder);
result = HiveMqtt5ClientActor.props(connection, conciergeForwarder, connectionActor);
break;
case KAFKA:
result = KafkaClientActor.props(connection, conciergeForwarder, connectionActor,
Expand Down
Expand Up @@ -63,15 +63,17 @@ public final class HiveMqtt5ClientActor extends BaseClientActor {
@SuppressWarnings("unused") // used by `props` via reflection
private HiveMqtt5ClientActor(final Connection connection,
final ActorRef conciergeForwarder,
final HiveMqtt5ClientFactory clientFactory) {
super(connection, conciergeForwarder);
final HiveMqtt5ClientFactory clientFactory,
final ActorRef connectionActor) {
super(connection, conciergeForwarder, connectionActor);
this.connection = connection;
this.clientFactory = clientFactory;
}

@SuppressWarnings("unused") // used by `props` via reflection
private HiveMqtt5ClientActor(final Connection connection, final ActorRef conciergeForwarder) {
this(connection, conciergeForwarder, DefaultHiveMqtt5ClientFactory.getInstance());
private HiveMqtt5ClientActor(final Connection connection, final ActorRef conciergeForwarder,
final ActorRef connectionActor) {
this(connection, conciergeForwarder, DefaultHiveMqtt5ClientFactory.getInstance(), connectionActor);
}

/**
Expand All @@ -80,24 +82,27 @@ private HiveMqtt5ClientActor(final Connection connection, final ActorRef concier
* @param connection the connection.
* @param conciergeForwarder the actor used to send signals to the concierge service.
* @param clientFactory factory used to create required mqtt clients
* @param connectionActor the parent connection actor
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, final ActorRef conciergeForwarder,
final HiveMqtt5ClientFactory clientFactory) {
final HiveMqtt5ClientFactory clientFactory, final ActorRef connectionActor) {
return Props.create(HiveMqtt5ClientActor.class, validateConnection(connection),
conciergeForwarder, clientFactory);
conciergeForwarder, clientFactory, connectionActor);
}

/**
* Creates Akka configuration object for this actor.
*
* @param connection the connection.
* @param conciergeForwarder the actor used to send signals to the concierge service.
* @param connectionActor the parent connection actor.
* @return the Akka configuration Props object.
*/
public static Props props(final Connection connection, final ActorRef conciergeForwarder) {
public static Props props(final Connection connection, final ActorRef conciergeForwarder,
final ActorRef connectionActor) {
return Props.create(HiveMqtt5ClientActor.class, validateConnection(connection),
conciergeForwarder);
conciergeForwarder, connectionActor);
}

private Mqtt5Client getClient() {
Expand Down
Expand Up @@ -95,6 +95,7 @@ public abstract class AbstractMqttClientActorTest<M> extends AbstractBaseClientA
protected static final ConnectionType connectionType = ConnectionType.MQTT;

protected ActorSystem actorSystem;
protected TestProbe mockConnectionActor;

protected ConnectionId connectionId;
private String serverHost;
Expand All @@ -113,6 +114,7 @@ public void tearDown() {
@Before
public void initializeConnection() {
actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);
mockConnectionActor = TestProbe.apply("connectionActor", actorSystem);
connectionId = TestConstants.createRandomConnectionId();
serverHost = "tcp://localhost:" + FREE_PORT.getPort();
connection = ConnectivityModelFactory.newConnectionBuilder(connectionId, connectionType,
Expand Down
Expand Up @@ -38,7 +38,6 @@
public class HiveMqtt5ClientActorTest extends AbstractMqttClientActorTest<Mqtt5Publish> {

private MockHiveMqtt5ClientFactory mockHiveMqtt5ClientFactory;
protected static final ConnectionType connectionType = ConnectionType.MQTT_5;

@Before
public void initClient() {
Expand All @@ -59,7 +58,8 @@ public void testSubscribeFails() {
.withTestProbe(getRef())
.withFailingSubscribe();

final Props props = HiveMqtt5ClientActor.props(connection, getRef(), clientFactory);
final Props props =
HiveMqtt5ClientActor.props(connection, getRef(), clientFactory, mockConnectionActor.ref());
final ActorRef mqttClientActor = actorSystem.actorOf(props, "mqttClientActor-testSubscribeFails");

mqttClientActor.tell(OpenConnection.of(connectionId, DittoHeaders.empty()), getRef());
Expand All @@ -73,14 +73,14 @@ public void testSubscribeFails() {
@Override
protected Props createClientActor(final ActorRef testProbe) {
return HiveMqtt5ClientActor.props(connection, testProbe,
mockHiveMqtt5ClientFactory.withTestProbe(testProbe));
mockHiveMqtt5ClientFactory.withTestProbe(testProbe), mockConnectionActor.ref());
}

@Override
protected Props createFailingClientActor(final ActorRef testProbe) {
return HiveMqtt5ClientActor.props(connection, testProbe,
mockHiveMqtt5ClientFactory
.withException(new RuntimeException("failed to connect")));
.withException(new RuntimeException("failed to connect")), mockConnectionActor.ref());
}

@Override
Expand All @@ -90,7 +90,7 @@ protected Props createClientActorWithMessages(final Connection connection,
final MockHiveMqtt5ClientFactory clientFactory = mockHiveMqtt5ClientFactory
.withMessages(messages)
.withTestProbe(testProbe);
return HiveMqtt5ClientActor.props(connection, testProbe, clientFactory);
return HiveMqtt5ClientActor.props(connection, testProbe, clientFactory, mockConnectionActor.ref());
}

@Override
Expand Down

0 comments on commit 9f49557

Please sign in to comment.