Skip to content

Commit

Permalink
ROCKETMQ-51] Add unit tests for PullMessageProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouxinyu committed Jan 21, 2017
1 parent b636c37 commit d87223f
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 10 deletions.
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class PullMessageProcessorTest {
private PullMessageProcessor pullMessageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private MessageStore messageStore;
private ClientChannelInfo clientChannelInfo;
private String group = "FooBarGroup";
private String topic = "FooBar";

@Before
public void init() {
brokerController.setMessageStore(messageStore);
pullMessageProcessor = new PullMessageProcessor(brokerController);
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
clientChannelInfo = new ClientChannelInfo(mockChannel);
ConsumerData consumerData = createConsumerData();
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
}

@Test
public void testProcessRequest_TopicNotExist() throws RemotingCommandException {
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
assertThat(response.getRemark()).contains("topic[" + topic + "] not exist");
}

@Test
public void testProcessRequest_SubNotExist() throws RemotingCommandException {
brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, false);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST);
assertThat(response.getRemark()).contains("consumer's group info not exist");
}

@Test
public void testProcessRequest_SubNotLatest() throws RemotingCommandException {
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
request.addExtField("subVersion", String.valueOf(101));
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST);
assertThat(response.getRemark()).contains("subscription not latest");
}

@Test
public void testProcessRequest_Found() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);

final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);

final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY);
}

@Test
public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);

final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED);
}

private RemotingCommand createPullMsgCommand(int requestCode) {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setCommitOffset(123L);
requestHeader.setConsumerGroup(group);
requestHeader.setMaxMsgNums(100);
requestHeader.setQueueId(1);
requestHeader.setQueueOffset(456L);
requestHeader.setSubscription("*");
requestHeader.setTopic(topic);
requestHeader.setSysFlag(0);
requestHeader.setSubVersion(100L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.makeCustomHeaderToNet();
return request;
}

private ConsumerData createConsumerData() {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
consumerData.setGroupName(group);
consumerData.setMessageModel(MessageModel.CLUSTERING);
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString("*");
subscriptionData.setSubVersion(100L);
subscriptionDataSet.add(subscriptionData);
consumerData.setSubscriptionDataSet(subscriptionDataSet);
return consumerData;
}

private GetMessageResult createGetMessageResult() {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND);
getMessageResult.setMinOffset(100);
getMessageResult.setMaxOffset(1024);
getMessageResult.setNextBeginOffset(516);
return getMessageResult;
}
}
Expand Up @@ -153,13 +153,7 @@ private RemotingCommand createSendMsgCommand(int requestCode) {
requestHeader.setReconsumeTimes(0);

RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.addExtField("queueId", String.valueOf(requestHeader.getQueueId()));
request.addExtField("topic", String.valueOf(requestHeader.getTopic()));
request.addExtField("defaultTopicQueueNums", String.valueOf(requestHeader.getDefaultTopicQueueNums()));
request.addExtField("defaultTopic", requestHeader.getDefaultTopic());
request.addExtField("sysFlag", String.valueOf(requestHeader.getSysFlag()));
request.addExtField("flag", String.valueOf(requestHeader.getFlag()));
request.addExtField("bornTimestamp", String.valueOf(requestHeader.getBornTimestamp()));
request.makeCustomHeaderToNet();
return request;
}

Expand All @@ -172,9 +166,7 @@ private RemotingCommand createSendMsgBackCommand(int requestCode) {
requestHeader.setOffset(123L);

RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.addExtField("group", requestHeader.getGroup());
request.addExtField("offset", String.valueOf(requestHeader.getOffset()));
request.addExtField("delayLevel", String.valueOf(requestHeader.getDelayLevel()));
request.makeCustomHeaderToNet();
return request;
}

Expand Down

0 comments on commit d87223f

Please sign in to comment.