Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
 - recovery of connection actors (send open instead of create)
 - opening amqp 0.9.1 connections without a publisher (NPE)

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Aug 15, 2018
1 parent fe82aba commit fbed089
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,10 @@ protected final void stopChildActor(final String name) {
* @param actor the ActorRef
*/
protected final void stopChildActor(final ActorRef actor) {
log.debug("Stopping child actor <{}>.", actor.path());
getContext().stop(actor);
if (actor != null) {
log.debug("Stopping child actor <{}>.", actor.path());
getContext().stop(actor);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ public Receive createReceiveRecover() {
if (ConnectionStatus.OPEN.equals(connection.getConnectionStatus())) {
log.debug("Opening connection <{}> after recovery.", connectionId);

final CreateConnection connect = CreateConnection.of(connection, DittoHeaders.empty());
final OpenConnection connect = OpenConnection.of(connectionId, DittoHeaders.empty());

final ActorRef origin = getSender();
askClientActor(connect,
response -> log.info("CreateConnection result: {}", response),
response -> log.info("OpenConnection result: {}", response),
error -> handleException("recovery-connect", origin, error)
);
subscribeForEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ private CompletionStage<Status.Status> connect(final Connection connection, fina
if (throwable != null) {
future.complete(new Status.Failure(throwable));
} else {
rmqPublisherActor.tell(reply, rmqConnectionActor);
if (rmqPublisherActor != null) {
rmqPublisherActor.tell(reply, rmqConnectionActor);
}
future.complete(new Status.Success("channel created"));
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,12 @@ public void modifyConnectionClosesAndRestartsClientActor() {
@Test
public void recoverOpenConnection() {
new TestKit(actorSystem) {{

final TestProbe mockClientProbe = TestProbe.apply(actorSystem);
ActorRef underTest =
TestConstants.createConnectionSupervisorActor(connectionId, actorSystem, pubSubMediator,
conciergeForwarder);
conciergeForwarder,
(connection, concierge) -> MockClientActor.props(mockClientProbe.ref()));
watch(underTest);

// create connection
Expand All @@ -316,6 +319,9 @@ public void recoverOpenConnection() {
TestConstants.createConnectionSupervisorActor(connectionId, actorSystem, pubSubMediator,
conciergeForwarder));

// connection is opened after recovery -> client actor receives OpenConnection command
mockClientProbe.expectMsg(OpenConnection.of(connectionId, DittoHeaders.empty()));

// retrieve connection status
underTest.tell(retrieveConnectionStatus, getRef());
expectMsg(retrieveConnectionStatusOpenResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientState;
import org.eclipse.ditto.services.connectivity.messaging.TestConstants;
Expand Down Expand Up @@ -146,6 +147,25 @@ public void testConnectionHandling() {
}};
}

@Test
public void testConnectionWithoutPublisherHandling() {
new TestKit(actorSystem) {{
final String randomConnectionId = TestConstants.createRandomConnectionId();
final Connection connectionWithoutTargets =
TestConstants.createConnection(randomConnectionId, actorSystem, new Target[0]);
final Props props = RabbitMQClientActor.propsForTests(connectionWithoutTargets, connectionStatus, getRef(),
(con, exHandler) -> mockConnectionFactory).withDispatcher(CallingThreadDispatcher.Id());
final ActorRef rabbitClientActor = actorSystem.actorOf(props);
watch(rabbitClientActor);

rabbitClientActor.tell(OpenConnection.of(randomConnectionId, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

rabbitClientActor.tell(CloseConnection.of(randomConnectionId, DittoHeaders.empty()), getRef());
expectMsg(DISCONNECTED_SUCCESS);
}};
}

@Test
public void testReconnection() {
new TestKit(actorSystem) {{
Expand Down

0 comments on commit fbed089

Please sign in to comment.