[HUDI-2746] Do not bootstrap for flink insert overwrite #3980

Merged 1 commit on Nov 12, 2021
35 changes: 31 additions & 4 deletions hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -98,15 +98,42 @@ public static DataStreamSink<Object> append(Configuration conf, RowType rowType,
.name("dummy");
}

/**
* Constructs bootstrap pipeline as streaming.
*/
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
}

/**
* Constructs bootstrap pipeline.
*
* @param conf The configuration
* @param rowType The row type
* @param defaultParallelism The default parallelism
* @param dataStream The data stream
* @param bounded Whether the source is bounded
* @param overwrite Whether it is insert overwrite
*/
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded,
boolean overwrite) {
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
if (overwrite) {
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex) {
return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
} else {
return streamBootstrap(conf, rowType, defaultParallelism, dataStream);
}
}
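
// A minimal caller sketch of the new overload (illustrative; the isInsertOverwrite
// flag below is hypothetical, everything else comes from this diff). Routing:
//   overwrite               -> rowDataToHoodieRecord: INSERT OVERWRITE replaces the
//                              existing data, so no index bootstrap is needed;
//   bounded && !globalIndex -> boundedBootstrap: one-shot index load for batch writes;
//   otherwise               -> streamBootstrap: incremental bootstrap for streaming.
//
//   DataStream<HoodieRecord> records = Pipelines.bootstrap(
//       conf, rowType, parallelism, dataStream,
//       context.isBounded(),  // bounded source, e.g. batch execution mode
//       isInsertOverwrite);   // hypothetical flag for INSERT OVERWRITE statements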

private static DataStream<HoodieRecord> streamBootstrap(
@@ -96,7 +96,7 @@ public static void main(String[] args) throws Exception {
}
}

DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
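// The four-argument overload defaults to bounded = false and overwrite = false,
// i.e. the streamer always takes the streaming bootstrap path.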
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
if (StreamerUtil.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
@@ -86,7 +86,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStream<Object> pipeline;

// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
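// 'overwrite' is the sink's INSERT OVERWRITE flag (presumably populated via
// Flink's SupportsOverwrite#applyOverwrite); when true, index bootstrap is skipped.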
// write pipeline
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
// compaction
@@ -168,7 +168,7 @@ public void testMergeOnReadWriteWithCompaction() throws Exception {
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(parallelism);

DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
Pipelines.clean(conf, pipeline);
Pipelines.compact(conf, pipeline);
@@ -225,7 +225,7 @@ private void testWriteToHoodie(
}

int parallelism = execEnv.getParallelism();
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
execEnv.addOperator(pipeline.getTransformation());

@@ -606,6 +606,36 @@ void testBatchUpsertWithMiniBatches(ExecMode execMode, HoodieTableType tableType
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
}

@ParameterizedTest
@MethodSource("executionModeAndTableTypeParams")
void testBatchUpsertWithMiniBatchesGlobalIndex(ExecMode execMode, HoodieTableType tableType) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.WRITE_BATCH_SIZE, "0.001")
.option(FlinkOptions.TABLE_TYPE, tableType)
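// With the global index enabled, an update to an existing key may move the record
// to a new partition, so the bounded bootstrap is skipped in favor of the streaming path.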
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
.end();
tableEnv.executeSql(hoodieTableDDL);

final String insertInto1 = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";

execInsertSql(tableEnv, insertInto1);

final String insertInto2 = "insert into t1 values\n"
+ "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
+ "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
+ "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3')";

execInsertSql(tableEnv, insertInto2);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par3]]");
}

@Test
void testUpdateWithDefaultHoodieRecordPayload() {
TableEnvironment tableEnv = batchTableEnv;