[Flink] Simplify the use of the Flink connector (#6892)
1. Simplify the use of the Flink connector: with GenericDorisSinkFunction, the Doris sink can be added to a stream like any other sink.
2. Add use cases for the Flink connector.

## Use case
```
env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
     .addSink(
          DorisSink.sink(
             DorisOptions.builder()
                   .setFenodes("FE_IP:8030")
                   .setTableIdentifier("db.table")
                   .setUsername("root")
                   .setPassword("").build()
                ));
```
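The use case above writes JSON strings with the default execution options. For streams of RowData, the same facade takes the field names and logical types explicitly. A minimal sketch, assuming the usual flink-table imports (MapFunction, GenericRowData, StringData, VarCharType); the column names, types, and connection values are placeholders, not part of this commit:
```
env.fromElements("")
     .map(new MapFunction<String, RowData>() {
         @Override
         public RowData map(String value) {
             // build one row matching the assumed table layout (longitude, city, latitude)
             GenericRowData row = new GenericRowData(3);
             row.setField(0, StringData.fromString("116.405419"));
             row.setField(1, StringData.fromString("北京"));
             row.setField(2, StringData.fromString("39.916927"));
             return row;
         }
     })
     .addSink(
          DorisSink.sink(
             new String[]{"longitude", "city", "latitude"},
             new LogicalType[]{new VarCharType(32), new VarCharType(32), new VarCharType(32)},
             DorisOptions.builder()
                   .setFenodes("FE_IP:8030")
                   .setTableIdentifier("db.table")
                   .setUsername("root")
                   .setPassword("").build()
                ));
```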
xiaokangguo committed Oct 23, 2021
1 parent c6bb1b4 commit 340ac7e72d11ad03c66581d9580eac0a3e08ea71
Showing 7 changed files with 517 additions and 36 deletions.
@@ -20,7 +20,6 @@
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.time.Duration;
import java.util.Properties;

/**
@@ -29,6 +28,10 @@
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;

public static final Integer DEFAULT_BATCH_SIZE = 1000;
public static final Integer DEFAULT_MAX_RETRY_TIMES = 3;
private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;

private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
@@ -66,14 +69,21 @@ public static Builder builder() {
return new Builder();
}

/**
* Default execution options: Stream Load in JSON format with strip_outer_array enabled,
* so a buffered batch can be sent as a single JSON array.
*/
public static DorisExecutionOptions defaults() {
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
return new Builder().setStreamLoadProp(pro).build();
}

/**
* Builder of {@link DorisExecutionOptions}.
*/
public static class Builder {
private Integer batchSize = DEFAULT_BATCH_SIZE;
private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
private Properties streamLoadProp = new Properties();

public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
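With these defaults in place, a caller only overrides what it needs. A minimal sketch with illustrative values; only setBatchSize, setStreamLoadProp, builder() and build() appear in this diff, other setters are assumed to exist unchanged:
```
Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("strip_outer_array", "true");

DorisExecutionOptions options = DorisExecutionOptions.builder()
        .setBatchSize(500)           // override DEFAULT_BATCH_SIZE (1000)
        .setStreamLoadProp(props)    // same properties DorisExecutionOptions.defaults() would set
        .build();
```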
@@ -103,6 +103,10 @@ public static Builder builder() {
return new Builder();
}

public static DorisReadOptions defaults() {
return DorisReadOptions.builder().build();
}

/**
* Builder of {@link DorisReadOptions}.
*/
@@ -179,6 +183,7 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
public DorisReadOptions build() {
return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
}

}


@@ -0,0 +1,95 @@
package org.apache.doris.flink.cfg;

import org.apache.doris.flink.table.DorisDynamicOutputFormat;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.types.logical.LogicalType;

/** Facade to create Doris {@link SinkFunction sinks}. */
public class DorisSink {


private DorisSink() {
}


/**
* Create a Doris DataStream sink with the default {@link DorisReadOptions};
* stream elements can only be JSON strings.
*
* @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
*/
public static <T> SinkFunction<T> sink(DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {

return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(), executionOptions, dorisOptions);
}

/**
* Create a Doris DataStream sink with the default {@link DorisReadOptions};
* stream elements can only be RowData.
*
* @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
*/
public static <T> SinkFunction<T> sink(String[] fields, LogicalType[] types,
DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {

return sink(fields, types, DorisReadOptions.defaults(), executionOptions, dorisOptions);
}

/**
* Create a Doris DataStream sink with the default {@link DorisReadOptions} and {@link DorisExecutionOptions};
* stream elements can only be JSON strings.
*
* @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
*/
public static <T> SinkFunction<T> sink(DorisOptions dorisOptions) {

return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(),
DorisExecutionOptions.defaults(), dorisOptions);
}

/**
* Create a Doris DataStream sink with the default {@link DorisReadOptions} and {@link DorisExecutionOptions};
* stream elements can only be RowData.
*
* @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
*/
public static <T> SinkFunction<T> sink(String[] fields, LogicalType[] types, DorisOptions dorisOptions) {
return sink(fields, types, DorisReadOptions.defaults(), DorisExecutionOptions.defaults(), dorisOptions);
}


/**
* Create a Doris DataStream sink; stream elements can only be JSON strings.
*
* @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
*/
public static <T> SinkFunction<T> sink(DorisReadOptions readOptions,
DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {

return sink(new String[]{}, new LogicalType[]{}, readOptions, executionOptions, dorisOptions);
}


/**
* Create a Doris DataStream sink; stream elements can only be RowData.
*
* <p>Note: the objects passed to the returned sink are buffered and may be retried.
* Therefore, objects can not be {@link org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
* </p>
*
* @param fields array of field names
* @param types logical types of the fields
* @param readOptions read parameters, such as readFields and filterQuery
* @param executionOptions execution parameters, such as batch size and maximum retries
* @param dorisOptions connection parameters, such as fenodes, username, password and tableIdentifier
* @param <T> type of data in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord
* StreamRecord}.
*/
public static <T> SinkFunction<T> sink(String[] fields, LogicalType[] types, DorisReadOptions readOptions,
DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {

return new GenericDorisSinkFunction<>(new DorisDynamicOutputFormat(
dorisOptions, readOptions, executionOptions, types, fields));
}

}
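All of the shorter overloads above delegate to this last method. Spelled out, the JSON-string use case from the commit description is equivalent to the following sketch (connection values are placeholders):
```
SinkFunction<String> dorisSink = DorisSink.sink(
        new String[]{},                    // no field names needed for JSON-string elements
        new LogicalType[]{},
        DorisReadOptions.defaults(),
        DorisExecutionOptions.defaults(),  // format=json, strip_outer_array=true
        DorisOptions.builder()
                .setFenodes("FE_IP:8030")
                .setTableIdentifier("db.table")
                .setUsername("root")
                .setPassword("")
                .build());
```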
@@ -0,0 +1,53 @@
package org.apache.doris.flink.cfg;

import org.apache.doris.flink.table.DorisDynamicOutputFormat;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
implements CheckpointedFunction {

private final DorisDynamicOutputFormat outputFormat;

public GenericDorisSinkFunction(@Nonnull DorisDynamicOutputFormat outputFormat) {
this.outputFormat = Preconditions.checkNotNull(outputFormat);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}


@Override
public void invoke(T value, Context context) throws Exception {
outputFormat.writeRecord(value);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

}

@Override
public void close() throws Exception {
outputFormat.close();
super.close();
}

}
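For reference, the DorisSink facade is just sugar over this wrapper; wiring it by hand looks roughly like the sketch below. The constructor argument order is taken from DorisSink.sink; jsonStream is a placeholder DataStream of JSON strings:
```
DorisDynamicOutputFormat outputFormat = new DorisDynamicOutputFormat(
        DorisOptions.builder()
                .setFenodes("FE_IP:8030")
                .setTableIdentifier("db.table")
                .setUsername("root")
                .setPassword("")
                .build(),
        DorisReadOptions.defaults(),
        DorisExecutionOptions.defaults(),
        new LogicalType[]{},
        new String[]{});

jsonStream.addSink(new GenericDorisSinkFunction<>(outputFormat));
```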
@@ -34,10 +34,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -50,7 +50,7 @@
/**
* DorisDynamicOutputFormat
**/
public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -88,8 +88,10 @@ public DorisDynamicOutputFormat(DorisOptions option,
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
this.fieldNames = fieldNames;
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -98,23 +100,25 @@ public DorisDynamicOutputFormat(DorisOptions option,
}
}


@Override
public void configure(Configuration configuration) {
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
dorisStreamLoad = new DorisStreamLoad(
getBackend(),
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output" +
"-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (DorisDynamicOutputFormat.this) {
if (!closed) {
@@ -136,30 +140,37 @@ private void checkFlushException() {
}

@Override
public synchronized void writeRecord(T row) throws IOException {
checkFlushException();

addBatch(row);
if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) {
flush();
}
}

private void addBatch(T row) {
if (row instanceof RowData) {
RowData rowData = (RowData) row;
Map<String, String> valueMap = new HashMap<>();
StringJoiner value = new StringJoiner(this.fieldDelimiter);
for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) {
Object field = fieldGetters[i].getFieldOrNull(rowData);
if (jsonFormat) {
String data = field != null ? field.toString() : null;
valueMap.put(this.fieldNames[i], data);
} else {
String data = field != null ? field.toString() : NULL_VALUE;
value.add(data);
}
}
Object data = jsonFormat ? valueMap : value.toString();
batch.add(data);
} else if (row instanceof String) {
batch.add(row);
} else {
throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
}
}

@Override
@@ -189,7 +200,11 @@ public synchronized void flush() throws IOException {
}
String result;
if (jsonFormat) {
if (batch.get(0) instanceof String) {
result = batch.toString();
} else {
result = OBJECT_MAPPER.writeValueAsString(batch);
}
} else {
result = String.join(this.lineDelimiter, batch);
}
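The `String` branch works because the batch already holds JSON object strings, so `ArrayList.toString()` yields a JSON array that Stream Load unwraps when `strip_outer_array=true` (the property set by `DorisExecutionOptions.defaults()`). A tiny illustration of that assumption:
```
import java.util.ArrayList;
import java.util.List;

List<Object> batch = new ArrayList<>();
batch.add("{\"city\": \"北京\"}");
batch.add("{\"city\": \"上海\"}");

String payload = batch.toString();
// payload is: [{"city": "北京"}, {"city": "上海"}]  -- a JSON array of the buffered rows
```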
@@ -292,11 +307,11 @@ public Builder setFieldDataTypes(DataType[] fieldDataTypes) {

public DorisDynamicOutputFormat build() {
final LogicalType[] logicalTypes =
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
return new DorisDynamicOutputFormat(
optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames
);
}
}
