Skip to content

Commit

Permalink
[TUBEMQ-447] Add Broker-Admin Cli (apache#347)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <gosonzhang@tencent.com>
  • Loading branch information
2 people authored and EMsnap committed Jan 7, 2021
1 parent fc28100 commit ab85c22
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 19 deletions.
40 changes: 40 additions & 0 deletions bin/tubemq-broker-admin.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "$BASE_DIR" ] ; then
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
BASE_DIR=`dirname "$PRG"`/..

# make it fully qualified
BASE_DIR=`cd "$BASE_DIR" && pwd`
#echo "TubeMQ master is at $BASE_DIR"
fi
source $BASE_DIR/bin/env.sh
$JAVA $TOOLS_ARGS org.apache.tubemq.server.tools.cli.CliBrokerAdmin $@
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down Expand Up @@ -435,6 +440,16 @@
<artifactId>velocity</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.31.v20200723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.31.v20200723</version>
</dependency>
<dependency>
<groupId>org.ini4j</groupId>
<artifactId>ini4j</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions tubemq-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,17 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.31.v20200723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.31.v20200723</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.tubemq.server.broker.web;

import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getRegisteredWebMethod;
import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo;
import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
import java.io.IOException;
import java.util.List;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand All @@ -42,6 +44,10 @@ protected void doGet(HttpServletRequest req,
doPost(req, resp);
}

public List<String> getSupportedMethod() {
return getRegisteredWebMethod();
}

@Override
protected void doPost(HttpServletRequest req,
HttpServletResponse resp) throws IOException {
Expand All @@ -56,7 +62,7 @@ protected void doPost(HttpServletRequest req,
WebApiRegInfo webApiRegInfo = getWebApiRegInfo(true, method);
if (webApiRegInfo == null) {
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
.append("Unsupported method ").append(method).append("}");
.append("Unsupported method ").append(method).append("\"}");
} else {
strBuffer = (StringBuilder) webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ public void registerWebApiMethod() {
// manual set offset
innRegisterWebMethod("admin_manual_set_current_offset",
"adminManualSetCurrentOffSet");
// get all registered methods
innRegisterWebMethod("admin_get_methods",
"adminQueryAllMethods");
}

public StringBuilder adminQueryAllMethods(HttpServletRequest req) throws Exception {
int index = 0;
List<String> methods = getSupportedMethod();
StringBuilder sBuilder = new StringBuilder(1024);
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":[");
for (index = 0; index < methods.size(); index++) {
if (index > 0) {
sBuilder.append(",");
}
sBuilder.append("{\"id\":").append(index + 1)
.append(",\"method\":\"").append(methods.get(index)).append("\"}");
}
sBuilder.append("],\"totalCnt\":").append(index + 1).append("}");
return sBuilder;
}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public enum CliArgDef {
MASTERSERVER(null, "master-servers",
"String: format is master1_ip:port[,master2_ip:port]",
"The master address(es) to connect to."),
MASTERURL(null, "master-url",
"String: format is http://master_ip:master_webport/",
"Master Service URL to which to connect.(default: http://localhost:8080/)"),
BROKERURL(null, "broker-url",
"String: format is http://broker_ip:broker_webport/",
"Broker Service URL to which to connect.(default: http://localhost:8081/)"),
MASTERPORTAL(null, "master-portal",
"String: format is master_ip:master_webport",
"Master Service portal to which to connect.(default: 127.0.0.1:8080)"),
BROKERPORTAL(null, "broker-portal",
"String: format is broker_ip:broker_webport",
"Broker Service URL to which to connect.(default: 127.0.0.1:8081)"),
MESSAGES(null, "messages",
"Long: count",
"The number of messages to send or consume, If not set, production or consumption is continual."),
Expand Down Expand Up @@ -95,7 +95,13 @@ public enum CliArgDef {
SYNCPRODUCE(null, "sync-produce",
"Synchronous production. (default: false)"),
WITHOUTDELAY(null, "without-delay",
"Production without delay. (default: false)");
"Production without delay. (default: false)"),
METHOD(null, "method",
"String: http call method",
"Http call method"),
ADMINMETHOD(null, "show-methods",
"Return http's methods.");




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ public enum WebFieldDef {
"Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH,
RegexDef.TMP_STRING),
COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT,
"Partition id", RegexDef.TMP_NUMBER);
"Partition id", RegexDef.TMP_NUMBER),
CALLERIP(13, "callerIp", "cip", WebFieldType.STRING,
"Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH);





Expand All @@ -85,6 +89,12 @@ public enum WebFieldDef {
TBaseConstants.META_VALUE_UNDEFINED, false, null);
}

WebFieldDef(int id, String name, String shortName, WebFieldType type,
String desc, int valMaxLen) {
this(id, name, shortName, type, desc, valMaxLen,
TBaseConstants.META_VALUE_UNDEFINED, false, null);
}

WebFieldDef(int id, String name, String shortName, WebFieldType type,
String desc, RegexDef regexDef) {
this(id, name, shortName, type, desc,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tubemq.server.common.utils;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
* This class is used to process http connection and return result conversion,
* currently does not support https
*/
public class HttpUtils {
// log printer
private static final Logger logger =
LoggerFactory.getLogger(HttpUtils.class);


/* Send request to target server. */
public static JsonObject requestWebService(String url,
Map<String, String> inParamMap) throws Exception {
if (url == null) {
throw new Exception("Web service url is null!");
}
if (url.trim().toLowerCase().startsWith("https://")) {
throw new Exception("Unsupported https protocol!");
}
// process business parameters
ArrayList<BasicNameValuePair> params = new ArrayList<>();
if (inParamMap != null && !inParamMap.isEmpty()) {
for (Map.Entry<String, String> entry : inParamMap.entrySet()) {
params.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
}
if (inParamMap.containsKey(WebFieldDef.CALLERIP.shortName)
|| inParamMap.containsKey(WebFieldDef.CALLERIP.name)) {
params.add(new BasicNameValuePair(WebFieldDef.CALLERIP.name,
AddressUtils.getIPV4LocalAddress()));
}
}
// build connect configure
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000).setSocketTimeout(60000).build();
// build HttpClient and HttpPost objects
CloseableHttpClient httpclient = null;
HttpPost httpPost = null;
JsonObject jsonRes = null;
JsonParser jsonParser = new JsonParser();
try {
httpclient = HttpClients.custom()
.setDefaultRequestConfig(requestConfig).build();
httpPost = new HttpPost(url);
UrlEncodedFormEntity se = new UrlEncodedFormEntity(params);
httpPost.setEntity(se);
// send http request and process response
CloseableHttpResponse response = httpclient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
if (TStringUtils.isNotBlank(returnStr)
&& response.getStatusLine().getStatusCode() == 200) {
jsonRes = jsonParser.parse(returnStr).getAsJsonObject();
}
} catch (Throwable e) {
throw new Exception("Connecting " + url + " throw an error!", e);
} finally {
if (httpPost != null) {
httpPost.releaseConnection();
}
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException ie) {
logger.error("Close HttpClient error.", ie);
}
}
}
return jsonRes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.tubemq.server.common.webbase;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,6 +69,13 @@ public static void registerWebMethod(boolean isQryApi,
.append(webHandler.getClass().getName()).toString());
}

public static List<String> getRegisteredWebMethod() {
List<String> methods = new ArrayList<>();
methods.addAll(WEB_QRY_METHOD_MAP.keySet());
methods.addAll(WEB_MDY_METHOD_MAP.keySet());
return methods;
}



public static class WebApiRegInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void execute(RequestContext requestContext) {
WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method);
if (webApiRegInfo == null) {
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Unsupported method: ")
.append(method).append("}");
.append(method).append("\"}");
requestContext.put("sb", strBuffer.toString());
} else {

Expand All @@ -120,8 +120,8 @@ public void execute(RequestContext requestContext) {
}
}
} catch (Throwable e) {
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Bad request from client :")
.append(e.getMessage()).append("}");
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Bad request from client, ")
.append(e.getMessage()).append("\"}");
requestContext.put("sb", strBuffer.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ public void addCommandOption(CliArgDef cliArgDef) {
protected abstract void initCommandOptions();


public abstract boolean parseParams(String[] args) throws Exception;
public abstract boolean processParams(String[] args) throws Exception;

}
Loading

0 comments on commit ab85c22

Please sign in to comment.