Skip to content
Permalink
Browse files
[Bug] Modify spark, flink doris connector to send request to FE, fix …
…the problem of POST method, it should be the same as the method when sending the request (#5788)

Modify spark, flink doris connector to send request to FE, fix the problem of POST method,
it should be the same as the method when sending the request
  • Loading branch information
hf200012 committed May 19, 2021
1 parent 293bd8f commit d121e12309cf42c59d47628d350b54b1088eca28
Showing 1 changed file with 50 additions and 30 deletions.
@@ -31,8 +31,10 @@
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
@@ -115,39 +117,36 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
.build();

request.setConfig(requestConfig);

String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");

logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);

IOException ex = null;
int statusCode = -1;

for (int attempt = 0; attempt < retries; attempt++) {
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
HttpURLConnection conn = getConnection(request, user, password);
statusCode = conn.getResponseCode();
if (statusCode != HttpStatus.SC_OK) {
String response;
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), user, password,logger);
} else {
response = getConnectionPost(request,user, password,logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
InputStream stream = (InputStream) conn.getContent();
String res = IOUtils.toString(stream);
logger.trace("Success get response from Doris FE: {}, response is: {}.",
request.getURI(), res);

request.getURI(), response);
ObjectMapper mapper = new ObjectMapper();

Map map = mapper.readValue(res, Map.class);
Map map = mapper.readValue(response, Map.class);
//Handle the problem of inconsistent data format returned by http v1 and v2
if(map.containsKey("code") && map.containsKey("msg")) {
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
return res;
return response;
}
} catch (IOException e) {
ex = e;
@@ -159,32 +158,53 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger)
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}

private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);

/**
* Get http connection
* @param request
* @param user
* @param passwd
* @return
* @throws IOException
*/
private static HttpURLConnection getConnection(HttpRequestBase request, String user, String passwd) throws IOException {
connection.connect();
return parseResponse(connection,logger);
}

private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
throw new IOException("Failed to get response from Doris");
}
String result = "";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
if (in != null) {
in.close();
}
return result;
}

private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("POST");
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);

InputStream content = ((HttpPost) request).getEntity().getContent();
String s = IOUtils.toString(content);

InputStream content = ((HttpPost)request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
PrintWriter out = new PrintWriter(conn.getOutputStream());
out.print(s);
// send request params
out.print(res);
// flush
out.flush();
return conn;
// read response
return parseResponse(conn,logger);
}
/**
* parse table identifier to array.

0 comments on commit d121e12

Please sign in to comment.