Skip to content

Commit

Permalink
[HUDI-2789] Flink batch upsert for non-partitioned table does not work
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Nov 18, 2021
1 parent 2d3f2a3 commit 71a2ae0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.table.format.FilePathUtils;

import org.apache.flink.configuration.Configuration;

Expand Down Expand Up @@ -92,4 +93,11 @@ public static boolean isDeltaTimeCompaction(Configuration conf) {
final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toLowerCase(Locale.ROOT);
return FlinkOptions.TIME_ELAPSED.equals(strategy) || FlinkOptions.NUM_OR_TIME.equals(strategy);
}

/**
 * Checks whether the Hoodie table described by the given Flink configuration
 * declares one or more partition keys.
 *
 * @param conf the Flink configuration of the table
 * @return {@code true} if at least one partition key is configured
 */
public static boolean isPartitionedTable(Configuration conf) {
  final String[] partitionKeys = FilePathUtils.extractPartitionKeys(conf);
  return partitionKeys.length > 0;
}
}
21 changes: 10 additions & 11 deletions hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
Expand Down Expand Up @@ -129,21 +130,22 @@ public static DataStream<HoodieRecord> bootstrap(
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
if (overwrite) {
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex) {
} else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
} else {
return streamBootstrap(conf, rowType, defaultParallelism, dataStream);
return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
}
}

private static DataStream<HoodieRecord> streamBootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
DataStream<RowData> dataStream,
boolean bounded) {
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);

if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
dataStream1 = dataStream1
.transform(
"index_bootstrap",
Expand All @@ -161,13 +163,10 @@ private static DataStream<HoodieRecord> boundedBootstrap(
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
// shuffle by partition keys
dataStream = dataStream
.keyBy(rowDataKeyGen::getPartitionPath);
}
final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
// shuffle by partition keys
dataStream = dataStream
.keyBy(rowDataKeyGen::getPartitionPath);

return rowDataToHoodieRecord(conf, rowType, dataStream)
.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_NAME, tableType.name())
.option("hoodie.parquet.small.file.limit", "0") // invalidate the small file strategy
.option("hoodie.parquet.max.file.size", "0")
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ public Sql option(ConfigOption<?> option, Object val) {
return this;
}

/**
 * Adds a table option given as a raw string key / value pair.
 *
 * <p>The value is stored in its {@code toString()} form.
 *
 * @param key the option key
 * @param val the option value
 * @return this builder, for chained calls
 */
public Sql option(String key, Object val) {
  final String stringValue = val.toString();
  this.options.put(key, stringValue);
  return this;
}

public Sql options(Map<String, String> options) {
this.options.putAll(options);
return this;
Expand Down

0 comments on commit 71a2ae0

Please sign in to comment.