[improvement] Stream Load data is converted to JSON format (#15)
* [improvement] Stream Load data is converted to JSON format

* Add a unit test and add the keysType property to Schema.java

* Modify the Doris sink so it reads Kafka data only in JSON object format

* Format code

Co-authored-by: smallhibiscus <844981280>
smallhibiscus committed Apr 14, 2022
1 parent 4c621dc commit b80046d55682b7760d8efbf8b66337cceb369ed8
Showing 5 changed files with 157 additions and 23 deletions.
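
In short: instead of joining row values with field and line delimiters, the sink now maps each row onto its DataFrame column names and serializes the whole batch as one JSON array before issuing the Stream Load request. The following is a minimal sketch of that conversion, assuming Jackson is on the classpath; the class name, column names, and values are made up for illustration and are not part of the commit.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowsToJsonSketch {
    // Mirrors the idea behind loadV2: zip DataFrame column names with row
    // values and serialize the batch as a single JSON array string.
    public static String toJsonArray(String[] columns, List<List<Object>> rows)
            throws JsonProcessingException {
        List<Map<String, Object>> batch = new ArrayList<>();
        for (List<Object> row : rows) {
            Map<String, Object> record = new LinkedHashMap<>();
            for (int i = 0; i < columns.length && i < row.size(); i++) {
                record.put(columns[i], row.get(i));
            }
            batch.add(record);
        }
        return new ObjectMapper().writeValueAsString(batch);
    }

    public static void main(String[] args) throws JsonProcessingException {
        String[] columns = {"order_id", "order_amount", "order_status"};
        List<List<Object>> rows = Arrays.asList(
                Arrays.asList("1", 100, "PAID"),
                Arrays.asList("2", 200, "SHIPPED"));
        // Prints: [{"order_id":"1","order_amount":100,"order_status":"PAID"},
        //          {"order_id":"2","order_amount":200,"order_status":"SHIPPED"}]
        System.out.println(toJsonArray(columns, rows));
    }
}

Doris can parse such a body because the connection now declares format=json and strip_outer_array=true, as shown in the getConnection change below.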
@@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.spark;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
@@ -35,13 +36,16 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;


/**
* DorisStreamLoad
@@ -63,6 +67,7 @@ public class DorisStreamLoad implements Serializable{
private String tbl;
private String authEncoding;
private String columns;
private String[] dfColumns;

public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.hostPort = hostPort;
@@ -87,6 +92,20 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
}

public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
String hostPort = RestService.randomBackendV2(settings, LOG);
this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
this.tbl = dbTable[1];
this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.dfColumns = dfColumns;
}

public String getLoadUrlStr() {
return loadUrlStr;
}
@@ -115,6 +134,8 @@ private HttpURLConnection getConnection(String urlStr, String label) throws IOEx
}
conn.setDoOutput(true);
conn.setDoInput(true);
conn.addRequestProperty("format", "json");
conn.addRequestProperty("strip_outer_array", "true");
return conn;
}
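
With these two request properties set, the Stream Load body is treated as a JSON array of objects rather than delimited text. Below is a hedged sketch of what such a request could look like over plain HttpURLConnection; the endpoint, credentials, label, and payload are illustrative assumptions, not values taken from the commit.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StreamLoadRequestSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative BE endpoint and credentials; adjust to your cluster.
        String loadUrl = "http://127.0.0.1:8040/api/test/test_order/_stream_load";
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));

        HttpURLConnection conn = (HttpURLConnection) new URL(loadUrl).openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("label", "spark_demo_" + System.currentTimeMillis());
        // The two properties added in this commit: body is a JSON array of objects.
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);

        String body = "[{\"order_id\":\"1\",\"order_amount\":100,\"order_status\":\"PAID\"}]";
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}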

@@ -138,7 +159,7 @@ public String toString() {
}
}

public String listToString(List<List<Object>> rows){
public String listToString(List<List<Object>> rows) {
StringJoiner lines = new StringJoiner(LINE_DELIMITER);
for (List<Object> row : rows) {
StringJoiner line = new StringJoiner(FIELD_DELIMITER);
@@ -155,10 +176,24 @@ public String listToString(List<List<Object>> rows){
}


public void load(List<List<Object>> rows) throws StreamLoadException {
String records = listToString(rows);
load(records);
public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
List<Map<Object, Object>> dataList = new ArrayList<>();
try {
for (List<Object> row : rows) {
Map<Object, Object> dataMap = new HashMap<>();
if (dfColumns.length == row.size()) {
for (int i = 0; i < dfColumns.length; i++) {
dataMap.put(dfColumns[i], row.get(i));
}
} else {
// Surface a column-count mismatch instead of silently writing an empty record
throw new IllegalArgumentException("Expected " + dfColumns.length + " columns but the row has " + row.size());
}
dataList.add(dataMap);
}
} catch (Exception e) {
throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
}
load((new ObjectMapper()).writeValueAsString(dataList));
}

public void load(String value) throws StreamLoadException {
LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
@@ -23,6 +23,7 @@

public class Schema {
private int status = 0;
private String keysType;
private List<Field> properties;

@@ -42,6 +43,14 @@ public void setStatus(int status) {
this.status = status;
}

public String getKeysType() {
return keysType;
}

public void setKeysType(String keysType) {
this.keysType = keysType;
}

public List<Field> getProperties() {
return properties;
}
@@ -99,6 +108,7 @@ public int hashCode() {
public String toString() {
return "Schema{" +
"status=" + status +
", keysType='" + keysType + '\'' +
", properties=" + properties +
'}';
}
@@ -60,7 +60,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
// init stream loader
val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)

val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
@@ -93,7 +93,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister

for (i <- 1 to maxRetryTimes) {
try {
dorisStreamLoader.load(rowsBuffer)
dorisStreamLoader.loadV2(rowsBuffer)
rowsBuffer.clear()
loop.break()
}
@@ -102,7 +102,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
@@ -113,7 +113,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
}
}

if(!rowsBuffer.isEmpty){
if (!rowsBuffer.isEmpty) {
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
@@ -17,6 +17,7 @@

package org.apache.doris.spark.sql

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.spark.sql.execution.QueryExecution
@@ -25,7 +26,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
import java.util

import org.apache.doris.spark.rest.RestService

import scala.util.control.Breaks
@@ -49,20 +49,20 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe

def write(queryExecution: QueryExecution): Unit = {
queryExecution.toRdd.foreachPartition(iter => {
val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]()
val objectMapper = new ObjectMapper()
val arrayNode = objectMapper.createArrayNode()
iter.foreach(row => {
val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
for (i <- 0 until row.numFields) {
val field = row.copy().getUTF8String(i)
line.add(field.asInstanceOf[AnyRef])
arrayNode.add(objectMapper.readTree(field.toString))
}
rowsBuffer.add(line)
if (rowsBuffer.size > maxRowCount - 1) {
if (arrayNode.size > maxRowCount - 1) {
flush
}
})
// flush buffer
if (!rowsBuffer.isEmpty) {
if (!arrayNode.isEmpty) {
flush
}

@@ -76,28 +76,28 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe

for (i <- 0 to maxRetryTimes) {
try {
dorisStreamLoader.load(rowsBuffer)
rowsBuffer.clear()
dorisStreamLoader.load(arrayNode.toString)
arrayNode.removeAll()
loop.break()
}
catch {
case e: Exception =>
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings, logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
logger.warn("Data that failed to load : " + arrayNode.toString)
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while doing another attempt", e)
}
}
}

if(!rowsBuffer.isEmpty){
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
if (!arrayNode.isEmpty) {
logger.warn("Data that failed to load : " + arrayNode.toString)
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
}
@@ -0,0 +1,89 @@
package org.apache.doris.spark.sql

import org.apache.spark.sql.SparkSession
import org.junit.Test

class TestConnectorWriteDoris {

val dorisFeNodes = "127.0.0.1:8030"
val dorisUser = "root"
val dorisPwd = ""
val dorisTable = "test.test_order"

val kafkaServers = "127.0.0.1:9093"
val kafkaTopics = "test_spark"

@Test
def listDataWriteTest(): Unit = {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, "待发货"),
("3", 300, "已收货")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("doris.fenodes", dorisFeNodes)
.option("doris.table.identifier", dorisTable)
.option("user", dorisUser)
.option("password", dorisPwd)
.option("sink.batch.size", 2)
.option("sink.max-retries", 2)
.save()
spark.stop()
}


@Test
def csvDataWriteTest(): Unit = {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.read
.option("header", "true") // uses the first line as names of columns
.option("inferSchema", "true") // infers the input schema automatically from data
.csv("data.csv")
df.createTempView("tmp_tb")
val doris = spark.sql(
"""
|create TEMPORARY VIEW test_lh
|USING doris
|OPTIONS(
| "table.identifier"="test.test_lh",
| "fenodes"="127.0.0.1:8030",
| "user"="root",
| "password"=""
|);
|""".stripMargin)
spark.sql(
"""
|insert into test_lh select name,gender,age from tmp_tb ;
|""".stripMargin)
spark.stop()
}

@Test
def structuredStreamingWriteTest(): Unit = {
val spark = SparkSession.builder()
.master("local")
.getOrCreate()
val df = spark.readStream
.option("kafka.bootstrap.servers", kafkaServers)
.option("startingOffsets", "latest")
.option("subscribe", kafkaTopics)
.format("kafka")
.option("failOnDataLoss", false)
.load()

df.selectExpr("CAST(value AS STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "/tmp/test")
.option("doris.table.identifier", dorisTable)
.option("doris.fenodes", dorisFeNodes)
.option("user", dorisUser)
.option("password", dorisPwd)
.option("sink.batch.size", 2)
.option("sink.max-retries", 2)
.start().awaitTermination()
}

}
