diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index bb68badc126..76c1b908e76 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -189,7 +189,7 @@ private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader msgInner.setDeliverTimeMs(ck.getReviveTime() - PopAckConstants.ackTimeInterval); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genCkUniqueId(ck)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("change Invisible , appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), offset, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 4b2df0875de..bb432a85154 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -538,7 +538,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean return; } MessageExtBrokerInner msgInner = popMessageProcessor.buildCkMsg(pointWrapper.getCk(), pointWrapper.getReviveQueueId()); - PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); + PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT @@ -547,7 +547,14 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean return; } pointWrapper.setCkStored(true); - pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + + if (putMessageResult.isRemotePut()) { + //No AppendMessageResult when escaping remotely + pointWrapper.setReviveQueueOffset(0); + } else { + pointWrapper.setReviveQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); + } + if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]put ck to store ok: {}, {}", pointWrapper, putMessageResult); } @@ -575,7 +582,7 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); + PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 842d0d98bed..0d2c5f9b561 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -477,7 +477,7 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, return restNum; } offset = getPopOffset(topic, requestHeader, queueId, true, lockKey); - GetMessageResult getMessageTmpResult; + GetMessageResult getMessageTmpResult = null; try { if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) { @@ -544,6 +544,8 @@ private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, // this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic, // queueId, getMessageTmpResult.getNextBeginOffset()); } + } catch (Exception e) { + POP_LOGGER.error("Exception in popMsgFromQueue", e); } finally { queueLockManager.unLock(lockKey); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 1b95cce4ef4..1a6c52ec337 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -106,7 +106,7 @@ private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) thr } msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId()); - PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); + PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(), diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java index 2a009e9513d..811913a26b7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; +import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageConst; @@ -75,12 +76,20 @@ public class ChangeInvisibleTimeProcessorTest { @Mock private Broker2Client broker2Client; + @Mock + private EscapeBridge escapeBridge = new EscapeBridge(this.brokerController); + @Before public void init() throws IllegalAccessException, NoSuchFieldException { brokerController.setMessageStore(messageStore); Field field = BrokerController.class.getDeclaredField("broker2Client"); field.setAccessible(true); field.set(brokerController, broker2Client); + + Field ebField = BrokerController.class.getDeclaredField("escapeBridge"); + ebField.setAccessible(true); + ebField.set(brokerController, this.escapeBridge); + Channel mockChannel = mock(Channel.class); when(handlerContext.channel()).thenReturn(mockChannel); brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); @@ -99,7 +108,7 @@ public void init() throws IllegalAccessException, NoSuchFieldException { @Test public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException { - when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); int queueId = 0; long queueOffset = 0; long popTime = System.currentTimeMillis() - 1_000; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index af6b4bb55a8..7ea20ceffee 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -78,7 +78,6 @@ public class PopMessageProcessorTest { public void init() { brokerController.setMessageStore(messageStore); popMessageProcessor = new PopMessageProcessor(brokerController); - when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); Channel mockChannel = mock(Channel.class); when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); when(handlerContext.channel()).thenReturn(mockChannel); diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java new file mode 100644 index 00000000000..17d5e0cc376 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.container; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.BrokerIdentity; +import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageRequestMode; +import org.apache.rocketmq.container.BrokerContainer; +import org.apache.rocketmq.container.InnerBrokerController; +import org.apache.rocketmq.container.InnerSalveBrokerController; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@Ignore +public class PopSlaveActingMasterIT extends ContainerIntegrationTestBase { + private static final String CONSUME_GROUP = PopSlaveActingMasterIT.class.getSimpleName() + "_Consumer"; + private final static int MESSAGE_COUNT = 16; + private final static Random random = new Random(); + private static DefaultMQProducer producer; + private final static String MESSAGE_STRING = RandomStringUtils.random(1024); + private static byte[] MESSAGE_BODY; + + public PopSlaveActingMasterIT() { + } + + static { + try { + MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + void createTopic(String topic) { + createTopicTo(master1With3Replicas, topic, 1, 1); + createTopicTo(master2With3Replicas, topic, 1, 1); + createTopicTo(master3With3Replicas, topic, 1, 1); + System.out.println("Topic [" + topic + "] created"); + } + + @BeforeClass + public static void beforeClass() throws Throwable { + producer = createProducer(PopSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER"); + producer.setSendMsgTimeout(5000); + producer.start(); + } + + @AfterClass + public static void afterClass() throws Exception { + producer.shutdown(); + } + + + @Test + public void testLocalActing_ackSlave() throws Exception { + String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); + createTopic(topic); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + createTopic(retryTopic); + + this.switchPop(topic); + + producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + + MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0); + int sendSuccess = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = new Message(topic, MESSAGE_BODY); + SendResult sendResult = producer.send(msg, messageQueue); + if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + System.out.println("send message id: " + sendResult.getMsgId()); + sendSuccess++; + } + } + + System.out.printf("send success %d%n", sendSuccess); + final int finalSendSuccess = sendSuccess; + await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); + + isolateBroker(master1With3Replicas); + System.out.printf("isolate master1%n"); + + DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP); + consumer.subscribe(topic, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + List consumedMessages = new CopyOnWriteArrayList<>(); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + msgs.forEach(msg -> { + System.out.println("receive msg id: " + msg.getMsgId()); + consumedMessages.add(msg.getMsgId()); + }); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + consumer.setClientRebalance(false); + consumer.start(); + + await().atMost(Duration.ofMinutes(1)).until(() -> consumedMessages.size() >= MESSAGE_COUNT); + System.out.printf("%s pop receive msg count: %d%n", LocalDateTime.now(), consumedMessages.size()); + + consumer.shutdown(); + + List retryMsgList = new CopyOnWriteArrayList<>(); + DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP); + pushConsumer.subscribe(retryTopic, "*"); + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.printf("receive retry msg: %s %s%n", new String(msg.getBody()), msg); + retryMsgList.add(new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + + System.out.printf("wait for ack revive%n"); + Thread.sleep(10000L); + + assertThat(retryMsgList.size()).isEqualTo(0); + + cancelIsolatedBroker(master1With3Replicas); + awaitUntilSlaveOK(); + + pushConsumer.shutdown(); + } + + @Test + public void testLocalActing_notAckSlave() throws Exception { +// master1With3Replicas.getBrokerConfig().setReviveMaxSlow(0L); +// master1With3Replicas.getBrokerConfig().setReviveInterval(0L); +// //master1With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s"); +// +// master2With3Replicas.getBrokerConfig().setReviveMaxSlow(0L); +// master2With3Replicas.getBrokerConfig().setReviveInterval(0L); +// //master2With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s"); +// +// master3With3Replicas.getBrokerConfig().setReviveMaxSlow(0L); +// master3With3Replicas.getBrokerConfig().setReviveInterval(0L); +// //master3With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s"); + + String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); + createTopic(topic); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + createTopic(retryTopic); + + this.switchPop(topic); + + producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + + Set sendToIsolateMsgSet = new HashSet<>(); + MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0); + int sendSuccess = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = new Message(topic, MESSAGE_BODY); + SendResult sendResult = producer.send(msg, messageQueue); + if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + sendToIsolateMsgSet.add(new String(msg.getBody())); + sendSuccess++; + } + } + + System.out.printf("send success %d%n", sendSuccess); + final int finalSendSuccess = sendSuccess; + await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); + + isolateBroker(master1With3Replicas); + System.out.printf("isolate master1%n"); + + DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP); + consumer.subscribe(topic, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + consumer.setPopInvisibleTime(5000L); + List consumedMessages = new CopyOnWriteArrayList<>(); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + msgs.forEach(msg -> { + System.out.println("receive msg id: " + msg.getMsgId()); + + msg.setReconsumeTimes(0); + + consumedMessages.add(msg.getMsgId()); + }); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }); + consumer.setClientRebalance(false); + consumer.start(); + + await().atMost(Duration.ofMinutes(1)).until(() -> consumedMessages.size() >= MESSAGE_COUNT); + consumer.shutdown(); + + List retryMsgList = new CopyOnWriteArrayList<>(); + DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP); + pushConsumer.subscribe(retryTopic, "*"); + pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.printf("receive retry msg: %s%n", msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + retryMsgList.add(new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + + System.out.printf(LocalDateTime.now() + ": wait for ack revive%n"); + + AtomicInteger failCnt = new AtomicInteger(0); + await().atMost(Duration.ofMinutes(3)).pollInterval(Duration.ofSeconds(10)).until(() -> { + if (retryMsgList.size() < MESSAGE_COUNT) { + System.out.println("check FAILED" + failCnt.incrementAndGet() + ": retryMsgList.size=" + retryMsgList.size() + " less than " + MESSAGE_COUNT); + return false; + } + + for (String msgBodyString : retryMsgList) { + if (!sendToIsolateMsgSet.contains(msgBodyString)) { + System.out.println("check FAILED: sendToIsolateMsgSet doesn't contain " + msgBodyString); + return false; + } + } + return true; + }); + + System.out.printf(LocalDateTime.now() + ": receive retry msg size=%d%n", retryMsgList.size()); + + cancelIsolatedBroker(master1With3Replicas); + awaitUntilSlaveOK(); + + pushConsumer.shutdown(); + } + + @Test + public void testRemoteActing_ackSlave() throws Exception { + String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); + createTopic(topic); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + createTopic(retryTopic); + + switchPop(topic); + + producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + + MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0); + int sendSuccess = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = new Message(topic, MESSAGE_BODY); + SendResult sendResult = producer.send(msg, messageQueue); + if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + System.out.println("Send message id: " + sendResult.getMsgId()); + sendSuccess++; + } + } + + System.out.printf("%s send success %d%n", LocalDateTime.now(), sendSuccess); + final int finalSendSuccess = sendSuccess; + await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); + + isolateBroker(master1With3Replicas); + System.out.printf("%s isolate master1%n", LocalDateTime.now()); + + isolateBroker(master2With3Replicas); + brokerContainer2.removeBroker(new BrokerIdentity( + master2With3Replicas.getBrokerConfig().getBrokerClusterName(), + master2With3Replicas.getBrokerConfig().getBrokerName(), + master2With3Replicas.getBrokerConfig().getBrokerId())); + System.out.printf("%s Remove master2%n", LocalDateTime.now()); + + + DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP); + consumer.subscribe(topic, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + List consumedMessages = new CopyOnWriteArrayList<>(); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + msgs.forEach(msg -> { + System.out.println("receive msg id: " + msg.getMsgId()); + consumedMessages.add(msg.getMsgId()); + }); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + consumer.setClientRebalance(false); + consumer.start(); + + await().atMost(Duration.ofMinutes(2)).until(() -> consumedMessages.size() >= MESSAGE_COUNT); + consumer.shutdown(); + System.out.printf("%s %d messages consumed%n", LocalDateTime.now(), consumedMessages.size()); + + List retryMsgList = new CopyOnWriteArrayList<>(); + DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP); + pushConsumer.subscribe(retryTopic, "*"); + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.printf("receive retry msg: %s %n", msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + retryMsgList.add(new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + + System.out.printf("%s wait for ack revive%n", LocalDateTime.now()); + Thread.sleep(10000); + + assertThat(retryMsgList.size()).isEqualTo(0); + + cancelIsolatedBroker(master1With3Replicas); + System.out.printf("%s Cancel isolate master1%n", LocalDateTime.now()); + + //Add back master + master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig()); + master2With3Replicas.start(); + cancelIsolatedBroker(master2With3Replicas); + System.out.printf("%s Add back master2%n", LocalDateTime.now()); + + awaitUntilSlaveOK(); + + System.out.printf("%s wait for ack revive%n", LocalDateTime.now()); + Thread.sleep(10000); + + assertThat(retryMsgList.size()).isEqualTo(0); + + System.out.printf("%s shutting down pushConsumer%n", LocalDateTime.now()); + pushConsumer.shutdown(); + } + + @Test + public void testRemoteActing_notAckSlave_getFromLocal() throws Exception { + String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); + createTopic(topic); + this.switchPop(topic); + + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + createTopic(retryTopic); + + producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + + Set sendToIsolateMsgSet = new HashSet<>(); + MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0); + int sendSuccess = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = new Message(topic, MESSAGE_BODY); + SendResult sendResult = producer.send(msg, messageQueue); + if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + sendToIsolateMsgSet.add(new String(msg.getBody())); + sendSuccess++; + } + } + + System.out.printf("send success %d%n", sendSuccess); + final int finalSendSuccess = sendSuccess; + await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); + + isolateBroker(master1With3Replicas); + System.out.printf("isolate master1%n"); + + isolateBroker(master2With3Replicas); + brokerContainer2.removeBroker(new BrokerIdentity( + master2With3Replicas.getBrokerConfig().getBrokerClusterName(), + master2With3Replicas.getBrokerConfig().getBrokerName(), + master2With3Replicas.getBrokerConfig().getBrokerId())); + System.out.printf("Remove master2%n"); + + + DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP); + consumer.subscribe(topic, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + List consumedMessages = new CopyOnWriteArrayList<>(); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + msgs.forEach(msg -> { + System.out.println("receive msg id: " + msg.getMsgId()); + consumedMessages.add(msg.getMsgId()); + }); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }); + consumer.setClientRebalance(false); + consumer.start(); + + await().atMost(Duration.ofMinutes(3)).until(() -> consumedMessages.size() >= MESSAGE_COUNT); + consumer.shutdown(); + + + List retryMsgList = new CopyOnWriteArrayList<>(); + DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP); + pushConsumer.subscribe(retryTopic, "*"); + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.printf("receive retry msg: %s%n", msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + retryMsgList.add(new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + + System.out.printf("wait for ack revive%n"); + + await().atMost(Duration.ofMinutes(1)).until(() -> { + if (retryMsgList.size() < MESSAGE_COUNT) { + System.out.println("check FAILED: retryMsgList.size=" + retryMsgList.size() + " less than " + MESSAGE_COUNT); + return false; + } + + for (String msgBodyString : retryMsgList) { + if (!sendToIsolateMsgSet.contains(msgBodyString)) { + System.out.println("check FAILED: sendToIsolateMsgSet doesn't contain: " + msgBodyString); + return false; + } + } + return true; + }); + + System.out.printf("receive retry msg as expected%n"); + + cancelIsolatedBroker(master1With3Replicas); + System.out.printf("Cancel isolate master1%n"); + + //Add back master + master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig()); + master2With3Replicas.start(); + cancelIsolatedBroker(master2With3Replicas); + System.out.printf("Add back master2%n"); + + awaitUntilSlaveOK(); + pushConsumer.shutdown(); + } + + @Test + public void testRemoteActing_notAckSlave_getFromRemote() throws Exception { + String topic = PopSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535); + createTopic(topic); + this.switchPop(topic); + String retryTopic = KeyBuilder.buildPopRetryTopic(topic, CONSUME_GROUP); + createTopic(retryTopic); + + producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + + Set sendToIsolateMsgSet = new HashSet<>(); + MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0); + int sendSuccess = 0; + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = new Message(topic, MESSAGE_BODY); + SendResult sendResult = producer.send(msg, messageQueue); + if (sendResult.getSendStatus() == SendStatus.SEND_OK) { + sendToIsolateMsgSet.add(new String(msg.getBody())); + sendSuccess++; + } + } + + System.out.printf("send success %d%n", sendSuccess); + final int finalSendSuccess = sendSuccess; + await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); + + isolateBroker(master1With3Replicas); + System.out.printf("isolate master1%n"); + + isolateBroker(master2With3Replicas); + brokerContainer2.removeBroker(new BrokerIdentity( + master2With3Replicas.getBrokerConfig().getBrokerClusterName(), + master2With3Replicas.getBrokerConfig().getBrokerName(), + master2With3Replicas.getBrokerConfig().getBrokerId())); + System.out.printf("Remove master2%n"); + + BrokerController slave1InBrokerContainer3 = getSlaveFromContainerByName(brokerContainer3, master1With3Replicas.getBrokerConfig().getBrokerName()); + isolateBroker(slave1InBrokerContainer3); + brokerContainer3.removeBroker(new BrokerIdentity( + slave1InBrokerContainer3.getBrokerConfig().getBrokerClusterName(), + slave1InBrokerContainer3.getBrokerConfig().getBrokerName(), + slave1InBrokerContainer3.getBrokerConfig().getBrokerId())); + System.out.printf("Remove slave1 form container3%n"); + + DefaultMQPushConsumer consumer = createPushConsumer(CONSUME_GROUP); + consumer.subscribe(topic, "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + List consumedMessages = new CopyOnWriteArrayList<>(); + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + msgs.forEach(msg -> { + System.out.println("receive msg id: " + msg.getMsgId()); + consumedMessages.add(msg.getMsgId()); + }); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + }); + consumer.setClientRebalance(false); + consumer.start(); + + await().atMost(Duration.ofMinutes(1)).until(() -> consumedMessages.size() >= MESSAGE_COUNT); + System.out.printf("%s pop receive msg count: %d%n", LocalDateTime.now(), consumedMessages.size()); + consumer.shutdown(); + + + List retryMsgList = new CopyOnWriteArrayList<>(); + DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP); + pushConsumer.subscribe(retryTopic, "*"); + pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.printf("receive retry msg: %s%n", msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + retryMsgList.add(new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + pushConsumer.start(); + + System.out.printf("wait for ack revive%n"); + Thread.sleep(10000); + + await().atMost(Duration.ofMinutes(1)).until(() -> { + if (retryMsgList.size() < MESSAGE_COUNT) { + return false; + } + + for (String msgBodyString : retryMsgList) { + if (!sendToIsolateMsgSet.contains(msgBodyString)) { + return false; + } + } + return true; + }); + + System.out.printf("receive retry msg as expected%n"); + + cancelIsolatedBroker(master1With3Replicas); + System.out.printf("Cancel isolate master1%n"); + + //Add back master + master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig()); + master2With3Replicas.start(); + cancelIsolatedBroker(master2With3Replicas); + System.out.printf("Add back master2%n"); + + //Add back slave1 to container3 + slave1InBrokerContainer3 = brokerContainer3.addBroker(slave1InBrokerContainer3.getBrokerConfig(), slave1InBrokerContainer3.getMessageStoreConfig()); + slave1InBrokerContainer3.start(); + cancelIsolatedBroker(slave1InBrokerContainer3); + System.out.printf("Add back slave1 to container3%n"); + + awaitUntilSlaveOK(); + pushConsumer.shutdown(); + } + + private void switchPop(String topic) throws Exception { + for (BrokerContainer brokerContainer : brokerContainerList) { + for (InnerBrokerController master : brokerContainer.getMasterBrokers()) { + String brokerAddr = master.getBrokerAddr(); + defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000); + } + for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) { + defaultMQAdminExt.setMessageRequestMode(slave.getBrokerAddr(), topic, CONSUME_GROUP, MessageRequestMode.POP, 8, 60_000); + } + } + + } + +} \ No newline at end of file