Skip to content
Permalink
Browse files
[Feature]:Flink-connector supports streamload parameters (#6243)
Flink-connector supports streamload parameters
#6199
  • Loading branch information
JNSimba committed Aug 9, 2021
1 parent aa4013d commit ba29ce0d7c8ba8427c1d70d13b37d1592f1279df
Showing 22 changed files with 624 additions and 538 deletions.
@@ -112,6 +112,7 @@ private void close() {

/**
* Open a scanner for reading Doris data.
*
* @param openParams thrift struct to required by request
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -147,6 +148,7 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedF

/**
* get next row batch from Doris BE
*
* @param nextBatchParams thrift struct to required by request
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -161,7 +163,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
result = client.get_next(nextBatchParams);
result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@@ -189,6 +191,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor

/**
* close an scanner.
*
* @param closeParams thrift struct to required by request
*/
public void closeScanner(TScanCloseParams closeParams) {
@@ -21,7 +21,7 @@
import java.io.Serializable;

/**
* Doris connection options.
* Doris connection options.
*/
public class DorisConnectionOptions implements Serializable {

@@ -21,22 +21,29 @@

import java.io.Serializable;
import java.time.Duration;
import java.util.Properties;

/**
* JDBC sink batch options.
*/
public class DorisExecutionOptions implements Serializable {
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;

private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
/**
* Properties for the StreamLoad.
*/
private final Properties streamLoadProp;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
this.streamLoadProp = streamLoadProp;
}

public Integer getBatchSize() {
@@ -51,6 +58,10 @@ public Long getBatchIntervalMs() {
return batchIntervalMs;
}

public Properties getStreamLoadProp() {
return streamLoadProp;
}

public static Builder builder() {
return new Builder();
}
@@ -62,6 +73,7 @@ public static class Builder {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;
private Properties streamLoadProp;

public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -78,8 +90,13 @@ public Builder setBatchIntervalMs(Long batchIntervalMs) {
return this;
}

public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}

public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
}
}

@@ -25,7 +25,7 @@
/**
* Options for the Doris connector.
*/
public class DorisOptions extends DorisConnectionOptions{
public class DorisOptions extends DorisConnectionOptions {

private static final long serialVersionUID = 1L;

@@ -22,7 +22,7 @@
/**
* Doris read Options
*/
public class DorisReadOptions implements Serializable {
public class DorisReadOptions implements Serializable {

private static final long serialVersionUID = 1L;

@@ -35,7 +35,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;

public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
@@ -117,7 +117,7 @@ public static class Builder {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
private Integer deserializeQueueSize;
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;


@@ -177,7 +177,7 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
}

public DorisReadOptions build() {
return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync);
return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
}
}

@@ -38,15 +38,15 @@

public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);

private DorisDeserializationSchema deserializer;
private DorisOptions options;
private DorisReadOptions readOptions;
private List<PartitionDefinition> dorisPartitions;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;

public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
@@ -55,14 +55,14 @@ public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializatio
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.dorisPartitions = RestService.findPartitions(options,readOptions,logger);
this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
}

@Override
public void run(SourceContext sourceContext) throws Exception{
for(PartitionDefinition partitions : dorisPartitions){
scalaValueReader = new ScalaValueReader(partitions, options,readOptions);
while (scalaValueReader.hasNext()){
public void run(SourceContext sourceContext) throws Exception {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
Object next = scalaValueReader.next();
sourceContext.collect(next);
}
@@ -18,10 +18,11 @@


import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;


public class SimpleListDeserializationSchema implements DorisDeserializationSchema{
public class SimpleListDeserializationSchema implements DorisDeserializationSchema {

@Override
public TypeInformation getProducedType() {
@@ -21,18 +21,22 @@ public class DorisException extends Exception {
public DorisException() {
super();
}

public DorisException(String message) {
super(message);
}

public DorisException(String message, Throwable cause) {
super(message, cause);
}

public DorisException(Throwable cause) {
super(cause);
}

protected DorisException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
@@ -17,4 +17,5 @@

package org.apache.doris.flink.exception;

public class ShouldNeverHappenException extends DorisException { }
public class ShouldNeverHappenException extends DorisException {
}
@@ -21,15 +21,19 @@ public class StreamLoadException extends Exception {
public StreamLoadException() {
super();
}

public StreamLoadException(String message) {
super(message);
}

public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}

public StreamLoadException(Throwable cause) {
super(cause);
}

protected StreamLoadException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
@@ -103,7 +103,7 @@ public int compareTo(PartitionDefinition o) {
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
if (diffSelf.size() == 0) {
if (diffSelf.size() == 0) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);

0 comments on commit ba29ce0

Please sign in to comment.