[Improvement](spark-connector) Add 'sink.batch.size' and 'sink.max-retries' options in spark-connector (#7281)

Add `sink.batch.size` and `sink.max-retries` options to the Doris Spark connector, keeping them consistent with the flink-connector options. For example:
```scala
df.write
  .format("doris")
  // maximum number of rows in a single flush
  .option("sink.batch.size", 2048)
  // number of retries after a failed write
  .option("sink.max-retries", 3)
  .save()
```
chovy-3012 committed Dec 6, 2021
1 parent 95d494c commit c52825216cfe606c7bb85a164584cff41ef8394c
Showing 4 changed files with 16 additions and 6 deletions.
```diff
@@ -65,4 +65,12 @@ public interface ConfigurationOptions {
     int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
 
     String DORIS_WRITE_FIELDS = "doris.write.fields";
+
+    String SINK_BATCH_SIZE = "sink.batch.size";
+    String DORIS_SINK_BATCH_SIZE = "doris.sink.batch.size";
+    int SINK_BATCH_SIZE_DEFAULT = 1024;
+
+    String SINK_MAX_RETRIES = "sink.max-retries";
+    String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
+    int SINK_MAX_RETRIES_DEFAULT = 3;
 }
```
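Each option comes as a pair of constants: the short, user-facing key (`sink.batch.size`, `sink.max-retries`) and a `doris.`-prefixed internal key that the provider actually reads, plus a compiled-in default. A minimal sketch of the lookup contract these constants feed into (the `getIntegerProperty` body here is an illustrative stand-in, not taken from this commit):

```scala
// Illustrative stand-in for SparkSettings.getIntegerProperty: resolve an
// integer option by key, falling back to the compiled-in default.
def getIntegerProperty(props: Map[String, String], key: String, default: Int): Int =
  props.get(key).map(_.trim.toInt).getOrElse(default)

val props = Map("doris.sink.batch.size" -> "2048")
val batchSize = getIntegerProperty(props, "doris.sink.batch.size", 1024) // 2048 (user-set)
val retries   = getIntegerProperty(props, "doris.sink.max-retries", 3)  // 3 (default applies)
```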
```diff
@@ -59,8 +59,8 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     // init stream loader
     val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
 
-    val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
-    val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+    val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+    val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
 
     data.rdd.foreachPartition(partition => {
       val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
```
```diff
@@ -98,8 +98,6 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
           case e: Exception =>
             try {
               Thread.sleep(1000 * i)
-              dorisStreamLoader.load(rowsBuffer)
-              rowsBuffer.clear()
             } catch {
               case ex: InterruptedException =>
                 Thread.currentThread.interrupt()
```
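The hunk above removes a duplicate flush from inside the backoff handler; the retry itself appears to be driven by the surrounding loop, which sleeps `1000 * i` ms before attempt `i`. A self-contained sketch of that linear-backoff pattern (the `withRetries` helper is illustrative, not the connector's actual code):

```scala
// Retry an operation up to maxRetryTimes, sleeping 1s, 2s, 3s, ... between
// attempts (linear backoff, mirroring the Thread.sleep(1000 * i) above).
def withRetries[T](maxRetryTimes: Int)(op: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try result = Some(op)
    catch {
      case e: Exception if attempt < maxRetryTimes =>
        attempt += 1
        Thread.sleep(1000L * attempt)
    }
  }
  result.get
}

// e.g. withRetries(maxRetryTimes) { dorisStreamLoader.load(rowsBuffer) }
```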
```diff
@@ -32,8 +32,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings)
 
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
   @volatile private var latestBatchId = -1L
-  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
-  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
   val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
```
```diff
@@ -67,6 +67,8 @@ class TestSparkConnector {
       .option("password", dorisPwd)
       //specify your field
       .option("doris.write.field", "name,gender")
+      .option("sink.batch.size",2)
+      .option("sink.max-retries",2)
       .save()
     session.stop()
   }
```
```diff
@@ -108,6 +110,8 @@ class TestSparkConnector {
       .option("doris.fenodes", dorisFeNodes)
       .option("user", dorisUser)
       .option("password", dorisPwd)
+      .option("sink.batch.size",2)
+      .option("sink.max-retries",2)
       .start().awaitTermination()
   }
 }
```
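The second test hunk exercises the streaming write path. A fuller, self-contained sketch of that usage (FE address, credentials, table identifier, source, and checkpoint path are placeholders, not part of this commit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("doris-sink-sketch").getOrCreate()

spark.readStream
  .format("rate")                                   // placeholder source
  .load()
  .writeStream
  .format("doris")
  .option("doris.fenodes", "fe_host:8030")          // placeholder FE address
  .option("doris.table.identifier", "db.table")     // placeholder target table
  .option("user", "root")                           // placeholder credentials
  .option("password", "")
  .option("sink.batch.size", 2048)                  // rows per stream-load flush
  .option("sink.max-retries", 3)                    // retries after a failed flush
  .option("checkpointLocation", "/tmp/doris-ckpt")  // placeholder checkpoint dir
  .start()
  .awaitTermination()
```

As in the batch case, `sink.batch.size` caps the rows buffered per stream-load call and `sink.max-retries` bounds re-attempts after a failed flush.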
