Skip to content

Commit

Permalink
[improvement] json load by line (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
madongz committed Apr 8, 2022
1 parent 03ea974 commit 2ffc9b8
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 22 deletions.
Expand Up @@ -126,10 +126,12 @@ public Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {

public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisReadOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp());
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
return new DorisSink<>(dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer);
}
}
Expand Down
Expand Up @@ -58,6 +58,7 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

/**
Expand All @@ -70,8 +71,6 @@ public class DorisStreamLoad implements Serializable {
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
private static final byte[] JSON_ARRAY_START = "[".getBytes(StandardCharsets.UTF_8);
private static final byte[] JSON_ARRAY_END = "]".getBytes(StandardCharsets.UTF_8);
private static final String JOB_EXIST_FINISHED = "FINISHED";

private String loadUrlStr;
Expand All @@ -86,7 +85,6 @@ public class DorisStreamLoad implements Serializable {
private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private final String format;
private boolean loadBatchFirstRecord;

public DorisStreamLoad(String hostPort,
Expand All @@ -109,12 +107,7 @@ public DorisStreamLoad(String hostPort,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload"));
this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount());
this.format = executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, CSV);
if (JSON.equals(format)) {
lineDelimiter = ",".getBytes();
} else {
lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, "\n").getBytes();
}
lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes();
loadBatchFirstRecord = true;
}

Expand Down Expand Up @@ -219,9 +212,6 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw
}

public RespContent stopLoad() throws IOException{
if (JSON.equals(format)) {
recordStream.write(JSON_ARRAY_END);
}
recordStream.endInput();
LOG.info("stream load stopped.");
Preconditions.checkState(pendingLoadFuture != null);
Expand Down Expand Up @@ -261,9 +251,6 @@ public void startLoad(long chkID) throws IOException{
LOG.warn(err, e);
throw e;
}
if (JSON.equals(format)) {
recordStream.write(JSON_ARRAY_START);
}
}

private void abortTransaction(long txnID) throws Exception {
Expand Down
Expand Up @@ -77,8 +77,7 @@ public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) && executionOptions.getDeletable();

boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) || executionOptions.getDeletable();
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
Expand All @@ -40,6 +41,7 @@ public class DorisSinkExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
Expand Down
Expand Up @@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -31,6 +32,7 @@ public class DorisSinkSQLExample {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<Tuple2<String, Integer>> data = new ArrayList<>();
Expand All @@ -53,7 +55,7 @@ public static void main(String[] args) {
" 'sink.buffer-count' = '4',\n" +
" 'sink.buffer-size' = '4086'," +
" 'sink.label-prefix' = 'doris_label',\n" +
" 'sink.properties.strip_outer_array' = 'true'\n" +
" 'sink.properties.read_json_by_line' = 'true'\n" +
")");
tEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test");
}
Expand Down
Expand Up @@ -55,8 +55,8 @@ public static void main(String[] args) {
" 'table.identifier' = 'db.table',\n" +
" 'username' = 'root',\n" +
" 'password' = '',\n" +
" 'sink.batch.size' = '3',\n" +
" 'sink.max-retries' = '2'\n" +
" 'sink.properties.format' = 'csv',\n" +
" 'sink.label-prefix' = 'doris_csv_table'\n" +
")");

tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test");
Expand Down
Expand Up @@ -106,7 +106,7 @@ public void testWriteTwoRecordInJson() throws Exception{
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "json");
executionOptions = OptionUtils.buildExecutionOptional(properties);
byte[] expectBuffer = "[{\"id\": 1},{\"id\": 2}]".getBytes(StandardCharsets.UTF_8);
byte[] expectBuffer = "{\"id\": 1}\n{\"id\": 2}".getBytes(StandardCharsets.UTF_8);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
Expand Down

0 comments on commit 2ffc9b8

Please sign in to comment.