Skip to content

Commit

Permalink
[ISSUE #8025] fix: fix topic route when topic deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb committed Apr 29, 2024
1 parent 04dddec commit c8e1ef8
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2007,6 +2007,7 @@ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
return null;
}

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.assertj.core.api.Assertions;
import org.junit.Before;
Expand All @@ -89,13 +91,16 @@
import org.mockito.stubbing.Answer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class MQClientAPIImplTest {
Expand Down Expand Up @@ -987,4 +992,58 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000);
assertThat(topicCnt).isEqualTo(7);
}

@Test
public void testGetTopicRouteInfoFromNameServer_shouldReturnNull_whenTopicNotExistAndAllow() throws Exception {
when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong()))
.thenReturn(RemotingCommand.createResponseCommand(ResponseCode.TOPIC_NOT_EXIST, "topic not exist"));

assertThat(mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, true)).isNull();
}

@Test
public void testGetTopicRouteInfoFromNameServer_shouldThrowException_whenTopicNotExistAndNotAllow() throws Exception {
when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong()))
.thenReturn(RemotingCommand.createResponseCommand(ResponseCode.TOPIC_NOT_EXIST, "topic not exist"));

assertThatThrownBy(() -> mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false))
.isInstanceOf(MQClientException.class)
.hasMessageContaining("topic not exist");
}

@Test
public void testGetTopicRouteInfoFromNameServer_shouldThrowException_whenErrorResponse() throws Exception {
when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong()))
.thenReturn(RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "timeout"));

assertThatThrownBy(() -> mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false))
.isInstanceOf(MQClientException.class)
.hasMessageContaining("timeout");
}

@Test
public void testGetTopicRouteInfoFromNameServer_shouldReturnTopicRouteData_whenSuccess() throws Exception {
when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong()))
.thenReturn(createTopicRouteInfoResponse());

TopicRouteData topicRouteData = mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false);
assertThat(topicRouteData).isNotNull();
assertThat(topicRouteData.getQueueDatas()).hasSize(1);
}

private RemotingCommand createTopicRouteInfoResponse() {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(new byte[0]);
TopicRouteData topicRouteData = new TopicRouteData();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker1");
queueData.setPerm(6);
queueData.setReadQueueNums(8);
queueData.setWriteQueueNums(8);
queueData.setTopicSysFlag(0);
topicRouteData.setQueueDatas(Collections.singletonList(queueData));
response.setBody(topicRouteData.encode());
return response;
}
}

0 comments on commit c8e1ef8

Please sign in to comment.