Skip to content

Commit

Permalink
Fix NPE in CI (#5273)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhanhui committed Oct 11, 2022
1 parent 2b412a6 commit 94d8e55
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.math.IntMath;
import com.google.common.math.LongMath;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
Expand All @@ -40,7 +41,6 @@
import org.apache.rocketmq.test.util.StatUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -51,6 +51,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class BenchLmqStore {
private static Logger logger = LoggerFactory.getLogger(BenchLmqStore.class);
private static String namesrv = System.getProperty("namesrv", "127.0.0.1:9876");
Expand Down Expand Up @@ -91,7 +92,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
defaultMQPullConsumer.setVipChannelEnabled(false);
defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + i);
defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i);
defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList(lmqTopic)));
defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Collections.singletonList(lmqTopic)));
defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime);
defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000);
defaultMQPullConsumer.start();
Expand Down Expand Up @@ -140,6 +141,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
Thread.sleep(5000L);
doSend();
}

public static void doSend() {
StringBuilder sb = new StringBuilder();
for (int j = 0; j < size; j += 10) {
Expand Down Expand Up @@ -172,9 +174,9 @@ public static void doSend() {
if (StatUtil.nowTps(pubKey) < 10) {
logger.warn("pub: {} ", sendResult.getMsgId());
}
if (enableSub) {
if (enableSub && null != sendResult.getMessageQueue()) {
MessageQueue mq = new MessageQueue(queue, sendResult.getMessageQueue().getBrokerName(),
lmqNum > 0 ? 0 : sendResult.getMessageQueue().getQueueId());
lmqNum > 0 ? 0 : sendResult.getMessageQueue().getQueueId());
int queueHash = IntMath.mod(queue.hashCode(), consumerThreadNum);
pullEvent.putIfAbsent(queueHash, new ConcurrentHashMap<>());
pullEvent.get(queueHash).put(mq, idx);
Expand All @@ -187,7 +189,9 @@ public static void doSend() {
});
}
}
public static void doPull(Map<MessageQueue, Long> eventMap, MessageQueue mq, Long eventId) throws RemotingException, InterruptedException, MQClientException {

public static void doPull(Map<MessageQueue, Long> eventMap, MessageQueue mq,
Long eventId) throws RemotingException, InterruptedException, MQClientException {
if (!enableSub) {
eventMap.remove(mq, eventId);
pullStatus.remove(mq);
Expand All @@ -206,47 +210,49 @@ public static void doPull(Map<MessageQueue, Long> eventMap, MessageQueue mq, Lon
return;
}
defaultMQPullConsumer.pullBlockIfNotFound(
mq, "*", offset, 32,
new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - start);
eventMap.remove(mq, eventId);
pullStatus.remove(mq);
offsetMap.put(mq, pullResult.getNextBeginOffset());
StatUtil.addInvoke("doPull", System.currentTimeMillis() - start);
if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && RETRY_NO_MATCHED_MSG) {
long idx = rid.incrementAndGet();
eventMap.put(mq, idx);
}
List<MessageExt> list = pullResult.getMsgFoundList();
if (list == null || list.isEmpty()) {
StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - start);
return;
}
for (MessageExt messageExt : list) {
StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp());
if (StatUtil.nowTps("sub") < 10) {
logger.warn("sub: {}", messageExt.getMsgId());
}
}
mq, "*", offset, 32,
new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - start);
eventMap.remove(mq, eventId);
pullStatus.remove(mq);
offsetMap.put(mq, pullResult.getNextBeginOffset());
StatUtil.addInvoke("doPull", System.currentTimeMillis() - start);
if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && RETRY_NO_MATCHED_MSG) {
long idx = rid.incrementAndGet();
eventMap.put(mq, idx);
}
@Override
public void onException(Throwable e) {
eventMap.remove(mq, eventId);
pullStatus.remove(mq);
logger.error("", e);
StatUtil.addInvoke("doPull", System.currentTimeMillis() - start, false);
List<MessageExt> list = pullResult.getMsgFoundList();
if (list == null || list.isEmpty()) {
StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - start);
return;
}
for (MessageExt messageExt : list) {
StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp());
if (StatUtil.nowTps("sub") < 10) {
logger.warn("sub: {}", messageExt.getMsgId());
}
}
});
}

@Override
public void onException(Throwable e) {
eventMap.remove(mq, eventId);
pullStatus.remove(mq);
logger.error("", e);
StatUtil.addInvoke("doPull", System.currentTimeMillis() - start, false);
}
});
}

public static void doBenchOffset() throws RemotingException, InterruptedException, MQClientException {
ExecutorService sendPool = Executors.newFixedThreadPool(sendThreadNum);
Map<String, Long> offsetMap = new ConcurrentHashMap<>();
String statKey = "benchOffset";
TopicRouteData topicRouteData = defaultMQPullConsumers[0].getDefaultMQPullConsumerImpl().
getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().
getTopicRouteInfoFromNameServer(lmqTopic, 3000);
getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().
getTopicRouteInfoFromNameServer(lmqTopic, 3000);
HashMap<Long, String> brokerMap = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs();
if (brokerMap == null || brokerMap.isEmpty()) {
return;
Expand All @@ -263,35 +269,33 @@ public void run() {
Thread.sleep(100L);
}
long start = System.currentTimeMillis();
DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[(int) (rid.incrementAndGet() % pullConsumerNum)];
long id = rid.incrementAndGet();
int index = (Integer.MAX_VALUE & (int) id) % defaultMQPullConsumers.length;
DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[index];
String lmq = LMQ_PREFIX + queuePrefix + id % benchOffsetNum;
String lmqCid = LMQ_PREFIX + "GID_LMQ@@c" + flag + "-" + id % benchOffsetNum;
Long offset = offsetMap.get(lmq);
if (offset == null) {
offsetMap.put(lmq, 0L);
}
offsetMap.putIfAbsent(lmq, 0L);
long newOffset1 = offsetMap.get(lmq) + 1;
UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader();
updateHeader.setTopic(lmq);
updateHeader.setConsumerGroup(lmqCid);
updateHeader.setQueueId(0);
updateHeader.setCommitOffset(newOffset1);
defaultMQPullConsumer
.getDefaultMQPullConsumerImpl()
.getRebalanceImpl()
.getmQClientFactory()
.getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000);
.getDefaultMQPullConsumerImpl()
.getRebalanceImpl()
.getmQClientFactory()
.getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000);
QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader();
queryHeader.setTopic(lmq);
queryHeader.setConsumerGroup(lmqCid);
queryHeader.setQueueId(0);
long newOffset2 = defaultMQPullConsumer
.getDefaultMQPullConsumerImpl()
.getRebalanceImpl()
.getmQClientFactory()
.getMQClientAPIImpl()
.queryConsumerOffset(brokerAddress, queryHeader, 1000);
.getDefaultMQPullConsumerImpl()
.getRebalanceImpl()
.getmQClientFactory()
.getMQClientAPIImpl()
.queryConsumerOffset(brokerAddress, queryHeader, 1000);
offsetMap.put(lmq, newOffset2);
if (newOffset1 != newOffset2) {
StatUtil.addInvoke("ErrorOffset", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.test.lmq;

import java.util.Collections;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
Expand All @@ -39,10 +40,10 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void test() throws MQBrokerException, RemotingException, InterruptedExcep
verify(BenchLmqStore.defaultMQPullConsumers[0], atLeastOnce()).pullBlockIfNotFound(any(MessageQueue.class), anyString(), anyLong(), anyInt(), any(
PullCallback.class));
}

@Test
public void testOffset() throws RemotingException, InterruptedException, MQClientException, MQBrokerException, IllegalAccessException {
System.setProperty("sendThreadNum", "1");
Expand All @@ -88,7 +90,7 @@ public void testOffset() throws RemotingException, InterruptedException, MQClien
TopicRouteData topicRouteData = new TopicRouteData();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(MixAll.MASTER_ID, "test");
List<BrokerData> brokerData = Arrays.asList(new BrokerData("test", "test", brokerAddrs));
List<BrokerData> brokerData = Collections.singletonList(new BrokerData("test", "test", brokerAddrs));
topicRouteData.setBrokerDatas(brokerData);
FieldUtils.writeStaticField(BenchLmqStore.class, "lmqTopic", "test", true);
when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
Expand Down

0 comments on commit 94d8e55

Please sign in to comment.