Skip to content
Permalink
Browse files
[Fix] Flink connector support json import and use httpclient to strea…
…mlaod (#6740)

* [Bug]:fix when data null , throw NullPointerException

* [Bug]:Distinguish between null and empty string

* [Feature]:flink-connector supports streamload parameters

* [Fix]:code style

* [Fix]: support json format import and use httpclient to streamload

* [Fix]:remove System out

* [Fix]:upgrade httpclient  version

* [Doc]: add json format import doc

Co-authored-by: wudi <wud3@shuhaisc.com>
  • Loading branch information
JNSimba and wudi committed Sep 28, 2021
1 parent 001b52c commit 4ef1b10f8c426883530a6307376ae94c5f6cf3f5
Showing 6 changed files with 150 additions and 109 deletions.
15 pom.xml
@@ -118,6 +118,21 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${libthrift.version}</version>
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
@@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.flink.table;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -33,8 +34,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -50,35 +53,45 @@
public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
private static final String LINE_DELIMITER_KEY = "line_delimiter";
private static final String LINE_DELIMITER_DEFAULT = "\n";
private static final String FORMAT_KEY = "format";
private static final String FORMAT_JSON_VALUE = "json";
private static final String NULL_VALUE = "\\N";

private final String fieldDelimiter;
private final String lineDelimiter;

private final String[] fieldNames;
private final boolean jsonFormat;
private DorisOptions options;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
private final RowData.FieldGetter[] fieldGetters;


private final List<String> batch = new ArrayList<>();
private final List batch = new ArrayList<>();
private transient volatile boolean closed = false;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;

private final RowData.FieldGetter[] fieldGetters;

public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) {
public DorisDynamicOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
LogicalType[] logicalTypes,
String[] fieldNames) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.fieldNames = fieldNames;
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
@@ -133,16 +146,20 @@ public synchronized void writeRecord(RowData row) throws IOException {
}

private void addBatch(RowData row) {
Map<String, String> valueMap = new HashMap<>();
StringJoiner value = new StringJoiner(this.fieldDelimiter);
for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
Object field = fieldGetters[i].getFieldOrNull(row);
if (field != null) {
value.add(field.toString());
if (jsonFormat) {
String data = field != null ? field.toString() : null;
valueMap.put(this.fieldNames[i], data);
} else {
value.add(NULL_VALUE);
String data = field != null ? field.toString() : NULL_VALUE;
value.add(data);
}
}
batch.add(value.toString());
Object data = jsonFormat ? valueMap : value.toString();
batch.add(data);
}

@Override
@@ -170,9 +187,15 @@ public synchronized void flush() throws IOException {
if (batch.isEmpty()) {
return;
}
String result;
if (jsonFormat) {
result = OBJECT_MAPPER.writeValueAsString(batch);
} else {
result = String.join(this.lineDelimiter, batch);
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(String.join(this.lineDelimiter, batch));
dorisStreamLoad.load(result);
batch.clear();
break;
} catch (StreamLoadException e) {
@@ -221,6 +244,7 @@ public static class Builder {
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DataType[] fieldDataTypes;
private String[] fieldNames;

public Builder() {
this.optionsBuilder = DorisOptions.builder();
@@ -256,6 +280,11 @@ public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
return this;
}

public Builder setFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}

public Builder setFieldDataTypes(DataType[] fieldDataTypes) {
this.fieldDataTypes = fieldDataTypes;
return this;
@@ -267,7 +296,7 @@ public DorisDynamicOutputFormat build() {
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);
return new DorisDynamicOutputFormat(
optionsBuilder.build(), readOptions, executionOptions, logicalTypes
optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames
);
}
}
@@ -38,7 +38,15 @@
import java.util.Properties;
import java.util.Set;

import static org.apache.doris.flink.cfg.ConfigurationOptions.*;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;

/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a table source.
@@ -260,13 +268,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {

Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
physicalSchema
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
physicalSchema
);
}
}
@@ -35,7 +35,10 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;

public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) {
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
TableSchema tableSchema) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -60,8 +63,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setTableIdentifier(options.getTableIdentifier())
.setReadOptions(readOptions)
.setExecutionOptions(executionOptions)
.setFieldDataTypes(tableSchema.getFieldDataTypes());;

.setFieldDataTypes(tableSchema.getFieldDataTypes())
.setFieldNames(tableSchema.getFieldNames());
return OutputFormatProvider.of(builder.build());
}

0 comments on commit 4ef1b10

Please sign in to comment.