Skip to content
Permalink
Browse files
[FlinkConnector] Support time interval for flink connector (#5934)
  • Loading branch information
JNSimba committed Jun 30, 2021
1 parent d07c904 commit 31f359c7971f0794b4ac7671164c11bc6231d8d5
Showing 4 changed files with 82 additions and 14 deletions.
@@ -17,7 +17,10 @@
package org.apache.doris.flink.cfg;


import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.time.Duration;

/**
* JDBC sink batch options.
@@ -27,10 +30,13 @@ public class DorisExecutionOptions implements Serializable {

private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;

public DorisExecutionOptions(Integer batchSize, Integer maxRetries) {
public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
}

public Integer getBatchSize() {
@@ -41,6 +47,10 @@ public Integer getMaxRetries() {
return maxRetries;
}

public Long getBatchIntervalMs() {
return batchIntervalMs;
}

public static Builder builder() {
return new Builder();
}
@@ -51,6 +61,7 @@ public static Builder builder() {
public static class Builder {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;

public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -62,8 +73,13 @@ public Builder setMaxRetries(Integer maxRetries) {
return this;
}

public Builder setBatchIntervalMs(Long batchIntervalMs) {
this.batchIntervalMs = batchIntervalMs;
return this;
}

public DorisExecutionOptions build() {
return new DorisExecutionOptions(batchSize,maxRetries);
return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
}
}

@@ -24,15 +24,21 @@
import org.apache.doris.flink.rest.RestService;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


/**
@@ -51,6 +57,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private final List<String> batch = new ArrayList<>();
private transient volatile boolean closed = false;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;

public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
this.options = option;
this.readOptions = readOptions;
@@ -71,10 +81,33 @@ public void open(int taskNumber, int numTasks) throws IOException {
options.getUsername(),
options.getPassword());
LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (DorisDynamicOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
}

private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to streamload failed.", flushException);
}
}

@Override
public void writeRecord(RowData row) throws IOException {
public synchronized void writeRecord(RowData row) throws IOException {
checkFlushException();

addBatch(row);
if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) {
flush();
@@ -91,22 +124,30 @@ private void addBatch(RowData row) {
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (batch.size() > 0) {
try {
flush();
} catch (Exception e) {
LOG.warn("Writing records to doris failed.", e);
throw new RuntimeException("Writing records to doris failed.", e);
}

if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}

try {
flush();
} catch (Exception e) {
LOG.warn("Writing records to doris failed.", e);
throw new RuntimeException("Writing records to doris failed.", e);
}
}
checkFlushException();
}


public void flush() throws IOException {
public synchronized void flush() throws IOException {
checkFlushException();
if(batch.isEmpty()){
return;
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(String.join(lineDelimiter,batch));
@@ -129,6 +170,7 @@ public void flush() throws IOException {
}
}


private String getBackend() throws IOException{
try {
//get be url from fe
@@ -32,6 +32,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

@@ -140,6 +141,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor
.defaultValue(3)
.withDescription("the max retry times if writing records to database failed.");

private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
.key("sink.batch.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
"default value is 1s.");


@Override
public String factoryIdentifier() {
@@ -176,6 +184,7 @@ public Set<ConfigOption<?>> optionalOptions() {

options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
options.add(SINK_MAX_RETRIES);
options.add(SINK_BUFFER_FLUSH_INTERVAL);
return options;
}

@@ -229,6 +238,7 @@ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableCo
final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
return builder.build();
}

@@ -134,7 +134,7 @@ public void load(String value) throws StreamLoadException {

private LoadResponse loadBatch(String value) {
Calendar calendar = Calendar.getInstance();
String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));

0 comments on commit 31f359c

Please sign in to comment.