[Feature] Support Spark connector sinking streaming data to Doris (#6761)
* [Feature] Support Spark connector sinking streaming data to Doris

* [Doc] Add spark-connector batch/stream writing instructions

* add license headers and remove meaningless blank lines

Co-authored-by: wei.zhao <wei.zhao@aispeech.com>
chovy-3012 and wei.zhao committed Sep 28, 2021
1 parent 9191e5b commit acf886938b70f614c1c8ad1676abf75c5719769d
Showing 9 changed files with 327 additions and 51 deletions.
@@ -139,6 +139,12 @@
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* a cached streamload client for each partition
*/
public class CachedDorisStreamLoadClient {
private static final long cacheExpireTimeout = 30 * 60;
private static LoadingCache<SparkSettings, DorisStreamLoad> dorisStreamLoadLoadingCache;

static {
dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Object, Object>() {
@Override
public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
//do nothing
}
})
.build(
new CacheLoader<SparkSettings, DorisStreamLoad>() {
@Override
public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
return dorisStreamLoad;
}
}
);
}

public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
return dorisStreamLoad;
}
}
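The cache above lets each executor partition reuse one stream load client per settings key instead of constructing a new one for every micro-batch. A minimal usage sketch of the executor-side call pattern (the calling code below is an assumption for illustration, not part of this commit; the Doris connection properties are assumed to be present on the conf):

```scala
// Hypothetical executor-side usage: one DorisStreamLoad per SparkSettings key,
// expired by the cache after 30 minutes of inactivity.
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.spark.SparkConf

val settings = new SparkSettings(new SparkConf())
// doris.fenodes, doris.table.identifier and auth properties are assumed to be set already.
val loader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
```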
@@ -17,7 +17,11 @@
package org.apache.doris.spark;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +40,16 @@
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable{
public static final String FIELD_DELIMITER = "\t";
public static final String LINE_DELIMITER = "\n";
public static final String NULL_VALUE = "\\N";

private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

@@ -65,6 +73,18 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
}

public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
String hostPort = RestService.randomBackend(settings, LOG);
this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
}

public String getLoadUrlStr() {
return loadUrlStr;
}
@@ -84,7 +104,6 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,6 +133,22 @@ public String toString() {
}
}

public void load(List<List<Object>> rows) throws StreamLoadException {
StringJoiner lines = new StringJoiner(LINE_DELIMITER);
for (List<Object> row : rows) {
StringJoiner line = new StringJoiner(FIELD_DELIMITER);
for (Object field : row) {
if (field == null) {
line.add(NULL_VALUE);
} else {
line.add(field.toString());
}
}
lines.add(line.toString());
}
load(lines.toString());
}

public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
LOG.info("Streamload Response:{}", loadResponse);
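The new `load(List<List<Object>>)` overload serializes rows into the stream load text format before delegating to `load(String)`: fields are joined with `FIELD_DELIMITER` (`\t`), rows with `LINE_DELIMITER` (`\n`), and nulls are written as `\N`. A small illustrative sketch of the same encoding (not code from this commit):

```scala
// Illustrative re-implementation of the row encoding performed by load(rows).
val FIELD_DELIMITER = "\t"
val LINE_DELIMITER = "\n"
val NULL_VALUE = "\\N"

def encode(rows: Seq[Seq[Any]]): String =
  rows.map { row =>
    row.map(f => if (f == null) NULL_VALUE else f.toString).mkString(FIELD_DELIMITER)
  }.mkString(LINE_DELIMITER)

// encode(Seq(Seq(1, "a", null))) == "1\ta\t\\N"
```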
@@ -98,4 +98,17 @@ public String save() throws IllegalArgumentException {
Properties copy = asProperties();
return IOUtils.propsToString(copy);
}

@Override
public int hashCode() {
return asProperties().hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
return asProperties().equals(((Settings) obj).asProperties());
}
}
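`equals` and `hashCode` are now derived from the underlying properties, which matters because `SparkSettings` serves as the key of the Guava `LoadingCache` above: two settings objects carrying the same configuration should resolve to the same cached `DorisStreamLoad`. A quick illustration with hypothetical values (property names assumed from the `ConfigurationOptions` constants referenced in this commit):

```scala
// Two SparkSettings built from identical configuration now compare equal,
// so CachedDorisStreamLoadClient returns the same cached client for both.
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("doris.fenodes", "127.0.0.1:8030")
  .set("doris.table.identifier", "example_db.example_table")

val a = new SparkSettings(conf)
val b = new SparkSettings(conf.clone())

// Both wrap identical properties, so equality and hash codes agree.
assert(a == b && a.hashCode == b.hashCode)
```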
@@ -24,9 +24,10 @@
import com.google.common.base.Preconditions;

import scala.Option;
import scala.Serializable;
import scala.Tuple2;

public class SparkSettings extends Settings {
public class SparkSettings extends Settings implements Serializable {

private final SparkConf cfg;

@@ -65,7 +65,6 @@
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.doris.spark.sql.DorisWriterOption;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
@@ -476,17 +475,13 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {

/**
* choose a Doris BE node to request.
* @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException if the BE node list is illegal
*/
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
// set user auth
sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER,options.user());
sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD,options.password());
String feNodes = options.feHostPort();
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
String beUrl = String.format("http://%s" + BACKENDS,feNode);
HttpGet httpGet = new HttpGet(beUrl);
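`randomBackend` now reads the FE node list from the settings (`DORIS_FENODES`) instead of a separate `DorisWriterOption`, so callers only need a fully populated `SparkSettings`. A hedged sketch of the new call, with property name strings assumed from the `ConfigurationOptions` constants used in this diff:

```scala
// Hypothetical caller: pick a random BE host:port for stream load,
// with the FE address and credentials already carried by the settings.
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.rest.RestService
import org.apache.spark.SparkConf
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("doris-sink")
val settings = new SparkSettings(new SparkConf()
  .set("doris.fenodes", "127.0.0.1:8030")
  .set("doris.request.auth.user", "root")
  .set("doris.request.auth.password", ""))

val backendHostPort: String = RestService.randomBackend(settings, log)
```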
@@ -17,25 +17,24 @@

package org.apache.doris.spark.sql

import java.io.IOException
import java.util.StringJoiner

import org.apache.commons.collections.CollectionUtils
import org.apache.doris.spark.DorisStreamLoad
import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider}
import org.apache.spark.sql.types.StructType
import org.json4s.jackson.Json

import scala.collection.mutable.ListBuffer
import scala.util.Random
import java.io.IOException
import java.util
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.util.control.Breaks

private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with Logging {
private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging {
override def shortName(): String = "doris"

override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
@@ -50,41 +49,29 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
mode: SaveMode, parameters: Map[String, String],
data: DataFrame): BaseRelation = {

val dorisWriterOption = DorisWriterOption(parameters)
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
// choose available be node
val choosedBeHost = RestService.randomBackend(sparkSettings, dorisWriterOption, log)
sparkSettings.merge(Utils.params(parameters, log).asJava)
// init stream loader
val dorisStreamLoader = new DorisStreamLoad(choosedBeHost, dorisWriterOption.dbName, dorisWriterOption.tbName, dorisWriterOption.user, dorisWriterOption.password)
val fieldDelimiter: String = "\t"
val lineDelimiter: String = "\n"
val NULL_VALUE: String = "\\N"
val dorisStreamLoader = new DorisStreamLoad(sparkSettings)

val maxRowCount = dorisWriterOption.maxRowCount
val maxRetryTimes = dorisWriterOption.maxRetryTimes
val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)

data.rdd.foreachPartition(partition => {

val buffer = ListBuffer[String]()
val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
partition.foreach(row => {
val value = new StringJoiner(fieldDelimiter)
// create one row string
val line: util.List[Object] = new util.ArrayList[Object]()
for (i <- 0 until row.size) {
val field = row.get(i)
if (field == null) {
value.add(NULL_VALUE)
} else {
value.add(field.toString)
}
line.add(field.asInstanceOf[AnyRef])
}
// add one row string to buffer
buffer += value.toString
if (buffer.size > maxRowCount) {
rowsBuffer.add(line)
if (rowsBuffer.size > maxRowCount) {
flush
}
})
// flush buffer
if (buffer.nonEmpty) {
if (!rowsBuffer.isEmpty) {
flush
}

@@ -98,16 +85,16 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP

for (i <- 1 to maxRetryTimes) {
try {
dorisStreamLoader.load(buffer.mkString(lineDelimiter))
buffer.clear()
dorisStreamLoader.load(rowsBuffer)
rowsBuffer.clear()
loop.break()
}
catch {
case e: Exception =>
try {
Thread.sleep(1000 * i)
dorisStreamLoader.load(buffer.mkString(lineDelimiter))
buffer.clear()
dorisStreamLoader.load(rowsBuffer)
rowsBuffer.clear()
} catch {
case ex: InterruptedException =>
Thread.currentThread.interrupt()
@@ -136,8 +123,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
}
}

override def createStreamWriter(queryId: String, structType: StructType, outputMode: OutputMode, dataSourceOptions: DataSourceOptions): StreamWriter = {
val sparkSettings = new SparkSettings(new SparkConf())
sparkSettings.merge(Utils.params(dataSourceOptions.asMap().toMap, log).asJava)
new DorisStreamWriter(sparkSettings)
}
}
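With `StreamWriteSupport` added, the same `"doris"` short name now registers both a batch and a streaming sink. A usage sketch under assumed option names (taken from the `ConfigurationOptions` constants referenced above; `df` and `streamingDf` are placeholder DataFrames, and exact keys and defaults should be checked against the connector docs added in this commit):

```scala
// Batch write: DataFrame -> Doris via the CreatableRelationProvider path.
df.write
  .format("doris")
  .mode("append")
  .option("doris.fenodes", "127.0.0.1:8030")
  .option("doris.table.identifier", "example_db.example_table")
  .option("doris.request.auth.user", "root")
  .option("doris.request.auth.password", "")
  .save()

// Streaming write: each micro-batch is flushed through the new DorisStreamWriter.
streamingDf.writeStream
  .format("doris")
  .option("doris.fenodes", "127.0.0.1:8030")
  .option("doris.table.identifier", "example_db.example_table")
  .option("doris.request.auth.user", "root")
  .option("doris.request.auth.password", "")
  .option("checkpointLocation", "/tmp/doris-sink-checkpoint")
  .start()
  .awaitTermination()
```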