From 9cb9cdcfb2292ea75e5153b526d440c9a7c8af0f Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Fri, 5 Aug 2022 15:51:38 +0800
Subject: [PATCH] [HUDI-4551] The default value of READ_TASKS, WRITE_TASKS,
 CLUSTERING_TASKS is the parallelism of the execution environment

---
 .../hudi/configuration/FlinkOptions.java      | 20 +++---
 .../hudi/configuration/OptionsInference.java  | 65 +++++++++++++++++++
 .../org/apache/hudi/sink/utils/Pipelines.java | 20 ++----
 .../hudi/streamer/HoodieFlinkStreamer.java    |  7 +-
 .../apache/hudi/table/HoodieTableSink.java    |  9 +--
 .../apache/hudi/table/HoodieTableSource.java  |  2 +
 .../hudi/sink/ITTestDataStreamWrite.java      |  8 ++-
 .../cluster/ITTestHoodieFlinkClustering.java  |  4 +-
 .../compact/ITTestHoodieFlinkCompactor.java   |  8 +--
 .../hudi/table/ITTestHoodieDataSource.java    |  4 +-
 10 files changed, 106 insertions(+), 41 deletions(-)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 164106a4e876..134e8ce3a75d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -153,8 +153,8 @@ private FlinkOptions() {
   public static final ConfigOption READ_TASKS = ConfigOptions
       .key("read.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual read, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");
 
   public static final ConfigOption SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
       .key("source.avro-schema.path")
@@ -395,19 +395,19 @@ private FlinkOptions() {
       .key("write.index_bootstrap.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do index bootstrap, default is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");
 
   public static final ConfigOption BUCKET_ASSIGN_TASKS = ConfigOptions
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
 
   public static final ConfigOption WRITE_TASKS = ConfigOptions
       .key("write.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual write, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");
 
   public static final ConfigOption WRITE_TASK_MAX_SIZE = ConfigOptions
       .key("write.task.max.size")
@@ -512,8 +512,8 @@ private FlinkOptions() {
   public static final ConfigOption COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
-      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -630,8 +630,8 @@ private FlinkOptions() {
   public static final ConfigOption CLUSTERING_TASKS = ConfigOptions
       .key("clustering.tasks")
       .intType()
-      .defaultValue(4)
-      .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+      .noDefaultValue()
+      .withDescription("Parallelism of tasks that do actual clustering, default same as the write task parallelism");
 
   public static final ConfigOption CLUSTERING_TARGET_PARTITIONS = ConfigOptions
       .key("clustering.plan.strategy.daybased.lookback.partitions")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
new file mode 100644
index 000000000000..3e02d2373270
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.configuration;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Tool helping to infer the flink options {@link FlinkOptions}.
+ */
+public class OptionsInference {
+
+  /**
+   * Sets up the default source task parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSourceTasks(Configuration conf, int envTasks) {
+    if (!conf.contains(FlinkOptions.READ_TASKS)) {
+      conf.setInteger(FlinkOptions.READ_TASKS, envTasks);
+    }
+  }
+
+  /**
+   * Sets up the default sink tasks parallelism if it is not specified.
+   *
+   * @param conf     The configuration
+   * @param envTasks The parallelism of the execution env
+   */
+  public static void setupSinkTasks(Configuration conf, int envTasks) {
+    // write task number, default same as execution env tasks
+    if (!conf.contains(FlinkOptions.WRITE_TASKS)) {
+      conf.setInteger(FlinkOptions.WRITE_TASKS, envTasks);
+    }
+    int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
+    // bucket assign tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.BUCKET_ASSIGN_TASKS)) {
+      conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, writeTasks);
+    }
+    // compaction tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.COMPACTION_TASKS)) {
+      conf.setInteger(FlinkOptions.COMPACTION_TASKS, writeTasks);
+    }
+    // clustering tasks, default same as write tasks
+    if (!conf.contains(FlinkOptions.CLUSTERING_TASKS)) {
+      conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 0341d0af7f87..18b27daeb952 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -209,9 +209,8 @@ public static DataStream append(
   public static DataStream bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream dataStream) {
-    return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
+    return bootstrap(conf, rowType, dataStream, false, false);
   }
 
   /**
@@ -221,7 +220,6 @@ public static DataStream bootstrap(
    *
    * @param conf The configuration
    * @param rowType The row type
-   * @param defaultParallelism The default parallelism
    * @param dataStream The data stream
    * @param bounded Whether the source is bounded
    * @param overwrite Whether it is insert overwrite
@@ -229,7 +227,6 @@
   public static DataStream bootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream dataStream,
       boolean bounded,
       boolean overwrite) {
@@ -237,16 +234,15 @@ public static DataStream bootstrap(
     if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
       return rowDataToHoodieRecord(conf, rowType, dataStream);
     } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
-      return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
+      return boundedBootstrap(conf, rowType, dataStream);
     } else {
-      return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
+      return streamBootstrap(conf, rowType, dataStream, bounded);
     }
   }
 
   private static DataStream streamBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream dataStream,
       boolean bounded) {
     DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
@@ -257,7 +253,7 @@ private static DataStream streamBootstrap(
         "index_bootstrap",
         TypeInformation.of(HoodieRecord.class),
         new BootstrapOperator<>(conf))
-        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
         .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
   }
 
@@ -272,7 +268,6 @@ private static DataStream streamBootstrap(
   private static DataStream boundedBootstrap(
       Configuration conf,
       RowType rowType,
-      int defaultParallelism,
       DataStream dataStream) {
     final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
     // shuffle by partition keys
@@ -284,7 +279,7 @@ private static DataStream boundedBootstrap(
         "batch_index_bootstrap",
         TypeInformation.of(HoodieRecord.class),
         new BatchBootstrapOperator<>(conf))
-        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
+        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
   }
 
@@ -315,11 +310,10 @@ public static DataStream rowDataToHoodieRecord(Configuration conf,
    * and flushes the data set to disk.
    *
    * @param conf The configuration
-   * @param defaultParallelism The default parallelism
    * @param dataStream The input data stream
    * @return the stream write data stream pipeline
    */
-  public static DataStream hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream dataStream) {
+  public static DataStream hoodieStreamWrite(Configuration conf, DataStream dataStream) {
     if (OptionsResolver.isBucketIndexType(conf)) {
       WriteOperatorFactory operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -339,7 +333,7 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
             TypeInformation.of(HoodieRecord.class),
             new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
         .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
-        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
+        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
         // shuffle by fileId(bucket id)
         .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 29f55f78acf1..b153b2273cf6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -76,7 +77,6 @@ public static void main(String[] args) throws Exception {
         .getLogicalType();
 
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
-    int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
 
     DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>(
@@ -98,8 +98,9 @@
       }
     }
 
-    DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, env.getParallelism());
+    DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
     if (OptionsResolver.needsAsyncCompaction(conf)) {
       Pipelines.compact(conf, pipeline);
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 5af86867d86d..f8799d3ac940 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.ChangelogModes;
@@ -66,6 +67,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+      // set up default parallelism
+      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
 
       RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
 
@@ -85,14 +88,12 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         }
       }
 
-      // default parallelism
-      int parallelism = dataStream.getExecutionConfig().getParallelism();
       DataStream pipeline;
       // bootstrap
       final DataStream hoodieRecordDataStream =
-          Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
+          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
       // write pipeline
-      pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
         // use synchronous compaction for bounded source.
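As a quick illustration of the sink-side defaults introduced above (not part of the patch): once HoodieTableSink or HoodieFlinkStreamer calls OptionsInference.setupSinkTasks(), any unset task counts cascade from the execution environment parallelism through write.tasks. The driver class below is a hypothetical sketch; only FlinkOptions, OptionsInference and their option keys come from this change, and the printed values assume an environment parallelism of 8.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;

// Hypothetical driver, for illustration only.
public class SinkTasksInferenceSketch {

  public static void main(String[] args) {
    // Case 1: nothing configured, execution environment parallelism is 8.
    Configuration conf = new Configuration();
    OptionsInference.setupSinkTasks(conf, 8);
    System.out.println(conf.getInteger(FlinkOptions.WRITE_TASKS));         // 8, taken from the environment
    System.out.println(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)); // 8, follows write.tasks
    System.out.println(conf.getInteger(FlinkOptions.COMPACTION_TASKS));    // 8, follows write.tasks
    System.out.println(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));    // 8, follows write.tasks

    // Case 2: an explicit write.tasks wins and the dependent options follow it.
    Configuration explicit = new Configuration();
    explicit.setInteger(FlinkOptions.WRITE_TASKS, 2);
    OptionsInference.setupSinkTasks(explicit, 8);
    System.out.println(explicit.getInteger(FlinkOptions.WRITE_TASKS));      // 2, user value kept
    System.out.println(explicit.getInteger(FlinkOptions.COMPACTION_TASKS)); // 2, follows write.tasks
  }
}

This mirrors the ordering in setupSinkTasks: write.tasks is resolved first, then the bucket-assign, compaction and clustering task counts default to that resolved value.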
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 2034cb322eb8..2deb33d842cf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.source.FileIndex;
@@ -179,6 +180,7 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv)
       @SuppressWarnings("unchecked")
       TypeInformation typeInfo =
           (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+      OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
       if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
         StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
             conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 680c4d02e238..aa420a433dd8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -239,9 +240,9 @@ private void testWriteToHoodie(
       dataStream = transformer.get().apply(dataStream);
     }
 
-    int parallelism = execEnv.getParallelism();
-    DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
-    DataStream pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+    DataStream hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+    DataStream pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
     execEnv.addOperator(pipeline.getTransformation());
 
     if (isMor) {
@@ -305,6 +306,7 @@ private void testWriteToHoodieWithCluster(
           .setParallelism(4);
     }
 
+    OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
     DataStream pipeline = Pipelines.append(conf, rowType, dataStream, true);
     execEnv.addOperator(pipeline.getTransformation());
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index aba8e4c7b4b9..a0073d8a3703 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -88,7 +88,7 @@ public void testHoodieFlinkClustering() throws Exception {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
 
     Map options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -187,7 +187,7 @@ public void testHoodieFlinkClusteringService() throws Exception {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
 
     Map options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index a8b78ab64da5..428f65f37c1d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -101,7 +101,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
@@ -173,7 +173,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -214,7 +214,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -294,7 +294,7 @@ public void testCompactionInBatchExecutionMode() throws Exception {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Map options = new HashMap<>();
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index afe3e809b0c0..2805ac998e3c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -84,7 +84,7 @@ void beforeEach() {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
     streamTableEnv = TableEnvironmentImpl.create(settings);
     streamTableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
     Configuration execConf = streamTableEnv.getConfig().getConfiguration();
     execConf.setString("execution.checkpointing.interval", "2s");
     // configure not to retry after failure
@@ -93,7 +93,7 @@ void beforeEach() {
 
     batchTableEnv = TestTableEnvs.getBatchTableEnv();
     batchTableEnv.getConfig().getConfiguration()
-        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
   }
 
   @TempDir
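The read path follows the same pattern (again an illustrative sketch, not part of the patch): HoodieTableSource#produceDataStream now calls OptionsInference.setupSourceTasks(conf, execEnv.getParallelism()), so read.tasks falls back to the environment parallelism instead of the old hard-coded 4. The class name below and the parallelism value 6 are assumptions made for the example.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;

// Hypothetical driver, for illustration only.
public class SourceTasksInferenceSketch {

  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(6); // assumed job-level parallelism

    // No read.tasks configured: the environment parallelism is adopted.
    Configuration conf = new Configuration();
    OptionsInference.setupSourceTasks(conf, env.getParallelism());
    System.out.println(conf.getInteger(FlinkOptions.READ_TASKS)); // 6

    // An explicit read.tasks is left untouched.
    Configuration explicit = new Configuration();
    explicit.setInteger(FlinkOptions.READ_TASKS, 2);
    OptionsInference.setupSourceTasks(explicit, env.getParallelism());
    System.out.println(explicit.getInteger(FlinkOptions.READ_TASKS)); // still 2
  }
}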