Skip to content

Commit

Permalink
[FLINK-35293][hive] Hive source supports dynamic parallelism inference
Browse files Browse the repository at this point in the history
  • Loading branch information
SinBex committed May 9, 2024
1 parent beb0b16 commit 263d2ba
Show file tree
Hide file tree
Showing 15 changed files with 605 additions and 42 deletions.
17 changes: 12 additions & 5 deletions docs/content/docs/connectors/table/hive/hive_read_write.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,23 @@ following parameters in `TableConfig` (note that these parameters affect all sou
</thead>
<tbody>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If is true, source parallelism is inferred according to splits number. If is false, parallelism of source are set by config.</td>
<td><h5>table.exec.hive.infer-source-parallelism.mode</h5></td>
<td style="word-wrap: break-word;">dynamic</td>
<td>InferMode</td>
<td>An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.
'static' represents static inference, which will infer source parallelism at job create stage.
'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism.
'none' represents disabling parallelism inference.
Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference.
</td>
</tr>
<tr>
<td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Sets max infer parallelism for source operator.</td>
<td>Sets max infer parallelism for source operator.
Note that the default value is effective only in the static parallelism inference mode.
</td>
</tr>
</tbody>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected AbstractFileSource(
// Getters
// ------------------------------------------------------------------------

FileEnumerator.Provider getEnumeratorFactory() {
protected FileEnumerator.Provider getEnumeratorFactory() {
return enumeratorFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ private boolean reachLimit() {
return globalNumberRead != null && globalNumberRead.get() >= limit;
}

public long getLimit() {
return this.limit;
}

@Override
public boolean isSplittable() {
return format.isSplittable();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.table.catalog.ObjectPath;

import org.apache.hadoop.mapred.JobConf;

/**
* The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism
* inference.
*/
class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider {

private final ObjectPath tablePath;
private final JobConf jobConf;
private final int globalMaxParallelism;

HiveDynamicParallelismInferenceFactory(
ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) {
this.tablePath = tablePath;
this.jobConf = jobConf;
this.globalMaxParallelism = globalMaxParallelism;
}

@Override
public HiveParallelismInference create() {
boolean inferEnabled =
jobConf.getBoolean(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
HiveOptions.InferMode inferMode =
jobConf.getEnum(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());
// This logic should be fixed if config option `table.exec.hive.infer-source-parallelism`
// is deprecated.
boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC;
int inferMaxParallelism =
Math.min(
(int)
jobConf.getLong(
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX
.key(),
globalMaxParallelism),
globalMaxParallelism);
int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class HiveOptions {
"If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n"
+ "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory.");

/** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. */
@Deprecated
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
key("table.exec.hive.infer-source-parallelism")
.booleanType()
Expand All @@ -58,11 +60,32 @@ public class HiveOptions {
"If is false, parallelism of source are set by config.\n"
+ "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
key("table.exec.hive.infer-source-parallelism.mode")
.enumType(InferMode.class)
.defaultValue(InferMode.DYNAMIC)
.withDescription(
Description.builder()
.text(
"An option for selecting the hive source parallelism inference mode to infer parallelism according to splits number.")
.list(
text(
"'static' represents static inference, which will infer source parallelism at job create stage."),
text(
"'dynamic' represents dynamic inference, which will infer parallelism at job execution stage and could more accurately infer the source parallelism."),
text(
"'none' represents disabling parallelism inference."),
text(
"Note that it is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', requiring its value to be true for enabling parallelism inference."))
.build());

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
key("table.exec.hive.infer-source-parallelism.max")
.intType()
.defaultValue(1000)
.withDescription("Sets max infer parallelism for source operator.");
.withDescription(
"Sets max infer parallelism for source operator. "
+ "Note that the default value is effective only in the static parallelism inference mode.");

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
key("table.exec.hive.fallback-mapred-writer")
Expand Down Expand Up @@ -281,4 +304,30 @@ public InlineElement getDescription() {
return description;
}
}

/** Infer mode used for {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE}. */
public enum InferMode implements DescribedEnum {
STATIC("static", text("Static parallelism inference mode.")),
DYNAMIC("dynamic", text("Dynamic parallelism inference mode.")),
NONE("none", text("Disable parallelism inference."));

private final String value;

private final InlineElement description;

InferMode(String value, InlineElement description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

package org.apache.flink.connectors.hive;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
Expand All @@ -40,18 +37,12 @@ class HiveParallelismInference {

private int parallelism;

HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
HiveParallelismInference(
ObjectPath tablePath, boolean infer, int inferMaxParallelism, int parallelism) {
this.tablePath = tablePath;
this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
this.inferMaxParallelism =
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
Preconditions.checkArgument(
inferMaxParallelism >= 1,
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key()
+ " cannot be less than 1");

this.parallelism =
flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
this.infer = infer;
this.inferMaxParallelism = inferMaxParallelism;
this.parallelism = parallelism;
}

/**
Expand All @@ -73,7 +64,8 @@ int limit(Long limit) {

/**
* Infer parallelism by number of files and number of splits. If {@link
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link
* HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing.
*/
HiveParallelismInference infer(
SupplierWithException<Integer, IOException> numFiles,
Expand Down Expand Up @@ -113,4 +105,15 @@ private int logRunningTime(
result);
return result;
}

/** Factory for the {@code HiveParallelismInference}. */
interface Provider {

/**
* Creates a new {@code HiveParallelismInference}.
*
* @return a new {@code HiveParallelismInference} with designated factors.
*/
HiveParallelismInference create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.DynamicFilteringInfo;
import org.apache.flink.api.connector.source.DynamicParallelismInference;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.AbstractFileSource;
Expand All @@ -29,10 +31,12 @@
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.ContinuousPartitionFetcher;
import org.apache.flink.connector.file.table.LimitableBulkFormat;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

Expand All @@ -55,7 +59,8 @@
* @param <T> the type of record returned by this source
*/
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit>
implements DynamicParallelismInference {

private static final long serialVersionUID = 1L;

Expand All @@ -68,6 +73,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext;
private final ObjectPath tablePath;
private Long limit = null;

HiveSource(
Path[] inputPaths,
Expand Down Expand Up @@ -97,6 +103,9 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
this.partitionBytes = partitionBytes;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
if (readerFormat instanceof LimitableBulkFormat) {
limit = ((LimitableBulkFormat<?, ?>) readerFormat).getLimit();
}
}

@Override
Expand Down Expand Up @@ -186,4 +195,48 @@ private boolean continuousPartitionedEnumerator() {
jobConfWrapper),
getAssignerFactory());
}

@Override
public int inferParallelism(Context dynamicParallelismContext) {
FileEnumerator fileEnumerator;
List<HiveTablePartition> partitions;
if (dynamicFilterPartitionKeys != null) {
fileEnumerator =
new HiveSourceDynamicFileEnumerator.Provider(
tablePath.getFullName(),
dynamicFilterPartitionKeys,
partitionBytes,
hiveVersion,
jobConfWrapper)
.create();
if (dynamicParallelismContext.getDynamicFilteringInfo().isPresent()) {
DynamicFilteringInfo dynamicFilteringInfo =
dynamicParallelismContext.getDynamicFilteringInfo().get();
if (dynamicFilteringInfo instanceof DynamicFilteringEvent) {
((HiveSourceDynamicFileEnumerator) fileEnumerator)
.setDynamicFilteringData(
((DynamicFilteringEvent) dynamicFilteringInfo).getData());
}
}
partitions = ((HiveSourceDynamicFileEnumerator) fileEnumerator).getFinalPartitions();
} else {
fileEnumerator = getEnumeratorFactory().create();
partitions = ((HiveSourceFileEnumerator) fileEnumerator).getPartitions();
}

return new HiveDynamicParallelismInferenceFactory(
tablePath,
jobConfWrapper.conf(),
dynamicParallelismContext.getParallelismInferenceUpperBound())
.create()
.infer(
() ->
HiveSourceFileEnumerator.getNumFiles(
partitions, jobConfWrapper.conf()),
() ->
HiveSourceFileEnumerator.createInputSplits(
0, partitions, jobConfWrapper.conf(), true)
.size())
.limit(limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredS
}

@VisibleForTesting
List<HiveTablePartition> getFinalPartitions() {
public List<HiveTablePartition> getFinalPartitions() {
return finalPartitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ public static int getNumFiles(List<HiveTablePartition> partitions, JobConf jobCo
return numFiles;
}

public List<HiveTablePartition> getPartitions() {
return this.partitions;
}

private static long getSplitMaxSize(JobConf jobConf) {
return jobConf.getLong(
HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(),
Expand Down

0 comments on commit 263d2ba

Please sign in to comment.