Skip to content

Commit

Permalink
[ISSUE #4821] Add some integration tests for POP consumption in slave…
Browse files Browse the repository at this point in the history
…-acting-master mode (#4822)

* fix #4821

* add license header and ignore executing integration tests in automatic flow

* fix unit test

* fix unit test
  • Loading branch information
caigy committed Aug 16, 2022
1 parent c5f8071 commit e3c2111
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 8 deletions.
Expand Up @@ -189,7 +189,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader
msgInner.setDeliverTimeMs(ck.getReviveTime() - PopAckConstants.ackTimeInterval);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genCkUniqueId(ck));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("change Invisible , appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), offset,
Expand Down
Expand Up @@ -538,7 +538,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
return;
}
MessageExtBrokerInner msgInner = popMessageProcessor.buildCkMsg(pointWrapper.getCk(), pointWrapper.getReviveQueueId());
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
Expand All @@ -547,7 +547,14 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
return;
}
pointWrapper.setCkStored(true);
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

if (putMessageResult.isRemotePut()) {
//No AppendMessageResult when escaping remotely
pointWrapper.setReviveQueueOffset(0);
} else {
pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
}

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}", pointWrapper, putMessageResult);
}
Expand Down Expand Up @@ -575,7 +582,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
Expand Down
Expand Up @@ -477,7 +477,7 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
return restNum;
}
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
GetMessageResult getMessageTmpResult;
GetMessageResult getMessageTmpResult = null;
try {
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
Expand Down Expand Up @@ -544,6 +544,8 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
// this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
} finally {
queueLockManager.unLock(lockKey);
}
Expand Down
Expand Up @@ -106,7 +106,7 @@ private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) thr
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
Expand Down Expand Up @@ -75,12 +76,20 @@ public class ChangeInvisibleTimeProcessorTest {
@Mock
private Broker2Client broker2Client;

@Mock
private EscapeBridge escapeBridge = new EscapeBridge(this.brokerController);

@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
brokerController.setMessageStore(messageStore);
Field field = BrokerController.class.getDeclaredField("broker2Client");
field.setAccessible(true);
field.set(brokerController, broker2Client);

Field ebField = BrokerController.class.getDeclaredField("escapeBridge");
ebField.setAccessible(true);
ebField.set(brokerController, this.escapeBridge);

Channel mockChannel = mock(Channel.class);
when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
Expand All @@ -99,7 +108,7 @@ public void init() throws IllegalAccessException, NoSuchFieldException {

@Test
public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
int queueId = 0;
long queueOffset = 0;
long popTime = System.currentTimeMillis() - 1_000;
Expand Down
Expand Up @@ -78,7 +78,6 @@ public class PopMessageProcessorTest {
public void init() {
brokerController.setMessageStore(messageStore);
popMessageProcessor = new PopMessageProcessor(brokerController);
when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
Expand Down

0 comments on commit e3c2111

Please sign in to comment.