From a80a9eba99f9ff969d5b94b0e6cf8eba6ea3cb1c Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Tue, 11 Oct 2022 16:00:19 +0800 Subject: [PATCH] Fix NPE in CI --- .../test/lmq/benchmark/BenchLmqStore.java | 108 +++++++++--------- .../rocketmq/test/lmq/TestBenchLmqStore.java | 6 +- 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java b/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java index df71fb3bef3..2d7bace1dd2 100644 --- a/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java +++ b/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java @@ -19,6 +19,7 @@ import com.google.common.math.IntMath; import com.google.common.math.LongMath; import java.nio.charset.StandardCharsets; +import java.util.Collections; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullCallback; @@ -40,7 +41,6 @@ import org.apache.rocketmq.test.util.StatUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + public class BenchLmqStore { private static Logger logger = LoggerFactory.getLogger(BenchLmqStore.class); private static String namesrv = System.getProperty("namesrv", "127.0.0.1:9876"); @@ -91,7 +92,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce defaultMQPullConsumer.setVipChannelEnabled(false); defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + i); defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i); - defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList(lmqTopic))); + defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Collections.singletonList(lmqTopic))); defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime); defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000); defaultMQPullConsumer.start(); @@ -140,6 +141,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce Thread.sleep(5000L); doSend(); } + public static void doSend() { StringBuilder sb = new StringBuilder(); for (int j = 0; j < size; j += 10) { @@ -172,9 +174,9 @@ public static void doSend() { if (StatUtil.nowTps(pubKey) < 10) { logger.warn("pub: {} ", sendResult.getMsgId()); } - if (enableSub) { + if (enableSub && null != sendResult.getMessageQueue()) { MessageQueue mq = new MessageQueue(queue, sendResult.getMessageQueue().getBrokerName(), - lmqNum > 0 ? 0 : sendResult.getMessageQueue().getQueueId()); + lmqNum > 0 ? 0 : sendResult.getMessageQueue().getQueueId()); int queueHash = IntMath.mod(queue.hashCode(), consumerThreadNum); pullEvent.putIfAbsent(queueHash, new ConcurrentHashMap<>()); pullEvent.get(queueHash).put(mq, idx); @@ -187,7 +189,9 @@ public static void doSend() { }); } } - public static void doPull(Map eventMap, MessageQueue mq, Long eventId) throws RemotingException, InterruptedException, MQClientException { + + public static void doPull(Map eventMap, MessageQueue mq, + Long eventId) throws RemotingException, InterruptedException, MQClientException { if (!enableSub) { eventMap.remove(mq, eventId); pullStatus.remove(mq); @@ -206,47 +210,49 @@ public static void doPull(Map eventMap, MessageQueue mq, Lon return; } defaultMQPullConsumer.pullBlockIfNotFound( - mq, "*", offset, 32, - new PullCallback() { - @Override - public void onSuccess(PullResult pullResult) { - StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - start); - eventMap.remove(mq, eventId); - pullStatus.remove(mq); - offsetMap.put(mq, pullResult.getNextBeginOffset()); - StatUtil.addInvoke("doPull", System.currentTimeMillis() - start); - if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && RETRY_NO_MATCHED_MSG) { - long idx = rid.incrementAndGet(); - eventMap.put(mq, idx); - } - List list = pullResult.getMsgFoundList(); - if (list == null || list.isEmpty()) { - StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - start); - return; - } - for (MessageExt messageExt : list) { - StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp()); - if (StatUtil.nowTps("sub") < 10) { - logger.warn("sub: {}", messageExt.getMsgId()); - } - } + mq, "*", offset, 32, + new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - start); + eventMap.remove(mq, eventId); + pullStatus.remove(mq); + offsetMap.put(mq, pullResult.getNextBeginOffset()); + StatUtil.addInvoke("doPull", System.currentTimeMillis() - start); + if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && RETRY_NO_MATCHED_MSG) { + long idx = rid.incrementAndGet(); + eventMap.put(mq, idx); } - @Override - public void onException(Throwable e) { - eventMap.remove(mq, eventId); - pullStatus.remove(mq); - logger.error("", e); - StatUtil.addInvoke("doPull", System.currentTimeMillis() - start, false); + List list = pullResult.getMsgFoundList(); + if (list == null || list.isEmpty()) { + StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - start); + return; + } + for (MessageExt messageExt : list) { + StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp()); + if (StatUtil.nowTps("sub") < 10) { + logger.warn("sub: {}", messageExt.getMsgId()); + } } - }); + } + + @Override + public void onException(Throwable e) { + eventMap.remove(mq, eventId); + pullStatus.remove(mq); + logger.error("", e); + StatUtil.addInvoke("doPull", System.currentTimeMillis() - start, false); + } + }); } + public static void doBenchOffset() throws RemotingException, InterruptedException, MQClientException { ExecutorService sendPool = Executors.newFixedThreadPool(sendThreadNum); Map offsetMap = new ConcurrentHashMap<>(); String statKey = "benchOffset"; TopicRouteData topicRouteData = defaultMQPullConsumers[0].getDefaultMQPullConsumerImpl(). - getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl(). - getTopicRouteInfoFromNameServer(lmqTopic, 3000); + getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl(). + getTopicRouteInfoFromNameServer(lmqTopic, 3000); HashMap brokerMap = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs(); if (brokerMap == null || brokerMap.isEmpty()) { return; @@ -263,14 +269,12 @@ public void run() { Thread.sleep(100L); } long start = System.currentTimeMillis(); - DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[(int) (rid.incrementAndGet() % pullConsumerNum)]; long id = rid.incrementAndGet(); + int index = (Integer.MAX_VALUE & (int) id) % defaultMQPullConsumers.length; + DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[index]; String lmq = LMQ_PREFIX + queuePrefix + id % benchOffsetNum; String lmqCid = LMQ_PREFIX + "GID_LMQ@@c" + flag + "-" + id % benchOffsetNum; - Long offset = offsetMap.get(lmq); - if (offset == null) { - offsetMap.put(lmq, 0L); - } + offsetMap.putIfAbsent(lmq, 0L); long newOffset1 = offsetMap.get(lmq) + 1; UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader(); updateHeader.setTopic(lmq); @@ -278,20 +282,20 @@ public void run() { updateHeader.setQueueId(0); updateHeader.setCommitOffset(newOffset1); defaultMQPullConsumer - .getDefaultMQPullConsumerImpl() - .getRebalanceImpl() - .getmQClientFactory() - .getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000); + .getDefaultMQPullConsumerImpl() + .getRebalanceImpl() + .getmQClientFactory() + .getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000); QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader(); queryHeader.setTopic(lmq); queryHeader.setConsumerGroup(lmqCid); queryHeader.setQueueId(0); long newOffset2 = defaultMQPullConsumer - .getDefaultMQPullConsumerImpl() - .getRebalanceImpl() - .getmQClientFactory() - .getMQClientAPIImpl() - .queryConsumerOffset(brokerAddress, queryHeader, 1000); + .getDefaultMQPullConsumerImpl() + .getRebalanceImpl() + .getmQClientFactory() + .getMQClientAPIImpl() + .queryConsumerOffset(brokerAddress, queryHeader, 1000); offsetMap.put(lmq, newOffset2); if (newOffset1 != newOffset2) { StatUtil.addInvoke("ErrorOffset", 1); diff --git a/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java b/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java index 3457d6106b4..3135bcc3ea0 100644 --- a/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java +++ b/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.test.lmq; +import java.util.Collections; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullCallback; @@ -39,10 +40,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentHashMap; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -71,6 +72,7 @@ public void test() throws MQBrokerException, RemotingException, InterruptedExcep verify(BenchLmqStore.defaultMQPullConsumers[0], atLeastOnce()).pullBlockIfNotFound(any(MessageQueue.class), anyString(), anyLong(), anyInt(), any( PullCallback.class)); } + @Test public void testOffset() throws RemotingException, InterruptedException, MQClientException, MQBrokerException, IllegalAccessException { System.setProperty("sendThreadNum", "1"); @@ -88,7 +90,7 @@ public void testOffset() throws RemotingException, InterruptedException, MQClien TopicRouteData topicRouteData = new TopicRouteData(); HashMap brokerAddrs = new HashMap<>(); brokerAddrs.put(MixAll.MASTER_ID, "test"); - List brokerData = Arrays.asList(new BrokerData("test", "test", brokerAddrs)); + List brokerData = Collections.singletonList(new BrokerData("test", "test", brokerAddrs)); topicRouteData.setBrokerDatas(brokerData); FieldUtils.writeStaticField(BenchLmqStore.class, "lmqTopic", "test", true); when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);