Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader
msgInner.setDeliverTimeMs(ck.getReviveTime() - PopAckConstants.ackTimeInterval);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genCkUniqueId(ck));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("change Invisible , appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
return;
}
MessageExtBrokerInner msgInner = popMessageProcessor.buildCkMsg(pointWrapper.getCk(), pointWrapper.getReviveQueueId());
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
Expand All @@ -547,7 +547,14 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
return;
}
pointWrapper.setCkStored(true);
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

if (putMessageResult.isRemotePut()) {
//No AppendMessageResult when escaping remotely
pointWrapper.setReviveQueueOffset(0);
} else {
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
}

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}", pointWrapper, putMessageResult);
}
Expand Down Expand Up @@ -575,7 +582,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
return restNum;
}
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
GetMessageResult getMessageTmpResult;
GetMessageResult getMessageTmpResult = null;
try {
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
Expand Down Expand Up @@ -544,6 +544,8 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
// this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
} finally {
queueLockManager.unLock(lockKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) thr
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
Expand Down Expand Up @@ -75,12 +76,20 @@ public class ChangeInvisibleTimeProcessorTest {
@Mock
private Broker2Client broker2Client;

@Mock
private EscapeBridge escapeBridge = new EscapeBridge(this.brokerController);

@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
brokerController.setMessageStore(messageStore);
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);

Field ebField = BrokerController.class.getDeclaredField("escapeBridge");
ebField.setAccessible(true);
ebField.set(brokerController, this.escapeBridge);

Channel mockChannel = mock(Channel.class);
when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
Expand All @@ -99,7 +108,7 @@ public void init() throws IllegalAccessException, NoSuchFieldException {

@Test
public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
int queueId = 0;
long queueOffset = 0;
long popTime = System.currentTimeMillis() - 1_000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public class PopMessageProcessorTest {
public void init() {
brokerController.setMessageStore(messageStore);
popMessageProcessor = new PopMessageProcessor(brokerController);
when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
Expand Down
Loading