[HUDI-4551] Tweak the default parallelism of flink pipeline to execution env parallelism #6312

Merged 1 commit on Aug 18, 2022
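In short, this change drops the hard-coded default parallelism of 4 for the read, write, compaction and clustering tasks: when these options are left unset, they are now inferred from the parallelism of the execution environment, with bucket assign, compaction and clustering following the write task parallelism. A minimal sketch of the resulting behavior, using an arbitrary environment parallelism of 8 (the standalone class here is purely illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;

public class ParallelismDefaultsSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(8); // illustrative value

    Configuration conf = new Configuration();
    // Nothing configured explicitly: the new inference fills in the write-side
    // parallelism from the execution environment instead of a static 4.
    OptionsInference.setupSinkTasks(conf, env.getParallelism());

    int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);           // 8
    int compactionTasks = conf.getInteger(FlinkOptions.COMPACTION_TASKS); // 8, follows write.tasks
    System.out.println(writeTasks + " / " + compactionTasks);
  }
}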
@@ -153,8 +153,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");

public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
@@ -395,19 +395,19 @@ private FlinkOptions() {
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do index bootstrap, default is the parallelism of the execution environment");
.withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");

public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
.intType()
.noDefaultValue()
.withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
.withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");

public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");

public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
.key("write.task.max.size")
@@ -512,8 +512,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
.defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
.withDescription("Parallelism of tasks that do actual compaction, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");

public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
@@ -630,8 +630,8 @@ private FlinkOptions() {
public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
.key("clustering.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual clustering, default is 4");
.noDefaultValue()
.withDescription("Parallelism of tasks that do actual clustering, default same as the write task parallelism");

public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
.key("clustering.plan.strategy.daybased.lookback.partitions")
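With the static defaults removed, an unset option can no longer be read blindly; a call site either relies on the OptionsInference helper introduced below to populate it, or falls back explicitly through getOptional().orElse(...), which is the pattern the pipeline changes further down use. A small sketch of that fallback, with an illustrative helper name:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.hudi.configuration.FlinkOptions;

final class ParallelismFallbackSketch {
  // Resolve the index bootstrap parallelism: use the configured value if present,
  // otherwise fall back to the parallelism of the upstream stream.
  static int indexBootstrapParallelism(Configuration conf, DataStream<?> upstream) {
    return conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS)
        .orElse(upstream.getParallelism());
  }
}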
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.configuration;

import org.apache.flink.configuration.Configuration;

/**
* Tool helping to infer the flink options {@link FlinkOptions}.
*/
public class OptionsInference {

/**
* Sets up the default source task parallelism if it is not specified.
*
* @param conf The configuration
* @param envTasks The parallelism of the execution env
*/
public static void setupSourceTasks(Configuration conf, int envTasks) {
if (!conf.contains(FlinkOptions.READ_TASKS)) {
conf.setInteger(FlinkOptions.READ_TASKS, envTasks);
}
}

/**
* Sets up the default sink tasks parallelism if it is not specified.
*
* @param conf The configuration
* @param envTasks The parallelism of the execution env
*/
public static void setupSinkTasks(Configuration conf, int envTasks) {
// write task number, default same as execution env tasks
if (!conf.contains(FlinkOptions.WRITE_TASKS)) {
conf.setInteger(FlinkOptions.WRITE_TASKS, envTasks);
}
int writeTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
// bucket assign tasks, default same as write tasks
if (!conf.contains(FlinkOptions.BUCKET_ASSIGN_TASKS)) {
conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, writeTasks);
}
// compaction tasks, default same as write tasks
if (!conf.contains(FlinkOptions.COMPACTION_TASKS)) {
conf.setInteger(FlinkOptions.COMPACTION_TASKS, writeTasks);
}
// clustering tasks, default same as write tasks
if (!conf.contains(FlinkOptions.CLUSTERING_TASKS)) {
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
}
}
}
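A sketch of how the callers changed below wire this helper in: the source side seeds the read parallelism and the sink side seeds the write-side parallelism from the execution environment, while anything the user configured explicitly is left untouched (the job setup here is simplified and illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;

public class OptionsInferenceUsageSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();

    // An explicitly configured value is preserved by the inference.
    conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, 2);

    OptionsInference.setupSourceTasks(conf, env.getParallelism());
    OptionsInference.setupSinkTasks(conf, env.getParallelism());

    // read.tasks and write.tasks now equal env.getParallelism(),
    // while write.bucket_assign.tasks stays at the configured 2.
  }
}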
@@ -209,9 +209,8 @@ public static DataStream<Object> append(
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
return bootstrap(conf, rowType, defaultParallelism, dataStream, false, false);
return bootstrap(conf, rowType, dataStream, false, false);
}

/**
@@ -221,32 +220,29 @@ public static DataStream<HoodieRecord> bootstrap(
*
* @param conf The configuration
* @param rowType The row type
* @param defaultParallelism The default parallelism
* @param dataStream The data stream
* @param bounded Whether the source is bounded
* @param overwrite Whether it is insert overwrite
*/
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded,
boolean overwrite) {
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
return boundedBootstrap(conf, rowType, dataStream);
} else {
return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
return streamBootstrap(conf, rowType, dataStream, bounded);
}
}

private static DataStream<HoodieRecord> streamBootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream,
boolean bounded) {
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
@@ -257,7 +253,7 @@ private static DataStream<HoodieRecord> streamBootstrap(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}

@@ -272,7 +268,6 @@ private static DataStream<HoodieRecord> streamBootstrap(
private static DataStream<HoodieRecord> boundedBootstrap(
Configuration conf,
RowType rowType,
int defaultParallelism,
DataStream<RowData> dataStream) {
final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
// shuffle by partition keys
@@ -284,7 +279,7 @@ private static DataStream<HoodieRecord> boundedBootstrap(
"batch_index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}

@@ -315,11 +310,10 @@ public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf,
* and flushes the data set to disk.
*
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param dataStream The input data stream
* @return the stream write data stream pipeline
*/
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -339,7 +333,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
@@ -76,7 +77,6 @@ public static void main(String[] args) throws Exception {
.getLogicalType();

long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
int parallelism = env.getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
@@ -98,8 +98,9 @@ public static void main(String[] args) throws Exception {
}
}

DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
OptionsInference.setupSinkTasks(conf, env.getParallelism());
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
if (OptionsResolver.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
} else {
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
@@ -66,6 +67,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
// set up default parallelism
OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

@@ -85,14 +88,12 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
}

// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
@@ -34,6 +34,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.source.FileIndex;
@@ -179,6 +180,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
@SuppressWarnings("unchecked")
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths());
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
@@ -239,9 +240,9 @@ private void testWriteToHoodie(
dataStream = transformer.get().apply(dataStream);
}

int parallelism = execEnv.getParallelism();
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
execEnv.addOperator(pipeline.getTransformation());

if (isMor) {
@@ -305,6 +306,7 @@ private void testWriteToHoodieWithCluster(
.setParallelism(4);
}

OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
execEnv.addOperator(pipeline.getTransformation());

@@ -88,7 +88,7 @@ public void testHoodieFlinkClustering() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());

@@ -187,7 +187,7 @@ public void testHoodieFlinkClusteringService() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());

@@ -101,7 +101,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
@@ -173,7 +173,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -214,7 +214,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -294,7 +294,7 @@ public void testCompactionInBatchExecutionMode() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());