Skip to content
Permalink
Browse files
[Feature][flink-connector] support flink delete option (#7457)
* Flink Connector supports delete option on Unique models
Co-authored-by: wudi <wud3@shuhaisc.com>
  • Loading branch information
JNSimba committed Jan 23, 2022
1 parent 06f58f2 commit 021e281e0e283e7c0c862ceaf878cc6b7e83de5b
Showing 6 changed files with 218 additions and 152 deletions.
@@ -26,8 +26,8 @@
* JDBC sink batch options.
*/
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;

private static final long serialVersionUID = 1L;
public static final Integer DEFAULT_BATCH_SIZE = 10000;
public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
@@ -41,12 +41,27 @@ public class DorisExecutionOptions implements Serializable {
*/
private final Properties streamLoadProp;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
private final Boolean enableDelete;


public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
this.streamLoadProp = streamLoadProp;
this.enableDelete = enableDelete;
}

public static Builder builder() {
return new Builder();
}

public static DorisExecutionOptions defaults() {
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
return new Builder().setStreamLoadProp(pro).build();
}

public Integer getBatchSize() {
@@ -65,15 +80,8 @@ public Properties getStreamLoadProp() {
return streamLoadProp;
}

public static Builder builder() {
return new Builder();
}

public static DorisExecutionOptions defaults() {
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
return new Builder().setStreamLoadProp(pro).build();
public Boolean getEnableDelete() {
return enableDelete;
}

/**
@@ -84,6 +92,7 @@ public static class Builder {
private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
private Properties streamLoadProp = new Properties();
private Boolean enableDelete = false;

public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -105,8 +114,13 @@ public Builder setStreamLoadProp(Properties streamLoadProp) {
return this;
}

public Builder setEnableDelete(Boolean enableDelete) {
this.enableDelete = enableDelete;
return this;
}

public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp, enableDelete);
}
}

@@ -32,8 +32,8 @@ public class DorisStreamOptions implements Serializable {
private DorisReadOptions readOptions;

public DorisStreamOptions(Properties prop) {
this.prop = prop;
init();
this.prop = prop;
init();
}

/**
@@ -47,17 +47,17 @@ private void init() {
.setTableIdentifier(prop.getProperty(ConfigurationOptions.TABLE_IDENTIFIER));

DorisReadOptions.Builder readOptionsBuilder = DorisReadOptions.builder()
.setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC,ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString())))
.setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE,ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString())))
.setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT,ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString())))
.setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC, ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString())))
.setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE, ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString())))
.setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT, ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString())))
.setFilterQuery(prop.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))
.setReadFields(prop.getProperty(ConfigurationOptions.DORIS_READ_FIELD))
.setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S,ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString())))
.setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE,ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString())))
.setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString())))
.setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString())))
.setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString())))
.setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE,ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString())));
.setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S, ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString())))
.setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString())))
.setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString())))
.setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString())))
.setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString())))
.setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE, ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString())));

this.options = optionsBuilder.build();
this.readOptions = readOptionsBuilder.build();
@@ -23,6 +23,7 @@

public class Schema {
private int status = 0;
private String keysType;
private List<Field> properties;

public Schema() {
@@ -41,6 +42,14 @@ public void setStatus(int status) {
this.status = status;
}

public String getKeysType() {
return keysType;
}

public void setKeysType(String keysType) {
this.keysType = keysType;
}

public List<Field> getProperties() {
return properties;
}
@@ -23,12 +23,14 @@
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +48,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.flink.table.data.RowData.createFieldGetter;

@@ -58,6 +61,7 @@
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String COLUMNS_KEY = "columns";
private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
private static final String LINE_DELIMITER_KEY = "line_delimiter";
@@ -67,20 +71,21 @@
private static final String NULL_VALUE = "\\N";
private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
private static final String ESCAPE_DELIMITERS_DEFAULT = "false";

private String fieldDelimiter;
private String lineDelimiter;
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
private final String[] fieldNames;
private final boolean jsonFormat;
private final RowData.FieldGetter[] fieldGetters;
private final List batch = new ArrayList<>();
private String fieldDelimiter;
private String lineDelimiter;
private DorisOptions options;
private DorisReadOptions readOptions;
private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
private final RowData.FieldGetter[] fieldGetters;
private String keysType;

private final List batch = new ArrayList<>();
private transient volatile boolean closed = false;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;
@@ -93,9 +98,42 @@ public DorisDynamicOutputFormat(DorisOptions option,
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldNames = fieldNames;
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.keysType = parseKeysType();

Properties streamLoadProp = executionOptions.getStreamLoadProp();
handleStreamloadProp();
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
}
}

/**
* parse table keysType
*
* @return keysType
*/
private String parseKeysType() {
try {
Schema schema = RestService.getSchema(options, readOptions, LOG);
return schema.getKeysType();
} catch (DorisException e) {
throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
}
}

/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*
* @return builder
*/
public static Builder builder() {
return new Builder();
}

private void handleStreamloadProp() {
Properties streamLoadProp = executionOptions.getStreamLoadProp();
boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
if (ifEscape) {
this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
@@ -113,11 +151,13 @@ public DorisDynamicOutputFormat(DorisOptions option,
LINE_DELIMITER_DEFAULT);
}

this.fieldNames = fieldNames;
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
//add column key when fieldNames is not empty
if (!streamLoadProp.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) {
String columns = String.join(",", Arrays.stream(fieldNames).map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList()));
if (enableBatchDelete()) {
columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
}
streamLoadProp.put(COLUMNS_KEY, columns);
}
}

@@ -133,6 +173,10 @@ private String escapeString(String s) {
return buf.toString();
}

private boolean enableBatchDelete() {
return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(keysType);
}

@Override
public void configure(Configuration configuration) {
}
@@ -195,16 +239,34 @@ private void addBatch(T row) {
value.add(data);
}
}
// add doris delete sign
if (enableBatchDelete()) {
if (jsonFormat) {
valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind()));
} else {
value.add(parseDeleteSign(rowData.getRowKind()));
}
}
Object data = jsonFormat ? valueMap : value.toString();
batch.add(data);

} else if (row instanceof String) {
batch.add(row);
} else {
throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
}
}

private String parseDeleteSign(RowKind rowKind) {
if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
return "0";
} else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
return "1";
} else {
throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
}
}


@Override
public synchronized void close() throws IOException {
if (!closed) {
@@ -264,7 +326,6 @@ public synchronized void flush() throws IOException {
}
}


private String getBackend() throws IOException {
try {
//get be url from fe
@@ -275,16 +336,6 @@ private String getBackend() throws IOException {
}
}


/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*
* @return builder
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link DorisDynamicOutputFormat}.
*/
@@ -348,5 +399,7 @@ public DorisDynamicOutputFormat build() {
optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames
);
}


}
}

0 comments on commit 021e281

Please sign in to comment.