Skip to content

Commit

Permalink
reset amqp connection status when connection is restored
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Aug 7, 2018
1 parent cb08b64 commit da75c84
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
Expand Up @@ -444,6 +444,10 @@ private FSM.State<BaseClientState, BaseClientData> handleStatusReport(final Stat
if (statusReport.hasConsumedMessage()) {
incrementConsumedMessageCounter();
}
if (statusReport.isConnectionRestored()) {
data = data.setConnectionStatus(ConnectionStatus.OPEN)
.setConnectionStatusDetails("Connection restored at " + Instant.now());
}
if (statusReport.getFailure().isPresent()) {
final ConnectionFailure failure = statusReport.getFailure().get();
final String message = MessageFormat.format("Failure: {0}, Description: {1}",
Expand Down Expand Up @@ -560,41 +564,52 @@ static class JmsDisconnected extends AbstractWithOrigin implements ClientDisconn
private static final class StatusReport {

private final boolean consumedMessage;
private final boolean connectionRestored;
@Nullable private final ConnectionFailure failure;
@Nullable private final MessageConsumer closedConsumer;
@Nullable private final MessageProducer closedProducer;

private StatusReport(
final boolean consumedMessage,
final boolean connectionRestored,
@Nullable final ConnectionFailure failure,
@Nullable final MessageConsumer closedConsumer,
@Nullable final MessageProducer closedProducer) {
this.consumedMessage = consumedMessage;
this.connectionRestored = connectionRestored;
this.failure = failure;
this.closedConsumer = closedConsumer;
this.closedProducer = closedProducer;
}

private static StatusReport connectionRestored() {
return new StatusReport(false, true, null, null, null);
}

private static StatusReport failure(final ConnectionFailure failure) {
return new StatusReport(false, failure, null, null);
return new StatusReport(false, false, failure, null, null);
}

private static StatusReport consumedMessage() {
return new StatusReport(true, null, null, null);
return new StatusReport(true, false, null, null, null);
}

private static StatusReport consumerClosed(final MessageConsumer consumer) {
return new StatusReport(false, null, consumer, null);
return new StatusReport(false, false, null, consumer, null);
}

private static StatusReport producerClosed(final MessageProducer producer) {
return new StatusReport(false, null, null, producer);
return new StatusReport(false, false, null, null, producer);
}

private boolean hasConsumedMessage() {
return consumedMessage;
}

private boolean isConnectionRestored() {
return connectionRestored;
}

private Optional<ConnectionFailure> getFailure() {
return Optional.ofNullable(failure);
}
Expand Down Expand Up @@ -651,6 +666,7 @@ public void onConnectionInterrupted(final URI remoteURI) {
public void onConnectionRestored(final URI remoteURI) {
LogUtil.enhanceLogWithCustomField(log, BaseClientData.MDC_CONNECTION_ID, connectionId);
log.info("Connection restored: {}", remoteURI);
self.tell(StatusReport.connectionRestored(), ActorRef.noSender());
}

@Override
Expand Down
Expand Up @@ -16,14 +16,17 @@
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.services.connectivity.messaging.TestConstants.createRandomConnectionId;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -45,11 +48,13 @@
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.message.JmsTextMessage;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade;
import org.assertj.core.api.ThrowableAssert;
import org.awaitility.Awaitility;
import org.eclipse.ditto.model.base.common.DittoConstants;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.AddressMetric;
Expand Down Expand Up @@ -102,6 +107,7 @@ public class AmqpClientActorTest {
private static final Status.Success CONNECTED_SUCCESS = new Status.Success(BaseClientState.CONNECTED);
private static final Status.Success DISCONNECTED_SUCCESS = new Status.Success(BaseClientState.DISCONNECTED);
private static final JMSException JMS_EXCEPTION = new JMSException("FAIL");
private static final URI DUMMY = URI.create("amqp://test:1234");


@SuppressWarnings("NullableProblems") private static ActorSystem actorSystem;
Expand All @@ -127,6 +133,7 @@ public class AmqpClientActorTest {
private final TextMessage mockReplyMessage = Mockito.mock(TextMessage.class);
@Mock
private final TextMessage mockErrorMessage = Mockito.mock(TextMessage.class);
private ArgumentCaptor<JmsConnectionListener> listenerArgumentCaptor;

@BeforeClass
public static void setUp() {
Expand All @@ -143,6 +150,10 @@ public static void tearDown() {
@Before
public void init() throws JMSException {
when(mockConnection.createSession(Session.CLIENT_ACKNOWLEDGE)).thenReturn(mockSession);

listenerArgumentCaptor = ArgumentCaptor.forClass(JmsConnectionListener.class);
doNothing().when(mockConnection).addConnectionListener(listenerArgumentCaptor.capture());

when(mockSession.createConsumer(any(JmsQueue.class))).thenReturn(mockConsumer);
when(mockSession.createProducer(any(Destination.class))).thenReturn(mockProducer);
when(mockSession.createTextMessage(anyString())).thenAnswer(invocation -> {
Expand Down Expand Up @@ -237,6 +248,46 @@ public void testReconnect() {
}};
}


@Test
public void testReconnectAndVerifyConnectionStatus() {
new TestKit(actorSystem) {
{
final Props props = AmqpClientActor.propsForTests(connection, connectionStatus, getRef(),
(connection1, exceptionListener) -> mockConnection);
final ActorRef amqpClientActor = actorSystem.actorOf(props);
watch(amqpClientActor);

amqpClientActor.tell(CreateConnection.of(connection, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

amqpClientActor.tell(RetrieveConnectionMetrics.of(connectionId, DittoHeaders.empty()), getRef());
final RetrieveConnectionMetricsResponse retrieveConnectionMetricsResponse =
expectMsgClass(RetrieveConnectionMetricsResponse.class);

final JmsConnectionListener connectionListener = checkNotNull(listenerArgumentCaptor.getValue());

connectionListener.onConnectionInterrupted(DUMMY);
Awaitility.await().until(() -> awaitStatusInMetricsResponse(amqpClientActor, ConnectionStatus.FAILED));

connectionListener.onConnectionRestored(DUMMY);
Awaitility.await().until(() -> awaitStatusInMetricsResponse(amqpClientActor, ConnectionStatus.OPEN));

amqpClientActor.tell(CloseConnection.of(connectionId, DittoHeaders.empty()), getRef());
expectMsg(DISCONNECTED_SUCCESS);

}

private Boolean awaitStatusInMetricsResponse(final ActorRef amqpClientActor, final ConnectionStatus open) {
amqpClientActor.tell(RetrieveConnectionMetrics.of(connectionId, DittoHeaders.empty()), getRef());
final RetrieveConnectionMetricsResponse metrics =
expectMsgClass(RetrieveConnectionMetricsResponse.class);
return open.equals(metrics.getConnectionMetrics().getConnectionStatus());
}
};
}


@Test
public void sendCommandDuringInit() {
new TestKit(actorSystem) {{
Expand Down

0 comments on commit da75c84

Please sign in to comment.