Skip to content

Commit

Permalink
[TUBEMQ-392] add query rest api for clusters (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanboliu committed Nov 5, 2020
1 parent 714ea2b commit 9e1ea11
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TubeMQManager {
@Value("${manager.async.thread.prefix:AsyncThread-}")
private String threadPrefix;

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
SpringApplication.run(TubeMQManager.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/

package org.apache.tubemq.manager.controller;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

Expand All @@ -29,18 +26,16 @@
public class ManagerControllerAdvice {

/**
* handling business TubeMQManagerException, and return json format string.
* handling exception, and return json format string.
*
* @param request - http request
* @param ex - exception
* @return entity
* @param ex
* @return
*/
@ExceptionHandler(TubeMQManagerException.class)
public TopicResult handlingBusinessException(HttpServletRequest request,
TubeMQManagerException ex) {
TopicResult result = new TopicResult();
result.setMessage(ex.getMessage());
result.setCode(-1);
@ExceptionHandler(Exception.class)
public TubeResult handlingParameterException(Exception ex) {
TubeResult result = new TubeResult();
result.setErrMsg(ex.getClass().getName() + " " + ex.getMessage());
result.setErrCode(-1);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tubemq.manager.controller.topic;

package org.apache.tubemq.manager.controller;

import lombok.Data;

/**
* rest result for business controller
*/
@Data
public class TopicResult {
private String message;
private int code = 0;
public class TubeResult {
private String errMsg;
private int errCode = 0;
private boolean result = true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tubemq.manager.controller.cluster;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA;

import com.google.gson.Gson;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeResult;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(path = "/v1/cluster")
@Slf4j
public class ClusterController {

private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();

private static final String TUBE_REQUEST_PATH = "webapi.htm";

@Autowired
private NodeRepository nodeRepository;


private String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
List<String> queryList = new ArrayList<>();

for (Map.Entry<String, String> entry : requestMap.entrySet()) {
queryList.add(entry.getKey() + "=" + URLEncoder.encode(
entry.getValue(), UTF_8.toString()));
}
return StringUtils.join(queryList, "&");
}

private String queryMaster(String url) {
log.info("start to request {}", url);
HttpGet httpGet = new HttpGet(url);
TubeResult defaultResult = new TubeResult();
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
// return result json to response
return EntityUtils.toString(response.getEntity());
} catch (Exception ex) {
log.error("exception caught while requesting broker status", ex);
defaultResult.setErrCode(-1);
defaultResult.setResult(false);
defaultResult.setErrMsg(ex.getMessage());
}
return gson.toJson(defaultResult);
}

@RequestMapping(value = "/query", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
int clusterId = Integer.parseInt(queryBody.get("clusterId"));
queryBody.remove("clusterId");
NodeEntry nodeEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
return queryMaster(url);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.TubeResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.apache.tubemq.manager.entry.TopicStatus;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class TopicController {
* @throws Exception - exception
*/
@PostMapping("/add")
public TopicResult addTopic(@RequestBody TopicEntry entry) {
public TubeResult addTopic(@RequestBody TopicEntry entry) {
// entry in adding status
entry.setStatus(TopicStatus.ADDING.value());
topicRepository.saveAndFlush(entry);
Expand All @@ -68,7 +69,7 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) {
}
topicRepository.saveAndFlush(entry1);
});
return new TopicResult();
return new TubeResult();
}

/**
Expand All @@ -78,8 +79,8 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) {
* @throws Exception
*/
@PostMapping("/update")
public TopicResult updateTopic(@RequestBody TopicEntry entry) {
return new TopicResult();
public TubeResult updateTopic(@RequestBody TopicEntry entry) {
return new TubeResult();
}

/**
Expand All @@ -89,10 +90,10 @@ public TopicResult updateTopic(@RequestBody TopicEntry entry) {
* @throws Exception
*/
@GetMapping("/check")
public TopicResult checkTopicByBusinessName(
public TubeResult checkTopicByBusinessName(
@RequestParam String businessName) {
List<TopicEntry> result = topicRepository.findAllByBusinessName(businessName);
return new TopicResult();
return new TubeResult();
}

/**
Expand All @@ -103,13 +104,13 @@ public TopicResult checkTopicByBusinessName(
* @throws Exception
*/
@GetMapping("/get/{id}")
public TopicResult getBusinessByID(
public TubeResult getBusinessByID(
@PathVariable Long id) {
Optional<TopicEntry> businessEntry = topicRepository.findById(id);
TopicResult result = new TopicResult();
TubeResult result = new TubeResult();
if (!businessEntry.isPresent()) {
result.setCode(-1);
result.setMessage("business not found");
result.setErrCode(-1);
result.setErrMsg("business not found");
}
return result;
}
Expand All @@ -119,7 +120,7 @@ public TopicResult getBusinessByID(
* @return
*/
@GetMapping("/throwException")
public TopicResult throwException() {
public TubeResult throwException() {
throw new TubeMQManagerException("exception for test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.topic.TopicController;
import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -40,6 +39,7 @@
import org.springframework.test.web.servlet.setup.MockMvcBuilders;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -82,18 +82,18 @@ public void testAddBusiness() throws Exception {
HttpHeaders headers = new HttpHeaders();
HttpEntity<TopicEntry> request = new HttpEntity<>(entry, headers);

ResponseEntity<TopicResult> responseEntity =
client.postForEntity(uri, request, TopicResult.class);
ResponseEntity<TubeResult> responseEntity =
client.postForEntity(uri, request, TubeResult.class);
assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
}

@Test
public void testControllerException() throws Exception {
final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
URI uri = new URI(baseUrl);
ResponseEntity<TopicResult> responseEntity =
client.getForEntity(uri, TopicResult.class);
assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
ResponseEntity<TubeResult> responseEntity =
client.getForEntity(uri, TubeResult.class);
assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1);
assertTrue(responseEntity.getBody().getErrMsg().contains("exception for test"));
}
}
Loading

0 comments on commit 9e1ea11

Please sign in to comment.