Skip to content
Permalink
Browse files
[Flink] Fix bug of flink doris connector (#6655)
Flink-Doris-Connector do not support flink 1.13, refactor doris sink forma 
to not use GenericRowData. But to use RowData::FieldGetter.
  • Loading branch information
xhmz committed Sep 24, 2021
1 parent 0fa1220 commit 001b52c082944da2870c83bc61fa8c45c325cf0c
Showing 3 changed files with 127 additions and 111 deletions.
@@ -25,22 +25,24 @@
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.data.RowData.createFieldGetter;


/**
* DorisDynamicOutputFormat
@@ -69,12 +71,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;

public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
private final RowData.FieldGetter[] fieldGetters;

public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
}
}

@Override
@@ -84,12 +92,12 @@ public void configure(Configuration configuration) {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
dorisStreamLoad = new DorisStreamLoad(
getBackend(),
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp());
getBackend(),
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -126,9 +134,8 @@ public synchronized void writeRecord(RowData row) throws IOException {

private void addBatch(RowData row) {
StringJoiner value = new StringJoiner(this.fieldDelimiter);
GenericRowData rowData = (GenericRowData) row;
for (int i = 0; i < row.getArity(); ++i) {
Object field = rowData.getField(i);
for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
Object field = fieldGetters[i].getFieldOrNull(row);
if (field != null) {
value.add(field.toString());
} else {
@@ -213,6 +220,7 @@ public static class Builder {
private DorisOptions.Builder optionsBuilder;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DataType[] fieldDataTypes;

public Builder() {
this.optionsBuilder = DorisOptions.builder();
@@ -248,9 +256,18 @@ public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
return this;
}

public Builder setFieldDataTypes(DataType[] fieldDataTypes) {
this.fieldDataTypes = fieldDataTypes;
return this;
}

public DorisDynamicOutputFormat build() {
final LogicalType[] logicalTypes =
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
return new DorisDynamicOutputFormat(
optionsBuilder.build(), readOptions, executionOptions
optionsBuilder.build(), readOptions, executionOptions, logicalTypes
);
}
}
@@ -38,15 +38,7 @@
import java.util.Properties;
import java.util.Set;

import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.*;

/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
@@ -63,92 +55,92 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory

// doris options
private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas");
.key("doris.read.field")
.stringType()
.noDefaultValue()
.withDescription("List of column names in the Doris table, separated by commas");

private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
.key("doris.filter.query")
.stringType()
.noDefaultValue()
.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");

private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");
.key("doris.request.tablet.size")
.intType()
.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
.withDescription("");

private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");
.key("doris.request.connect.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
.withDescription("");

private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");
.key("doris.request.read.timeout.ms")
.intType()
.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
.withDescription("");

private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");
.key("doris.request.query.timeout.s")
.intType()
.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
.withDescription("");

private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");
.key("doris.request.retries")
.intType()
.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
.withDescription("");

private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
.key("doris.deserialize.arrow.async")
.booleanType()
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");

private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
.key("doris.request.retriesdoris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
.key("doris.request.retriesdoris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");


private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");
.key("doris.batch.size")
.intType()
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
.withDescription("");

private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");
.key("doris.exec.mem.limit")
.longType()
.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
.withDescription("");

// flink write config options
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
.key("sink.batch.size")
.intType()
.defaultValue(100)
.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
" of records, will flush data. The default value is 100.");
.key("sink.batch.size")
.intType()
.defaultValue(100)
.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
" of records, will flush data. The default value is 100.");

private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");
.key("sink.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");

private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");


// Prefix for Doris StreamLoad specific properties.
@@ -207,16 +199,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSource(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
physicalSchema);
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
physicalSchema);
}

private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
.setFenodes(fenodes)
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));

readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
@@ -226,16 +218,16 @@ private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
final DorisReadOptions.Builder builder = DorisReadOptions.builder();
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
.setReadFields(readableConfig.get(DORIS_READ_FIELD))
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
return builder.build();
}

@@ -267,11 +259,14 @@ public DynamicTableSink createDynamicTableSink(Context context) {
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);

Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
physicalSchema
);
}
}

0 comments on commit 001b52c

Please sign in to comment.