Skip to content

Commit

Permalink
ARTEMIS-4476 Fixing Ghost consumer situation with AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Nov 9, 2023
1 parent b041f2c commit 18692ec
Show file tree
Hide file tree
Showing 26 changed files with 1,153 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2675,4 +2675,11 @@ static void getCurrentTimeMillis() {
@LogMessage(id = 601771, value = "User {} is getting name on target resource: {}", level = LogMessage.Level.INFO)
void getCurrentTimeMillis(Object source);

static void verifyConnections(Object source) {
BASE_LOGGER.verifyConnections(getCaller(), source);
}

@LogMessage(id = 601772, value = "User {} is calling verifyConnections on resource: {}", level = LogMessage.Level.INFO)
void verifyConnections(String user, Object queue);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@
package org.apache.activemq.artemis.protocol.amqp.broker;

import javax.security.auth.Subject;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This is a Server's Connection representation used by ActiveMQ Artemis.
*/
public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final AMQPConnectionContext amqpConnection;

private final ProtonProtocolManager manager;
Expand Down Expand Up @@ -73,17 +78,31 @@ public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {

destroyed = true;

//filter it like the other protocols
if (!(me instanceof ActiveMQRemoteDisconnectException)) {
ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType());
if (logger.isDebugEnabled()) {
try {
logger.debug("Connection failure detected. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());
} catch (Throwable e) { // just to avoid a possible NPE from the debug statement itself
logger.debug(e.getMessage(), e);
}
}

// Then call the listeners
callFailureListeners(me, scaleDownTargetNodeID);
try {
if (amqpConnection.getHandler().getConnection().getRemoteState() != EndpointState.CLOSED) {
// A remote close was received on the client, on that case it's just a normal operation and we don't need to log this.
ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType());
}
} catch (Throwable e) { // avoiding NPEs from te logging statement. I don't think this would happen, but just in case
logger.warn(e.getMessage(), e);
}

callClosingListeners();
amqpConnection.runNow(() -> {
// Then call the listeners
callFailureListeners(me, scaleDownTargetNodeID);

internalClose();
callClosingListeners();

internalClose();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ public interface ActiveMQAMQPProtocolLogger {

@LogMessage(id = 111006, value = "Unable to send message {} to Dead Letter Address.", level = LogMessage.Level.WARN)
void unableToSendMessageToDLA(MessageReference ref, Throwable t);

@LogMessage(id = 111007, value = "Invalid Connection State: {} for remote IP {}", level = LogMessage.Level.WARN)
void invalidAMQPConnectionState(Object state, Object remoteIP);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,7 @@ public interface ActiveMQAMQPProtocolMessageBundle {

@Message(id = 119026, value = "Malformed Federation control message: {}")
ActiveMQException malformedFederationControlMessage(String address);

@Message(id = 119027, value = "Invalid AMQPConnection Remote State: {}")
ActiveMQException invalidAMQPConnectionState(Object state);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
Expand Down Expand Up @@ -295,7 +297,9 @@ public void initialize() throws Exception {
} catch (ActiveMQException e) {
throw e;
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
ActiveMQAMQPInternalErrorException internalErrorException = ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
internalErrorException.initCause(e);
throw internalErrorException;
}
}

Expand Down Expand Up @@ -980,6 +984,8 @@ class DefaultController implements SenderController {

@Override
public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
validateConnectionState();

Source source = (Source) sender.getRemoteSource();
final Map<Symbol, Object> supportedFilters = new HashMap<>();

Expand Down Expand Up @@ -1334,4 +1340,22 @@ public void close() throws Exception {
}
}
}

private void validateConnectionState() throws ActiveMQException {
ProtonHandler handler = null;
Connection qpidConnection = null;

if (connection == null || (handler = connection.getHandler()) == null || (qpidConnection = handler.getConnection()) == null) {
if (logger.isDebugEnabled()) {
logger.debug("validateConnectionState:: connection={}, handler={}, qpidConnection={}", connection, handler, qpidConnection);
}

ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState("null", "null");
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState("null");
}
if (qpidConnection.getRemoteState() == EndpointState.CLOSED) {
ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState(qpidConnection.getRemoteState(), connection.getRemoteAddress());
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState(qpidConnection.getRemoteState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;

import org.junit.Test;
Expand All @@ -41,6 +44,13 @@ public void testAcceptsNullSourceAddressWhenInitialising() throws Exception {
when(mock.getServer()).thenReturn(mock(ActiveMQServer.class));
Sender mockSender = mock(Sender.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);

ProtonHandler handler = mock(ProtonHandler.class);
Connection connection = mock(Connection.class);
when(connection.getRemoteState()).thenReturn(EndpointState.ACTIVE);
when(mockConnContext.getHandler()).thenReturn(handler);
when(handler.getConnection()).thenReturn(connection);

when(mockConnContext.getProtocolManager()).thenReturn(mock);

AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se

private final OpenWireProtocolManager protocolManager;

private boolean destroyed = false;

private volatile ScheduledFuture ttlCheck;

//separated in/out wireFormats allow deliveries (eg async and consumers) to not slow down bufferReceived
Expand Down Expand Up @@ -1233,6 +1231,8 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {
}
dispatchAsync(command);
}
// During a chanceClientID a disconnect could have been sent by the client, and the client will then re-issue a connect packet
destroyed = false;
return null;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2297,7 +2297,7 @@ public boolean closeConsumerConnectionsForAddress(final String address) {
RemotingConnection connection = null;

for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
if (potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID()))) {
connection = potentialConnection;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,11 @@ public String listConsumersAsJSON() throws Exception {
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), serverConsumer.getMessagesAcknowledgedAwaitingCommit())
.add(ConsumerField.LAST_DELIVERED_TIME.getName(), serverConsumer.getLastDeliveredTime())
.add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), serverConsumer.getLastAcknowledgedTime());

if (server.getRemotingService().getConnection(((ServerConsumer) consumer).getConnectionID()) == null) {
obj.add(ConsumerField.ORPHANED.getName(), true);
}

jsonArray.add(obj);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public enum ConsumerField {
MESSAGES_ACKNOWLEDGED("messagesAcknowledged"),
MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
LAST_DELIVERED_TIME("lastDeliveredTime"),
LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime");
LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"),
ORPHANED("orphaned");


private static final Map<String, ConsumerField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface RemotingService {
*/
RemotingConnection removeConnection(Object remotingConnectionID);

RemotingConnection getConnection(Object remotingConnectionID);

Set<RemotingConnection> getConnections();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ public boolean isStarted() {
return started;
}

private RemotingConnection getConnection(final Object remotingConnectionID) {
@Override
public RemotingConnection getConnection(final Object remotingConnectionID) {
ConnectionEntry entry = connections.get(remotingConnectionID);

if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,4 +539,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,

@Message(id = 229249, value = "Invalid Store property, only DATABASE property is supported")
RuntimeException unsupportedStorePropertyType();

@Message(id = 229250, value = "Connection has been marked as destroyed for remote connection {}.")
ActiveMQException connectionDestroyed(String remoteAddress);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4762,7 +4762,7 @@ public void run() {
RemotingService remotingService = server.getRemotingService();

for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
if (potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID()))) {
connection = potentialConnection;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ public ServerConsumerImpl(final long id,
final boolean supportLargeMessage,
final Integer credits,
final ActiveMQServer server) throws Exception {


if (session == null || session.getRemotingConnection() == null) {
throw new NullPointerException("session = " + session);
}

if (session != null && session.getRemotingConnection() != null && session.getRemotingConnection().isDestroyed()) {
throw ActiveMQMessageBundle.BUNDLE.connectionDestroyed(session.getRemotingConnection().getRemoteAddress());
}

this.id = id;

this.sequentialID = server.getStorageManager().generateID();
Expand Down Expand Up @@ -356,8 +366,8 @@ public long getCreationTime() {
}

@Override
public String getConnectionID() {
return this.session.getConnectionID().toString();
public Object getConnectionID() {
return this.session.getConnectionID();
}

@Override
Expand Down Expand Up @@ -1575,7 +1585,11 @@ public String getConnectionLocalAddress() {

@Override
public String getConnectionRemoteAddress() {
return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
if (this.session == null || this.session.getRemotingConnection() == null || this.session.getRemotingConnection().getTransportConnection() == null) {
return null;
} else {
return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ public void registerInJMX(final ObjectName objectName, final Object managedResou
public synchronized void registerInRegistry(final String resourceName, final Object managedResource) {
unregisterFromRegistry(resourceName);

logger.debug("Registering {} as {}", resourceName, managedResource);

registry.put(resourceName, managedResource);
}

Expand Down
33 changes: 33 additions & 0 deletions tests/integration-tests-isolated/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-openwire-protocol</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-unit-test-support</artifactId>
Expand Down Expand Up @@ -194,6 +200,33 @@
<artifactId>jakarta.json-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>protonj2-test-driver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.management.j2ee</groupId>
<artifactId>jakarta.management.j2ee-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down

0 comments on commit 18692ec

Please sign in to comment.