Skip to content

Commit

Permalink
[ISSUE #8227] Optimize DefaultMQPushConsumer construction method (#8228)
Browse files Browse the repository at this point in the history
  • Loading branch information
yx9o committed Jun 13, 2024
1 parent 6b59155 commit 017b753
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.client.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
Expand All @@ -40,12 +36,17 @@
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
* In most scenarios, this is the mostly recommended class to consume messages.
Expand Down Expand Up @@ -328,10 +329,7 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
}


Expand All @@ -355,10 +353,7 @@ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,15 @@
*/
package org.apache.rocketmq.client.consumer;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
Expand All @@ -53,6 +41,7 @@
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand All @@ -62,7 +51,6 @@
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -71,15 +59,35 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -206,7 +214,7 @@ public static void terminate() {

@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
Assert.assertNotNull(pushConsumer.getOffsetStore());
assertNotNull(pushConsumer.getOffsetStore());
}

@Test
Expand Down Expand Up @@ -388,4 +396,22 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(messageExts[0]).isNull();
}

@Test
public void assertCreatePushConsumer() {
DefaultMQPushConsumer pushConsumer1 = new DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class));
assertNotNull(pushConsumer1);
assertEquals(consumerGroup, pushConsumer1.getConsumerGroup());
assertTrue(pushConsumer1.getAllocateMessageQueueStrategy() instanceof AllocateMessageQueueAveragely);
assertNotNull(pushConsumer1.defaultMQPushConsumerImpl);
assertFalse(pushConsumer1.isEnableTrace());
assertTrue(UtilAll.isBlank(pushConsumer1.getTraceTopic()));
DefaultMQPushConsumer pushConsumer2 = new DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class), new AllocateMessageQueueAveragelyByCircle());
assertNotNull(pushConsumer2);
assertEquals(consumerGroup, pushConsumer2.getConsumerGroup());
assertTrue(pushConsumer2.getAllocateMessageQueueStrategy() instanceof AllocateMessageQueueAveragelyByCircle);
assertNotNull(pushConsumer2.defaultMQPushConsumerImpl);
assertFalse(pushConsumer2.isEnableTrace());
assertTrue(UtilAll.isBlank(pushConsumer2.getTraceTopic()));
}
}

0 comments on commit 017b753

Please sign in to comment.