From c8e1ef8a6dd993efff4d56813f649df381330c47 Mon Sep 17 00:00:00 2001 From: ScarbWin Date: Tue, 16 Apr 2024 23:05:55 +0800 Subject: [PATCH] [ISSUE #8025] fix: fix topic route when topic deleted --- .../rocketmq/client/impl/MQClientAPIImpl.java | 1 + .../client/impl/MQClientAPIImplTest.java | 59 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 12d305b612e..56f5726c259 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -2007,6 +2007,7 @@ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final case ResponseCode.TOPIC_NOT_EXIST: { if (allowTopicNotExist) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); + return null; } break; diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 08e7fbe09a8..ac44280fbb6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -77,6 +77,8 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.route.QueueData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -89,13 +91,16 @@ import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class MQClientAPIImplTest { @@ -987,4 +992,58 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", "default-broker", 1000); assertThat(topicCnt).isEqualTo(7); } + + @Test + public void testGetTopicRouteInfoFromNameServer_shouldReturnNull_whenTopicNotExistAndAllow() throws Exception { + when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong())) + .thenReturn(RemotingCommand.createResponseCommand(ResponseCode.TOPIC_NOT_EXIST, "topic not exist")); + + assertThat(mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, true)).isNull(); + } + + @Test + public void testGetTopicRouteInfoFromNameServer_shouldThrowException_whenTopicNotExistAndNotAllow() throws Exception { + when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong())) + .thenReturn(RemotingCommand.createResponseCommand(ResponseCode.TOPIC_NOT_EXIST, "topic not exist")); + + assertThatThrownBy(() -> mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false)) + .isInstanceOf(MQClientException.class) + .hasMessageContaining("topic not exist"); + } + + @Test + public void testGetTopicRouteInfoFromNameServer_shouldThrowException_whenErrorResponse() throws Exception { + when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong())) + .thenReturn(RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "timeout")); + + assertThatThrownBy(() -> mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false)) + .isInstanceOf(MQClientException.class) + .hasMessageContaining("timeout"); + } + + @Test + public void testGetTopicRouteInfoFromNameServer_shouldReturnTopicRouteData_whenSuccess() throws Exception { + when(remotingClient.invokeSync(isNull(), any(RemotingCommand.class), anyLong())) + .thenReturn(createTopicRouteInfoResponse()); + + TopicRouteData topicRouteData = mqClientAPI.getTopicRouteInfoFromNameServer("topic", 1000, false); + assertThat(topicRouteData).isNotNull(); + assertThat(topicRouteData.getQueueDatas()).hasSize(1); + } + + private RemotingCommand createTopicRouteInfoResponse() { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setBody(new byte[0]); + TopicRouteData topicRouteData = new TopicRouteData(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker1"); + queueData.setPerm(6); + queueData.setReadQueueNums(8); + queueData.setWriteQueueNums(8); + queueData.setTopicSysFlag(0); + topicRouteData.setQueueDatas(Collections.singletonList(queueData)); + response.setBody(topicRouteData.encode()); + return response; + } }