Skip to content
Permalink
Browse files
[Bug] Modify spark, flink doris connector to send request to FE, fix …
…the problem of POST method, it should be the same as the method when sending the request (#5788)

Modify spark, flink doris connector to send request to FE, fix the problem of POST method,
it should be the same as the method when sending the request
  • Loading branch information
hf200012 committed May 19, 2021
1 parent d499ac5 commit 3618f40e6346e44972b71991e72d2e55f89cc331
Showing 1 changed file with 12 additions and 13 deletions.
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.commons.io.IOUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -42,7 +43,6 @@
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;


import org.slf4j.Logger;

import java.io.BufferedReader;
@@ -65,8 +65,6 @@
import java.util.Set;
import java.util.stream.Collectors;



import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
@@ -110,28 +108,30 @@ private static String send(DorisOptions options,DorisReadOptions readOptions, Ht
.build();

request.setConfig(requestConfig);


logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), options.getUsername());

IOException ex = null;
int statusCode = -1;

for (int attempt = 0; attempt < retries; attempt++) {
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
String response;
if(request instanceof HttpGet){
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger);
}else{
response = getConnection(request, options.getUsername(), options.getPassword(),logger);
} else {
response = getConnectionPost(request, options.getUsername(), options.getPassword(),logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
logger.trace("Success get response from Doris FE: {}, response is: {}.",
request.getURI(), response);
//Handle the problem of inconsistent data format returned by http v1 and v2
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(response, Map.class);
if(map.containsKey("code") && map.containsKey("msg")) {
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
@@ -147,14 +147,13 @@ private static String send(DorisOptions options,DorisReadOptions readOptions, Ht
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}

private static String getConnection(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("POST");
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);

InputStream content = ((HttpPost)request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);

0 comments on commit 3618f40

Please sign in to comment.