Skip to content

Commit

Permalink
Merge pull request #437 from bsinno/bugfix/session-recovery
Browse files Browse the repository at this point in the history
Fix amqp session recovery
  • Loading branch information
Yannic92 committed Jul 5, 2019
2 parents 1f25f70 + b37ad5f commit bd72dcc
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 15 deletions.
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.ditto.services.connectivity.messaging.internal.AbstractWithOrigin;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.CloseSession;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.DisconnectClient;
Expand All @@ -77,6 +78,8 @@
*/
public final class AmqpClientActor extends BaseClientActor implements ExceptionListener {

private static final String SPEC_CONFIG_RECOVER_ON_SESSION_CLOSED = "recover.on-session-closed";
private static final String SPEC_CONFIG_RECOVER_ON_CONNECTION_RESTORED = "recover.on-connection-restored";
private final JmsConnectionFactory jmsConnectionFactory;
private final StatusReportingListener connectionListener;
private final List<ConsumerData> consumers;
Expand All @@ -90,6 +93,8 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL
@Nullable private ActorRef disconnectConnectionHandler;

private final Map<String, ActorRef> consumerByNamePrefix;
private final boolean recoverSessionOnSessionClosed;
private final boolean recoverSessionOnConnectionRestored;

/*
* This constructor is called via reflection by the static method propsForTest.
Expand All @@ -105,6 +110,8 @@ private AmqpClientActor(final Connection connection,
connectionListener = new StatusReportingListener(getSelf(), connection.getId(), log, connectionLogger);
consumers = new LinkedList<>();
consumerByNamePrefix = new HashMap<>();
recoverSessionOnSessionClosed = isRecoverSessionOnSessionClosedEnabled();
recoverSessionOnConnectionRestored = isRecoverSessionOnConnectionRestoredEnabled();
}

/*
Expand Down Expand Up @@ -178,10 +185,11 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.event(JmsSessionRecovered.class, this::handleSessionRecovered);
}

private State<BaseClientState, BaseClientData> handleConnectionFailureWhenConnected(final ConnectionFailure failure, final BaseClientData data) {
private State<BaseClientState, BaseClientData> handleConnectionFailureWhenConnected(final ConnectionFailure failure,
final BaseClientData data) {
return goTo(BaseClientState.UNKNOWN)
.using(data.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(failure.getFailureDescription()));
.setConnectionStatusDetails(failure.getFailureDescription()));
}

@Override
Expand Down Expand Up @@ -268,7 +276,7 @@ protected void allocateResourcesOnConnection(final ClientConnected clientConnect

@Override
protected void cleanupResourcesForConnection() {
log.debug("cleaning up");
log.debug("cleaning up resources for connection '{}'", connectionId());
stopCommandConsumers();
stopMessageMappingProcessorActor();
stopCommandProducer();
Expand Down Expand Up @@ -437,7 +445,7 @@ private CompletableFuture<Void> ensureJmsConnectionClosed() {
}

private FSM.State<BaseClientState, BaseClientData> handleConnectionRestored(final BaseClientData currentData) {
if (jmsSession == null || ((JmsSession) jmsSession).isClosed()) {
if (recoverSessionOnConnectionRestored && (jmsSession == null || ((JmsSession) jmsSession).isClosed())) {
log.info("Restored connection has closed session, trying to recover...");
recoverSession(jmsSession);
}
Expand Down Expand Up @@ -491,14 +499,18 @@ private FSM.State<BaseClientState, BaseClientData> handleProducerClosed(
private FSM.State<BaseClientState, BaseClientData> handleSessionClosed(
final SessionClosedStatusReport statusReport,
final BaseClientData currentData) {

connectionLogger.failure("Session has been closed. Trying to recover the session.");
recoverSession(statusReport.getSession());
connectionLogger.failure("Session has been closed.");
if (recoverSessionOnSessionClosed) {
recoverSession(statusReport.getSession());
} else {
log.debug("Not recovering session after session was closed.");
}
return stay().using(currentData);
}

private void recoverSession(@Nullable final Session session) {
log.debug("Closing all child actors before recovering closed JMS session.");
connectionLogger.failure("Trying to recover the session.");
log.info("Recovering closed JMS session.");
// first stop all child actors, they relied on the closed/corrupt session
stopCommandConsumers();
stopMessageMappingProcessorActor();
Expand All @@ -512,6 +524,11 @@ private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(
final JmsSessionRecovered sessionRecovered,
final BaseClientData currentData) {

// make sure that we close any previous session
if (jmsSession != null) {
getConnectConnectionHandler(connection()).tell(new JmsCloseSession(getSender(), jmsSession), getSelf());
}

jmsSession = sessionRecovered.getSession();
consumers.clear();
consumers.addAll(sessionRecovered.getConsumerList());
Expand All @@ -525,6 +542,17 @@ private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(
return stay().using(currentData);
}

private boolean isRecoverSessionOnSessionClosedEnabled() {
final String recoverOnSessionClosed =
connection().getSpecificConfig().getOrDefault(SPEC_CONFIG_RECOVER_ON_SESSION_CLOSED, "false");
return Boolean.valueOf(recoverOnSessionClosed);
}

private boolean isRecoverSessionOnConnectionRestoredEnabled() {
final String recoverOnConnectionRestored = connection().getSpecificConfig().getOrDefault(SPEC_CONFIG_RECOVER_ON_CONNECTION_RESTORED, "true");
return Boolean.valueOf(recoverOnConnectionRestored);
}

@Override
public void onException(final JMSException exception) {
connectionLogger.exception("Exception occured: {0}", exception.getMessage());
Expand Down Expand Up @@ -566,6 +594,23 @@ Optional<javax.jms.Session> getSession() {
}
}

/**
* {@code CloseSession} message for internal communication with {@link JMSConnectionHandlingActor}.
*/
static final class JmsCloseSession extends AbstractWithOrigin implements CloseSession {

private final Session session;

JmsCloseSession(@Nullable final ActorRef origin, final Session session) {
super(origin);
this.session = session;
}

javax.jms.Session getSession() {
return session;
}
}

/**
* {@code Disconnect} message for internal communication with {@link JMSConnectionHandlingActor}.
*/
Expand Down
Expand Up @@ -116,10 +116,24 @@ public Receive createReceive() {
return receiveBuilder()
.match(AmqpClientActor.JmsConnect.class, this::handleConnect)
.match(AmqpClientActor.JmsRecoverSession.class, this::handleRecoverSession)
.match(AmqpClientActor.JmsCloseSession.class, this::handleCloseSession)
.match(AmqpClientActor.JmsDisconnect.class, this::handleDisconnect)
.build();
}

private void handleCloseSession(final AmqpClientActor.JmsCloseSession closeSession) {
log.debug("Processing JmsCloseSession message.");
final Session session = closeSession.getSession();
try {
safelyExecuteJmsOperation(null, "close session", () -> {
session.close();
return null;
});
} catch (final Exception e) {
log.debug("Closing session failed: {}", e.getMessage());
}
}

private void handleRecoverSession(final AmqpClientActor.JmsRecoverSession recoverSession) {

log.debug("Processing JmsRecoverSession message.");
Expand Down
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.messaging.internal;

/**
* Messaging internal command for closing a JMS session.
*/
public interface CloseSession extends WithOrigin {
}
Expand Up @@ -22,8 +22,8 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

Expand Down Expand Up @@ -56,6 +56,7 @@
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.JmsMessageConsumer;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsTextMessage;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
Expand Down Expand Up @@ -102,7 +103,6 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -136,7 +136,7 @@ public final class AmqpClientActorTest extends AbstractBaseClientActorTest {
private final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
private final JmsConnectionFactory jmsConnectionFactory = (connection1, exceptionListener) -> mockConnection;
@Mock
private final Session mockSession = Mockito.mock(Session.class);
private final JmsSession mockSession = Mockito.mock(JmsSession.class);
@Mock
private final JmsMessageConsumer mockConsumer = Mockito.mock(JmsMessageConsumer.class);
private MessageProducer mockProducer;
Expand Down Expand Up @@ -447,13 +447,16 @@ public void testCloseConnectionFails() throws JMSException {


@Test
public void testCloseSessionWhenConnectedExpectRecreateSession() throws JMSException {
public void testConnectionRestoredExpectRecreateSession() throws JMSException {
final String expectedAddress = "target";
final Target target = ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT,
null, null, Topic.TWIN_EVENTS);
final MessageProducer recoveredProducer = Mockito.mock(MessageProducer.class);
final MessageConsumer recoveredConsumer = Mockito.mock(MessageConsumer.class);
final Session newSession = Mockito.mock(Session.class, withSettings().name("recoveredSession"));
final JmsSession newSession = Mockito.mock(JmsSession.class, withSettings().name("recoveredSession"));

// existing session was closed
when(mockSession.isClosed()).thenReturn(true);
when(mockConnection.createSession(Session.CLIENT_ACKNOWLEDGE))
.thenReturn(mockSession) // initial session
.thenReturn(newSession); // recovered session
Expand Down Expand Up @@ -481,11 +484,11 @@ public void testCloseSessionWhenConnectedExpectRecreateSession() throws JMSExcep
expectMsgClass(Command.class);

// now close session
jmsConnectionListener.onSessionClosed(mockSession, new JMSException("Session closed for unknown reason"));
jmsConnectionListener.onConnectionRestored(URI.create("amqp://broker:5671"));
verify(mockConnection, timeout(2000).times(2)).createSession(Session.CLIENT_ACKNOWLEDGE);

// close is called on old session
verify(mockSession).close();
verify(mockSession, times(2)).close();

// verify publishing an event works with new session/producer
sendThingEventAndExpectPublish(amqpClientActor, target, recoveredProducer);
Expand Down

0 comments on commit bd72dcc

Please sign in to comment.