Skip to content
Permalink
Browse files
[Revert] Revert RestService.java (#6994)
  • Loading branch information
chovy-3012 committed Nov 4, 2021
1 parent d8ad175 commit 00589ffc6e6768c98b3b6a28871065c75b1f1cc4
Showing 2 changed files with 116 additions and 43 deletions.
@@ -31,19 +31,27 @@
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
@@ -52,22 +60,17 @@
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.*;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -83,7 +86,8 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
private static final String BACKENDS = "/api/show_proc?path=//backends";
private static final String BACKENDS = "/rest/v1/system?path=//backends";


/**
* send request to Doris FE and get response json string.
@@ -110,36 +114,37 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
.build();

request.setConfig(requestConfig);

String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");

CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(user, password));
HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);

IOException ex = null;
int statusCode = -1;

for (int attempt = 0; attempt < retries; attempt++) {
CloseableHttpClient httpClient = HttpClients.createDefault();
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
CloseableHttpResponse response = httpClient.execute(request, context);
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
String response;
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), user, password,logger);
} else {
response = getConnectionPost(request,user, password,logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
logger.trace("Success get response from Doris FE: {}, response is: {}.",
request.getURI(), res);
return res;
request.getURI(), response);
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(response, Map.class);
//Handle the problem of inconsistent data format returned by http v1 and v2
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
return response;
}
} catch (IOException e) {
ex = e;
logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -150,6 +155,54 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}

private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);

connection.connect();
return parseResponse(connection,logger);
}

private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
throw new IOException("Failed to get response from Doris");
}
String result = "";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
if (in != null) {
in.close();
}
return result;
}

private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
InputStream content = ((HttpPost)request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
PrintWriter out = new PrintWriter(conn.getOutputStream());
// send request params
out.print(res);
// flush
out.flush();
// read response
return parseResponse(conn,logger);
}
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
@@ -426,7 +479,6 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
@@ -446,12 +498,19 @@ public static String randomBackend(SparkSettings sparkSettings , Logger logger)
}



/**
* translate Doris FE response to inner {@link BackendRow} struct.
* @param response Doris FE response
* @param logger {@link Logger}
* @return inner {@link List<BackendRow>} struct
* @throws DorisException,IOException throw when translate failed
* */
@VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
List<List<String>> backend;
Backend backend;
try {
backend = mapper.readValue(response, List.class);
backend = mapper.readValue(response, Backend.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
@@ -470,13 +529,7 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendRow> backendRows = backend.stream().map(array -> {
BackendRow backendRow = new BackendRow();
backendRow.setIP(array.get(2));
backendRow.setHttpPort(array.get(6));
backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
return backendRow;
}).filter(v -> v.getAlive()).collect(Collectors.toList());
List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
@@ -494,7 +547,7 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
@@ -37,6 +37,7 @@
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
@@ -293,4 +294,23 @@ public void testTabletsMapToPartition() throws Exception {

Assert.assertEquals(expected, actual);
}

@Test
public void testParseBackend() throws Exception {
String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
"\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
"\"HttpPort\",\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\"," +
"\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\"," +
"\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":" +
"\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\",\\\"lastStreamLoadTime\\\":" +
"-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\",\"DataUsedCapacity\":\"0.000 " +
"\",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\",\"__hrefPaths\":[\"/rest/v1/system?" +
"path=//backends/10002\"],\"Cluster\":\"default_cluster\",\"Alive\":\"true\",\"MaxDiskUsedPct\":" +
"\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\",\"ClusterDecommissioned\":\"false\"," +
"\"AvailCapacity\":\"1.000 B\",\"Version\":\"\",\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\"," +
"\"LastStartTime\":\"\\\\N\",\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":" +
"\"{\\\"location\\\" : \\\"default\\\"}\",\"HostName\":\"localhost\"}]}";
List<BackendRow> backendRows = RestService.parseBackend(response, logger);
Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
}
}

0 comments on commit 00589ff

Please sign in to comment.