Skip to content
Permalink
Browse files
[Feature] Support spark connector sink data to Doris (#6256)
Support the Spark connector writing a DataFrame to Doris
  • Loading branch information
huzk8 committed Aug 16, 2021
1 parent d121e12 commit e73f7964089f5681a1644989ddedfb0bbcdd3b50
Showing 10 changed files with 713 additions and 12 deletions.
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.models.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.UUID;

/**
 * Sends batches of text data to a Doris table through the HTTP
 * {@code _stream_load} endpoint.
 *
 * <p>An instance is bound to one database/table and one set of credentials;
 * the target host can be switched with {@link #setHostPort(String)}.
 **/
public class DorisStreamLoad implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

    /** Values of the response "Status" field that are treated as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
    /** URL template: host:port, database, table. */
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
    /** Reused for every response; static, so it is not part of the serialized state. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final String user;
    private final String passwd;
    private String loadUrlStr;
    private String hostPort;
    private final String db;
    private final String tbl;
    /** Base64 of "user:passwd", computed once and sent as the Basic auth credential. */
    private final String authEncoding;

    /**
     * @param hostPort "host:port" of the node that receives the stream load request
     * @param db       target database
     * @param tbl      target table
     * @param user     user name for HTTP basic authentication
     * @param passwd   password for HTTP basic authentication
     */
    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    /** @return the stream load URL currently in use */
    public String getLoadUrlStr() {
        return loadUrlStr;
    }

    /** @return the "host:port" the load URL currently points at */
    public String getHostPort() {
        return hostPort;
    }

    /**
     * Points this loader at another node and rebuilds the load URL accordingly.
     *
     * @param hostPort new "host:port"
     */
    public void setHostPort(String hostPort) {
        this.hostPort = hostPort;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.tbl);
    }

    /**
     * Opens a PUT connection to {@code urlStr} carrying the auth, content type
     * and label headers. Redirects are not followed automatically.
     */
    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    /** Raw outcome of one stream load HTTP round trip. */
    public static class LoadResponse {
        /** HTTP status code, or -1 when the request itself failed. */
        public int status;
        /** HTTP reason phrase, or the exception message when the request failed. */
        public String respMsg;
        /** Response body, or a description of the failure when the request failed. */
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    /**
     * Loads {@code value} into the configured table.
     *
     * @param value the data to send as the request body
     * @throws StreamLoadException if the HTTP status is not 200, the response
     *         body cannot be parsed, or the reported load status is neither
     *         "Success" nor "Publish Timeout"
     */
    public void load(String value) throws StreamLoadException {
        LoadResponse loadResponse = loadBatch(value);
        LOG.info("Streamload Response:{}",loadResponse);
        if (loadResponse.status != 200) {
            throw new StreamLoadException("stream load error: " + loadResponse.respContent);
        }

        RespContent respContent;
        try {
            respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
        } catch (IOException e) {
            throw new StreamLoadException(e);
        }
        // A literal JSON "null" body deserializes to null; report it instead of hitting an NPE.
        if (respContent == null) {
            throw new StreamLoadException("stream load error: " + loadResponse.respContent);
        }
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            throw new StreamLoadException("stream load error: " + respContent.getMessage());
        }
    }

    /**
     * Performs one stream load request under a freshly generated label and
     * returns the raw HTTP outcome. Failures are not thrown: they are logged
     * and reported as a {@link LoadResponse} with status -1.
     */
    private LoadResponse loadBatch(String value) {
        Calendar calendar = Calendar.getInstance();
        // NOTE(review): the "audit_" prefix looks inherited from the audit loader plugin;
        // it is kept unchanged so existing label conventions are not altered.
        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection beConn = null;
        try {
            beConn = getConnection(loadUrlStr, label);
            // Encode explicitly as UTF-8 to match the declared Content-Type charset.
            try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
                bos.write(value.getBytes(StandardCharsets.UTF_8));
            }

            int status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            // For error statuses the body is only available through the error stream.
            InputStream stream = status >= HttpURLConnection.HTTP_BAD_REQUEST
                    ? beConn.getErrorStream()
                    : beConn.getInputStream();
            return new LoadResponse(status, respMsg, readBody(stream));
        } catch (Exception e) {
            String err = "failed to execute stream load with label: " + label;
            LOG.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

    /** Reads the whole stream as UTF-8 text, dropping line terminators; a null stream yields "". */
    private static String readBody(InputStream stream) throws IOException {
        if (stream == null) {
            return "";
        }
        StringBuilder response = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
        }
        return response.toString();
    }
}
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.exception;

/**
 * Checked exception signalling that a stream load request to Doris failed.
 * Every constructor simply forwards its arguments to {@link Exception}.
 */
public class StreamLoadException extends Exception {

    /** Creates an exception with neither a message nor a cause. */
    public StreamLoadException() {
    }

    /**
     * @param cause the underlying failure
     */
    public StreamLoadException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message description of the failure
     */
    public StreamLoadException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   the underlying failure
     */
    public StreamLoadException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message            description of the failure
     * @param cause              the underlying failure
     * @param enableSuppression  whether suppression is enabled
     * @param writableStackTrace whether the stack trace is writable
     */
    protected StreamLoadException(
            String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
@@ -49,33 +49,29 @@
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.sql.DorisWriterOption;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -91,6 +87,8 @@ public class RestService implements Serializable {
// URL path fragments for the Doris REST API.
// NOTE(review): the use of API_PREFIX, SCHEMA and QUERY_PLAN is not visible in this hunk — confirm against the request builders.
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
// Path and query appended to "http://<fe node>" by randomBackend to fetch the backend list.
private static final String BACKENDS = "/rest/v1/system?path=//backends";


/**
* send request to Doris FE and get response json string.
@@ -476,6 +474,65 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
return tabletsSize;
}

/**
 * Choose one alive Doris BE node at random.
 *
 * <p>The user and password from {@code options} are first written into
 * {@code sparkSettings}. A FE node picked from {@code options.feHostPort()} is
 * then asked for the backend list, and one of the parsed backends is returned.
 *
 * @param sparkSettings settings used for the request; its auth user and password are overwritten
 * @param options writer options supplying the FE nodes, user and password
 * @param logger slf4j logger
 * @return the chosen BE node as "ip:httpPort"
 * @throws IllegalArgumentException if no backend is available after parsing
 * @throws DorisException declared by this method (e.g. the backend response cannot be parsed)
 * @throws IOException declared by this method
 */
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
    // The backend query is authenticated with the writer's credentials.
    sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER, options.user());
    sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD, options.password());

    String chosenFe = randomEndpoint(options.feHostPort(), logger);
    HttpGet backendsRequest = new HttpGet("http://" + chosenFe + BACKENDS);
    String rawBackends = send(sparkSettings, backendsRequest, logger);
    logger.info("Backend Info:{}", rawBackends);

    List<BackendRow> candidates = parseBackend(rawBackends, logger);
    logger.trace("Parse beNodes '{}'.", candidates);
    boolean noCandidates = candidates == null || candidates.isEmpty();
    if (noCandidates) {
        logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", candidates);
        throw new IllegalArgumentException("beNodes", String.valueOf(candidates));
    }

    // Shuffle, then take the head: a uniformly random pick among the candidates.
    Collections.shuffle(candidates);
    BackendRow picked = candidates.get(0);
    return picked.getIP() + ":" + picked.getHttpPort();
}



/**
 * Parse the backend-list response into the backend rows that are alive.
 *
 * @param response JSON string of the backend-list response
 * @param logger slf4j logger
 * @return the alive backend rows; an empty mutable list when the response carries no rows
 * @throws DorisException if the response is not JSON, cannot be mapped to {@link Backend},
 *         or cannot be read
 * @throws ShouldNeverHappenException if the mapper yields no object at all
 * @throws IOException declared for callers; parsing failures are wrapped in {@link DorisException}
 */
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
    com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
    Backend backend;
    try {
        backend = mapper.readValue(response, Backend.class);
    } catch (com.fasterxml.jackson.core.JsonParseException e) {
        String errMsg = "Doris BE's response is not a json. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
        String errMsg = "Doris BE's response cannot map to schema. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    } catch (IOException e) {
        String errMsg = "Parse Doris BE's response to json failed. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    }

    if (backend == null) {
        logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
        throw new ShouldNeverHappenException();
    }

    // The "rows" property may be missing from the JSON; treat that as "no backends"
    // so the caller reports an illegal-argument error instead of hitting an NPE here.
    List<BackendRow> rows = backend.getRows();
    if (rows == null) {
        logger.warn("Doris BE's response contains no rows. res: {}", response);
        return new ArrayList<>();
    }

    // Null-safe filter: skip null entries and rows whose alive flag is absent.
    List<BackendRow> backendRows = rows.stream()
            .filter(row -> row != null && Boolean.TRUE.equals(row.getAlive()))
            .collect(Collectors.toList());
    logger.debug("Parsing backend result is '{}'.", backendRows);
    return backendRows;
}

/**
* translate BE tablets map to Doris RDD partition.
* @param cfg configuration of request
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.rest.models;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
 * BE response model: a JSON-mapped holder for a list of {@link BackendRow}
 * entries. JSON properties other than {@code rows} are ignored.
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {

    /** Backend entries bound to the {@code rows} JSON property. */
    @JsonProperty("rows")
    private List<BackendRow> rows;

    /**
     * @param rows the backend entries to store
     */
    public void setRows(List<BackendRow> rows) {
        this.rows = rows;
    }

    /**
     * @return the stored backend entries; may be null if none were set
     */
    public List<BackendRow> getRows() {
        return this.rows;
    }
}

0 comments on commit e73f796

Please sign in to comment.