Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-392] add query rest api for clusters #307

Merged
merged 1 commit into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TubeMQManager {
@Value("${manager.async.thread.prefix:AsyncThread-}")
private String threadPrefix;

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
SpringApplication.run(TubeMQManager.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/

package org.apache.tubemq.manager.controller;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

Expand All @@ -29,18 +26,16 @@
public class ManagerControllerAdvice {

/**
* handling business TubeMQManagerException, and return json format string.
* handling exception, and return json format string.
*
* @param request - http request
* @param ex - exception
* @return entity
* @param ex
* @return
*/
@ExceptionHandler(TubeMQManagerException.class)
public TopicResult handlingBusinessException(HttpServletRequest request,
TubeMQManagerException ex) {
TopicResult result = new TopicResult();
result.setMessage(ex.getMessage());
result.setCode(-1);
@ExceptionHandler(Exception.class)
public TubeResult handlingParameterException(Exception ex) {
TubeResult result = new TubeResult();
result.setErrMsg(ex.getClass().getName() + " " + ex.getMessage());
result.setErrCode(-1);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tubemq.manager.controller.topic;

package org.apache.tubemq.manager.controller;

import lombok.Data;

/**
* rest result for business controller
*/
@Data
public class TopicResult {
private String message;
private int code = 0;
public class TubeResult {
private String errMsg;
private int errCode = 0;
private boolean result = true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tubemq.manager.controller.cluster;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA;

import com.google.gson.Gson;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeResult;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(path = "/v1/cluster")
@Slf4j
public class ClusterController {

private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();

private static final String TUBE_REQUEST_PATH = "webapi.htm";

@Autowired
private NodeRepository nodeRepository;


private String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
List<String> queryList = new ArrayList<>();

for (Map.Entry<String, String> entry : requestMap.entrySet()) {
queryList.add(entry.getKey() + "=" + URLEncoder.encode(
entry.getValue(), UTF_8.toString()));
}
return StringUtils.join(queryList, "&");
}

private String queryMaster(String url) {
log.info("start to request {}", url);
HttpGet httpGet = new HttpGet(url);
TubeResult defaultResult = new TubeResult();
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
// return result json to response
return EntityUtils.toString(response.getEntity());
} catch (Exception ex) {
log.error("exception caught while requesting broker status", ex);
defaultResult.setErrCode(-1);
defaultResult.setResult(false);
defaultResult.setErrMsg(ex.getMessage());
}
return gson.toJson(defaultResult);
}

@RequestMapping(value = "/query", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
int clusterId = Integer.parseInt(queryBody.get("clusterId"));
queryBody.remove("clusterId");
NodeEntry nodeEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
return queryMaster(url);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.TubeResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.apache.tubemq.manager.entry.TopicStatus;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class TopicController {
* @throws Exception - exception
*/
@PostMapping("/add")
public TopicResult addTopic(@RequestBody TopicEntry entry) {
public TubeResult addTopic(@RequestBody TopicEntry entry) {
// entry in adding status
entry.setStatus(TopicStatus.ADDING.value());
topicRepository.saveAndFlush(entry);
Expand All @@ -68,7 +69,7 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) {
}
topicRepository.saveAndFlush(entry1);
});
return new TopicResult();
return new TubeResult();
}

/**
Expand All @@ -78,8 +79,8 @@ public TopicResult addTopic(@RequestBody TopicEntry entry) {
* @throws Exception
*/
@PostMapping("/update")
public TopicResult updateTopic(@RequestBody TopicEntry entry) {
return new TopicResult();
public TubeResult updateTopic(@RequestBody TopicEntry entry) {
return new TubeResult();
}

/**
Expand All @@ -89,10 +90,10 @@ public TopicResult updateTopic(@RequestBody TopicEntry entry) {
* @throws Exception
*/
@GetMapping("/check")
public TopicResult checkTopicByBusinessName(
public TubeResult checkTopicByBusinessName(
@RequestParam String businessName) {
List<TopicEntry> result = topicRepository.findAllByBusinessName(businessName);
return new TopicResult();
return new TubeResult();
}

/**
Expand All @@ -103,13 +104,13 @@ public TopicResult checkTopicByBusinessName(
* @throws Exception
*/
@GetMapping("/get/{id}")
public TopicResult getBusinessByID(
public TubeResult getBusinessByID(
@PathVariable Long id) {
Optional<TopicEntry> businessEntry = topicRepository.findById(id);
TopicResult result = new TopicResult();
TubeResult result = new TubeResult();
if (!businessEntry.isPresent()) {
result.setCode(-1);
result.setMessage("business not found");
result.setErrCode(-1);
result.setErrMsg("business not found");
}
return result;
}
Expand All @@ -119,7 +120,7 @@ public TopicResult getBusinessByID(
* @return
*/
@GetMapping("/throwException")
public TopicResult throwException() {
public TubeResult throwException() {
throw new TubeMQManagerException("exception for test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.topic.TopicController;
import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -40,6 +39,7 @@
import org.springframework.test.web.servlet.setup.MockMvcBuilders;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

Expand Down Expand Up @@ -82,18 +82,18 @@ public void testAddBusiness() throws Exception {
HttpHeaders headers = new HttpHeaders();
HttpEntity<TopicEntry> request = new HttpEntity<>(entry, headers);

ResponseEntity<TopicResult> responseEntity =
client.postForEntity(uri, request, TopicResult.class);
ResponseEntity<TubeResult> responseEntity =
client.postForEntity(uri, request, TubeResult.class);
assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
}

@Test
public void testControllerException() throws Exception {
final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
URI uri = new URI(baseUrl);
ResponseEntity<TopicResult> responseEntity =
client.getForEntity(uri, TopicResult.class);
assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
ResponseEntity<TubeResult> responseEntity =
client.getForEntity(uri, TubeResult.class);
assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1);
assertTrue(responseEntity.getBody().getErrMsg().contains("exception for test"));
}
}
Loading