diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java index a25897beb8b..114c0bce3f8 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java @@ -42,7 +42,7 @@ public class TubeMQManager { @Value("${manager.async.thread.prefix:AsyncThread-}") private String threadPrefix; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { SpringApplication.run(TubeMQManager.class); } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java index 09a72cdcdfc..505383433df 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java @@ -16,9 +16,6 @@ */ package org.apache.tubemq.manager.controller; -import javax.servlet.http.HttpServletRequest; -import org.apache.tubemq.manager.controller.topic.TopicResult; -import org.apache.tubemq.manager.exceptions.TubeMQManagerException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RestControllerAdvice; @@ -29,18 +26,16 @@ public class ManagerControllerAdvice { /** - * handling business TubeMQManagerException, and return json format string. + * handling exception, and return json format string. * - * @param request - http request - * @param ex - exception - * @return entity + * @param ex + * @return */ - @ExceptionHandler(TubeMQManagerException.class) - public TopicResult handlingBusinessException(HttpServletRequest request, - TubeMQManagerException ex) { - TopicResult result = new TopicResult(); - result.setMessage(ex.getMessage()); - result.setCode(-1); + @ExceptionHandler(Exception.class) + public TubeResult handlingParameterException(Exception ex) { + TubeResult result = new TubeResult(); + result.setErrMsg(ex.getClass().getName() + " " + ex.getMessage()); + result.setErrCode(-1); return result; } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java similarity index 82% rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java index 98fb81e9b63..144d9751b4d 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tubemq.manager.controller.topic; + +package org.apache.tubemq.manager.controller; import lombok.Data; -/** - * rest result for business controller - */ @Data -public class TopicResult { - private String message; - private int code = 0; +public class TubeResult { + private String errMsg; + private int errCode = 0; + private boolean result = true; } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java new file mode 100644 index 00000000000..599ce15e4cf --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.controller.cluster; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA; + +import com.google.gson.Gson; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.tubemq.manager.controller.TubeResult; +import org.apache.tubemq.manager.entry.NodeEntry; +import org.apache.tubemq.manager.repository.NodeRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping(path = "/v1/cluster") +@Slf4j +public class ClusterController { + + private final CloseableHttpClient httpclient = HttpClients.createDefault(); + private final Gson gson = new Gson(); + + private static final String TUBE_REQUEST_PATH = "webapi.htm"; + + @Autowired + private NodeRepository nodeRepository; + + + private String covertMapToQueryString(Map requestMap) throws Exception { + List queryList = new ArrayList<>(); + + for (Map.Entry entry : requestMap.entrySet()) { + queryList.add(entry.getKey() + "=" + URLEncoder.encode( + entry.getValue(), UTF_8.toString())); + } + return StringUtils.join(queryList, "&"); + } + + private String queryMaster(String url) { + log.info("start to request {}", url); + HttpGet httpGet = new HttpGet(url); + TubeResult defaultResult = new TubeResult(); + try (CloseableHttpResponse response = httpclient.execute(httpGet)) { + // return result json to response + return EntityUtils.toString(response.getEntity()); + } catch (Exception ex) { + log.error("exception caught while requesting broker status", ex); + defaultResult.setErrCode(-1); + defaultResult.setResult(false); + defaultResult.setErrMsg(ex.getMessage()); + } + return gson.toJson(defaultResult); + } + + @RequestMapping(value = "/query", method = RequestMethod.GET, + produces = MediaType.APPLICATION_JSON_VALUE) + public @ResponseBody String queryInfo( + @RequestParam Map queryBody) throws Exception { + int clusterId = Integer.parseInt(queryBody.get("clusterId")); + queryBody.remove("clusterId"); + NodeEntry nodeEntry = + nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId); + String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody); + return queryMaster(url); + } + + +} diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java index 314d0799285..fdeac4ea505 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.tubemq.manager.controller.TubeResult; import org.apache.tubemq.manager.entry.TopicEntry; import org.apache.tubemq.manager.entry.TopicStatus; import org.apache.tubemq.manager.exceptions.TubeMQManagerException; @@ -53,7 +54,7 @@ public class TopicController { * @throws Exception - exception */ @PostMapping("/add") - public TopicResult addTopic(@RequestBody TopicEntry entry) { + public TubeResult addTopic(@RequestBody TopicEntry entry) { // entry in adding status entry.setStatus(TopicStatus.ADDING.value()); topicRepository.saveAndFlush(entry); @@ -68,7 +69,7 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) { } topicRepository.saveAndFlush(entry1); }); - return new TopicResult(); + return new TubeResult(); } /** @@ -78,8 +79,8 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) { * @throws Exception */ @PostMapping("/update") - public TopicResult updateTopic(@RequestBody TopicEntry entry) { - return new TopicResult(); + public TubeResult updateTopic(@RequestBody TopicEntry entry) { + return new TubeResult(); } /** @@ -89,10 +90,10 @@ public TopicResult updateTopic(@RequestBody TopicEntry entry) { * @throws Exception */ @GetMapping("/check") - public TopicResult checkTopicByBusinessName( + public TubeResult checkTopicByBusinessName( @RequestParam String businessName) { List result = topicRepository.findAllByBusinessName(businessName); - return new TopicResult(); + return new TubeResult(); } /** @@ -103,13 +104,13 @@ public TopicResult checkTopicByBusinessName( * @throws Exception */ @GetMapping("/get/{id}") - public TopicResult getBusinessByID( + public TubeResult getBusinessByID( @PathVariable Long id) { Optional businessEntry = topicRepository.findById(id); - TopicResult result = new TopicResult(); + TubeResult result = new TubeResult(); if (!businessEntry.isPresent()) { - result.setCode(-1); - result.setMessage("business not found"); + result.setErrCode(-1); + result.setErrMsg("business not found"); } return result; } @@ -119,7 +120,7 @@ public TopicResult getBusinessByID( * @return */ @GetMapping("/throwException") - public TopicResult throwException() { + public TubeResult throwException() { throw new TubeMQManagerException("exception for test"); } } diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java index 0838203085c..9a497cf3f7b 100644 --- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java @@ -20,7 +20,6 @@ import java.util.Objects; import lombok.extern.slf4j.Slf4j; import org.apache.tubemq.manager.controller.topic.TopicController; -import org.apache.tubemq.manager.controller.topic.TopicResult; import org.apache.tubemq.manager.entry.TopicEntry; import org.junit.Before; import org.junit.Test; @@ -40,6 +39,7 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -82,8 +82,8 @@ public void testAddBusiness() throws Exception { HttpHeaders headers = new HttpHeaders(); HttpEntity request = new HttpEntity<>(entry, headers); - ResponseEntity responseEntity = - client.postForEntity(uri, request, TopicResult.class); + ResponseEntity responseEntity = + client.postForEntity(uri, request, TubeResult.class); assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true); } @@ -91,9 +91,9 @@ public void testAddBusiness() throws Exception { public void testControllerException() throws Exception { final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException"; URI uri = new URI(baseUrl); - ResponseEntity responseEntity = - client.getForEntity(uri, TopicResult.class); - assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1); - assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test"); + ResponseEntity responseEntity = + client.getForEntity(uri, TubeResult.class); + assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1); + assertTrue(responseEntity.getBody().getErrMsg().contains("exception for test")); } } diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java new file mode 100644 index 00000000000..c31fb8d415f --- /dev/null +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.controller; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; + +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.apache.tubemq.manager.controller.cluster.ClusterController; +import org.apache.tubemq.manager.entry.NodeEntry; +import org.apache.tubemq.manager.repository.NodeRepository; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.test.web.servlet.RequestBuilder; + +@Slf4j +@RunWith(SpringRunner.class) +@SpringBootTest +@AutoConfigureMockMvc +public class TestClusterController { + + private final Gson gson = new Gson(); + + @MockBean + private NodeRepository nodeRepository; + + @InjectMocks + private ClusterController clusterController; + + @Autowired + private MockMvc mockMvc; + + private NodeEntry getNodeEntry() { + NodeEntry nodeEntry = new NodeEntry(); + nodeEntry.setMaster(true); + nodeEntry.setIp("10.215.128.83"); + nodeEntry.setWebPort(8080); + return nodeEntry; + } + + @Test + public void testExceptionQuery() throws Exception { + NodeEntry nodeEntry = getNodeEntry(); + when(nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class))) + .thenReturn(nodeEntry); + RequestBuilder request = get( + "/v1/cluster/query?method=admin_query_topic_info&type=op_query"); + MvcResult result = mockMvc.perform(request).andReturn(); + String resultStr = result.getResponse().getContentAsString(); + TubeResult clusterResult = gson.fromJson(resultStr, TubeResult.class); + Assert.assertEquals(-1, clusterResult.getErrCode()); + Assert.assertTrue(clusterResult.getErrMsg().contains("NumberFormatException")); + } + + @Test + public void testTopicQuery() throws Exception { + NodeEntry nodeEntry = getNodeEntry(); + when(nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class))) + .thenReturn(nodeEntry); + RequestBuilder request = get( + "/v1/cluster/query?method=admin_query_topic_info&type=op_query&clusterId=1"); + MvcResult result = mockMvc.perform(request).andReturn(); + String resultStr = result.getResponse().getContentAsString(); + log.info("result json string is {}, response type is {}", resultStr, + result.getResponse().getContentType()); + } + + @Test + public void testBrokerQuery() throws Exception { + NodeEntry nodeEntry = getNodeEntry(); + when(nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class))) + .thenReturn(nodeEntry); + RequestBuilder request = get( + "/v1/cluster/query?method=admin_query_broker_run_status&type=op_query&clusterId=1&brokerIp="); + MvcResult result = mockMvc.perform(request).andReturn(); + String resultStr = result.getResponse().getContentAsString(); + log.info("result json string is {}, response type is {}", resultStr, + result.getResponse().getContentType()); + } + + @Test + public void testTopicAndGroupQuery() throws Exception { + NodeEntry nodeEntry = getNodeEntry(); + when(nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class))) + .thenReturn(nodeEntry); + RequestBuilder request = get( + "/v1/cluster/query?method=admin_query_sub_info&type=op_query&clusterId=1&topicName=test&groupName=test"); + MvcResult result = mockMvc.perform(request).andReturn(); + String resultStr = result.getResponse().getContentAsString(); + log.info("result json string is {}, response type is {}", resultStr, + result.getResponse().getContentType()); + } + + +}