Skip to content

Commit

Permalink
Merge 917bc93 into c88686f
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Sep 26, 2022
2 parents c88686f + 917bc93 commit 0443af3
Show file tree
Hide file tree
Showing 23 changed files with 137 additions and 58 deletions.
Expand Up @@ -173,6 +173,7 @@ private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
replyMessageRequestHeader.setProperties(requestHeader.getProperties());
replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
replyMessageRequestHeader.setBname(requestHeader.getBname());

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
request.setBody(msg.getBody());
Expand Down
Expand Up @@ -211,6 +211,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
requestHeader.setBname(mq.getBrokerName());

if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
Expand Down Expand Up @@ -238,6 +239,7 @@ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingExcept
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());

return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
Expand Down
Expand Up @@ -191,7 +191,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
Expand All @@ -210,7 +210,7 @@ public long maxOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -228,7 +228,7 @@ public long minOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -246,8 +246,7 @@ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
timeoutMillis);
return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down
Expand Up @@ -836,13 +836,14 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
public long searchOffset(final String addr, final MessageQueue mq, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setTimestamp(timestamp);
requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand All @@ -861,11 +862,12 @@ public long searchOffset(final String addr, final String topic, final int queueI
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
public long getMaxOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down Expand Up @@ -912,11 +914,12 @@ public List<String> getConsumerIdListByGroup(
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
public long getMinOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand All @@ -936,12 +939,12 @@ public long getMinOffset(final String addr, final String topic, final int queueI
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
final long timeoutMillis)
public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down Expand Up @@ -1100,6 +1103,7 @@ public boolean registerClient(final String addr, final HeartbeatData heartbeat,

public void consumerSendMessageBack(
final String addr,
final String brokerName,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
Expand All @@ -1115,6 +1119,7 @@ public void consumerSendMessageBack(
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
requestHeader.setBname(brokerName);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
Expand Down
Expand Up @@ -583,8 +583,8 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}

this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
this.defaultMQPullConsumer.getMaxReconsumeTimes());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
consumerGroup, delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);

Expand Down
Expand Up @@ -515,7 +515,7 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Expand Down
Expand Up @@ -191,6 +191,7 @@ public PullResult pullKernelImpl(
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
requestHeader.setBname(mq.getBrokerName());

String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
Expand Down
Expand Up @@ -350,6 +350,8 @@ private void processTransactionState(
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
thisHeader.setBname(checkRequestHeader.getBname());
thisHeader.setQueueId(checkRequestHeader.getQueueId());

String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
Expand Down Expand Up @@ -774,6 +776,7 @@ private SendResult sendKernelImpl(final Message msg,
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setBname(mq.getBrokerName());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
Expand Down Expand Up @@ -1323,6 +1326,8 @@ public void endTransaction(
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
requestHeader.setQueueId(sendResult.getMessageQueue().getQueueId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
Expand Down
Expand Up @@ -20,18 +20,19 @@
*/
package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long tranStateTableOffset;
@CFNotNull
private Long commitLogOffset;
private String msgId;
private String transactionId;
private String offsetMsgId;
private int queueId;

@Override
public void checkFields() throws RemotingCommandException {
Expand Down Expand Up @@ -76,4 +77,12 @@ public String getOffsetMsgId() {
public void setOffsetMsgId(String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}

public int getQueueId() {
return queueId;
}

public void setQueueId(int queueId) {
this.queueId = queueId;
}
}
Expand Up @@ -17,12 +17,12 @@

package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long offset;
@CFNotNull
Expand Down
Expand Up @@ -17,13 +17,13 @@

package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class EndTransactionRequestHeader implements CommandCustomHeader {
public class EndTransactionRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
Expand All @@ -43,6 +43,8 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {

private String transactionId;

private int queueId;

@Override
public void checkFields() throws RemotingCommandException {
if (MessageSysFlag.TRANSACTION_NOT_TYPE == this.commitOrRollback) {
Expand Down Expand Up @@ -126,6 +128,16 @@ public String toString() {
", fromTransactionCheck=" + fromTransactionCheck +
", msgId='" + msgId + '\'' +
", transactionId='" + transactionId + '\'' +
", queueId=" + queueId +
", bname='" + bname + '\'' +
'}';
}

public int getQueueId() {
return queueId;
}

public void setQueueId(int queueId) {
this.queueId = queueId;
}
}
Expand Up @@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader {
public class GetEarliestMsgStoretimeRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
Expand Down
Expand Up @@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
public class GetMaxOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
Expand Down
Expand Up @@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class GetMinOffsetRequestHeader implements CommandCustomHeader {
public class GetMinOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
Expand Down
Expand Up @@ -20,17 +20,15 @@
*/
package org.apache.rocketmq.common.protocol.header;

import io.netty.buffer.ByteBuf;
import java.util.HashMap;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.FastCodesHeader;

import io.netty.buffer.ByteBuf;

public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
public class PullMessageRequestHeader extends RpcRequestHeader implements FastCodesHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
Expand All @@ -51,6 +49,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
private String subscription;
@CFNotNull
private Long subVersion;
@CFNullable
private String expressionType;

@Override
Expand All @@ -70,6 +69,7 @@ public void encode(ByteBuf out) {
writeIfNotNull(out, "subscription", subscription);
writeIfNotNull(out, "subVersion", subVersion);
writeIfNotNull(out, "expressionType", expressionType);
writeIfNotNull(out, "bname", bname);
}

@Override
Expand Down Expand Up @@ -128,6 +128,11 @@ public void decode(HashMap<String, String> fields) throws RemotingCommandExcepti
if (str != null) {
this.expressionType = str;
}

str = fields.get("bname");
if (str != null) {
this.bname = str;
}
}

public String getConsumerGroup() {
Expand Down
Expand Up @@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
public class QueryConsumerOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
Expand Down

0 comments on commit 0443af3

Please sign in to comment.