Skip to content

Commit

Permalink
ARTEMIS-3875 - adding consumer and producer metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor authored and clebertsuconic committed Jan 13, 2023
1 parent bc1258a commit b02002f
Show file tree
Hide file tree
Showing 65 changed files with 4,432 additions and 1,149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ClientProducerImpl implements ClientProducerInternal {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final int id;

private final SimpleString address;

private final ClientSessionInternal session;
Expand Down Expand Up @@ -75,7 +77,10 @@ public ClientProducerImpl(final ClientSessionInternal session,
final boolean autoGroup,
final SimpleString groupID,
final int minLargeMessageSize,
final SessionContext sessionContext) {
final SessionContext sessionContext,
final int producerID) {
this.id = producerID;

this.sessionContext = sessionContext;

this.session = session;
Expand Down Expand Up @@ -197,13 +202,20 @@ public ClientProducerCredits getProducerCredits() {
return producerCredits;
}

@Override
public int getID() {
return id;
}

private void doCleanup() {
if (address != null) {
session.returnCredits(address);
}

session.removeProducer(this);

sessionContext.removeProducer(id);

closed = true;
}

Expand Down Expand Up @@ -290,7 +302,7 @@ private void sendRegularMessage(final SimpleString sendingAddress,

theCredits.acquireCredits(creditSize);

sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address, id);
}

private void checkClosed() throws ActiveMQException {
Expand Down Expand Up @@ -381,7 +393,7 @@ private void largeMessageSendServer(final boolean sendBlocking,
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;

int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), id, messageHandler);

credits.acquireCredits(creditsUsed);
}
Expand Down Expand Up @@ -492,7 +504,7 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
} else {
Expand All @@ -501,7 +513,7 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
sendInitialLargeMessageHeader(msgI, credits);
}

int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public interface ClientProducerInternal extends ClientProducer {
void cleanUp();

ClientProducerCredits getProducerCredits();

int getID();
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();

private AtomicInteger producerIDs = new AtomicInteger();

ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
final String username,
Expand Down Expand Up @@ -2031,10 +2033,20 @@ private ClientProducer internalCreateProducer(final SimpleString address,
final int maxRate) throws ActiveMQException {
checkClosed();

ClientProducerInternal producer = new ClientProducerImpl(this, address, maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false), autoCommitSends && blockOnNonDurableSend, autoCommitSends && blockOnDurableSend, autoGroup, groupID == null ? null : new SimpleString(groupID), minLargeMessageSize, sessionContext);
ClientProducerInternal producer = new ClientProducerImpl(this,
address,
maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
autoCommitSends && blockOnNonDurableSend,
autoCommitSends && blockOnDurableSend,
autoGroup, groupID == null ? null : new SimpleString(groupID),
minLargeMessageSize,
sessionContext,
producerIDs.incrementAndGet());

addProducer(producer);

sessionContext.createProducer(producer);

return producer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ default boolean isBeforeTwoEighteen() {
return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
}

default boolean isBeforeProducerMetricsChanged() {
int version = getChannelVersion();
return version < PacketImpl.ARTEMIS_2_28_0_VERSION;
}

/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Channel;
Expand All @@ -68,6 +69,7 @@
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
Expand All @@ -76,6 +78,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RemoveProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
Expand Down Expand Up @@ -105,10 +108,12 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
Expand Down Expand Up @@ -371,6 +376,20 @@ public boolean isWritable(ReadyListener callback) {
return remotingConnection.isWritable(callback);
}

@Override
public void createProducer(ClientProducerInternal producer) {
if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
sessionChannel.send(new CreateProducerMessage(producer.getID(), producer.getAddress()));
}
}

@Override
public void removeProducer(int id) {
if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
sessionChannel.send(new RemoveProducerMessage(id));
}
}

@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
Expand Down Expand Up @@ -546,15 +565,19 @@ public int getCreditsOnSendingFull(Message msgI) {
public void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
SimpleString defaultAddress,
int senderID) throws ActiveMQException {
final SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
} else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendMessage(msgI, sendBlocking, handler);
} else {
} else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
} else {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V3(msgI, responseRequired, handler, senderID);
}
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
Expand All @@ -579,8 +602,9 @@ public int sendLargeMessageChunk(Message msgI,
boolean lastChunk,
byte[] chunk,
int reconnectID,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}

@Override
Expand All @@ -589,8 +613,9 @@ public int sendServerLargeMessageChunk(Message msgI,
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}

@Override
Expand Down Expand Up @@ -1043,13 +1068,16 @@ private int sendSessionSendContinuationMessage(Channel channel,
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
} else {
} else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
} else {
chunkPacket = new SessionSendContinuationMessage_V3(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, senderID, messageHandler);
}
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
Expand Down Expand Up @@ -423,8 +424,10 @@ public Packet decode(byte packetType, CoreRemotingConnection connection) {
case SESS_SEND_CONTINUATION: {
if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendContinuationMessage();
} else {
} else if (connection.isBeforeProducerMetricsChanged()) {
packet = new SessionSendContinuationMessage_V2();
} else {
packet = new SessionSendContinuationMessage_V3();
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PacketImpl implements Packet {
// 2.24.0
public static final int ARTEMIS_2_24_0_VERSION = 133;

// 2.28.0
public static final int ARTEMIS_2_28_0_VERSION = 134;

public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
Expand Down Expand Up @@ -293,7 +296,9 @@ public class PacketImpl implements Packet {

public static final byte DISCONNECT_V3 = -19;

public static final byte CREATE_PRODUCER = -20;

public static final byte REMOVE_PRODUCER = -21;

public PacketImpl(final byte type) {
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;

import java.util.Objects;

public class CreateProducerMessage extends PacketImpl {
protected int id;

protected SimpleString address;

public CreateProducerMessage() {
super(PacketImpl.CREATE_PRODUCER);
}

public CreateProducerMessage(int id, SimpleString address) {
super(PacketImpl.CREATE_PRODUCER);
this.id = id;
this.address = address;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public SimpleString getAddress() {
return address;
}

public void setAddress(SimpleString address) {
this.address = address;
}



@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(id);
buffer.writeNullableSimpleString(address);
}

@Override
public void decodeRest(final ActiveMQBuffer buffer) {
id = buffer.readInt();
address = buffer.readNullableSimpleString();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
CreateProducerMessage that = (CreateProducerMessage) o;
return Objects.equals(id, that.id) && Objects.equals(address, that.address);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), id, address);
}
}
Loading

0 comments on commit b02002f

Please sign in to comment.