diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 98742a92be776a..5642db3009efcb 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -152,16 +152,23 @@ following parameters in `TableConfig` (note that these parameters affect all sou
-
table.exec.hive.infer-source-parallelism
- true
- Boolean
- If is true, source parallelism is inferred according to splits number. If is false, parallelism of source are set by config.
+
table.exec.hive.infer-source-parallelism.mode
+ dynamic
+ InferMode
+ An option for selecting the Hive source parallelism inference mode, which infers the parallelism from the number of splits.
+ 'static' represents static inference, which infers the source parallelism at the job creation stage.
+ 'dynamic' represents dynamic inference, which infers the parallelism at the job execution stage and can infer the source parallelism more accurately.
+ 'none' disables parallelism inference.
+ Note that this option is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', whose value must be true for parallelism inference to be enabled.
+
table.exec.hive.infer-source-parallelism.max
1000 Integer - Sets max infer parallelism for source operator. + Sets max infer parallelism for source operator. + Note that the default value is effective only in the static parallelism inference mode. + diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index b14d46b3f9c350..c913317207035d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -100,7 +100,7 @@ protected AbstractFileSource( // Getters // ------------------------------------------------------------------------ - FileEnumerator.Provider getEnumeratorFactory() { + protected FileEnumerator.Provider getEnumeratorFactory() { return enumeratorFactory; } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java index feaeb1d984288f..2d95f55787687a 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/LimitableBulkFormat.java @@ -84,6 +84,10 @@ private boolean reachLimit() { return globalNumberRead != null && globalNumberRead.get() >= limit; } + public long getLimit() { + return this.limit; + } + @Override public boolean isSplittable() { return format.isSplittable(); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java new file mode 100644 index 00000000000000..6aecd1e64487bc --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.table.catalog.ObjectPath; + +import org.apache.hadoop.mapred.JobConf; + +/** + * The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism + * inference. 
+ */ +class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider { + + private final ObjectPath tablePath; + private final JobConf jobConf; + private final int globalMaxParallelism; + + HiveDynamicParallelismInferenceFactory( + ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) { + this.tablePath = tablePath; + this.jobConf = jobConf; + this.globalMaxParallelism = globalMaxParallelism; + } + + @Override + public HiveParallelismInference create() { + boolean inferEnabled = + jobConf.getBoolean( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue()); + HiveOptions.InferMode inferMode = + jobConf.getEnum( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue()); + // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism` + // is deprecated. + boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC; + int inferMaxParallelism = + Math.min( + (int) + jobConf.getLong( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX + .key(), + globalMaxParallelism), + globalMaxParallelism); + int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java index 01e1493681d070..08c8ebf29fbca5 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java @@ -50,6 +50,8 @@ public class HiveOptions { "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n" + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory."); + /** @deprecated Use {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} instead. 
*/
+    @Deprecated
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
             key("table.exec.hive.infer-source-parallelism")
                     .booleanType()
@@ -58,11 +60,32 @@ public class HiveOptions {
                                     "If is false, parallelism of source are set by config.\n"
                                             + "If is true, source parallelism is inferred according to splits number.\n");
 
+    public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
+            key("table.exec.hive.infer-source-parallelism.mode")
+                    .enumType(InferMode.class)
+                    .defaultValue(InferMode.DYNAMIC)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "An option for selecting the Hive source parallelism inference mode, which infers the parallelism from the number of splits.")
+                                    .list(
+                                            text(
+                                                    "'static' represents static inference, which infers the source parallelism at the job creation stage."),
+                                            text(
+                                                    "'dynamic' represents dynamic inference, which infers the parallelism at the job execution stage and can infer the source parallelism more accurately."),
+                                            text("'none' disables parallelism inference."),
+                                            text(
+                                                    "Note that this option is still affected by the deprecated option 'table.exec.hive.infer-source-parallelism', whose value must be true for parallelism inference to be enabled."))
+                                    .build());
+
     public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
             key("table.exec.hive.infer-source-parallelism.max")
                     .intType()
                     .defaultValue(1000)
-                    .withDescription("Sets max infer parallelism for source operator.");
+                    .withDescription(
+                            "Sets max infer parallelism for source operator. "
+                                    + "Note that the default value is effective only in the static parallelism inference mode.");
 
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
             key("table.exec.hive.fallback-mapred-writer")
@@ -281,4 +304,30 @@ public InlineElement getDescription() {
             return description;
         }
     }
+
+    /** Infer mode used for {@link #TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE}.
*/ + public enum InferMode implements DescribedEnum { + STATIC("static", text("Static parallelism inference mode.")), + DYNAMIC("dynamic", text("Dynamic parallelism inference mode.")), + NONE("none", text("Disable parallelism inference.")); + + private final String value; + + private final InlineElement description; + + InferMode(String value, InlineElement description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return description; + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java index 850008fe5d273a..e46eb7f0d7bcf2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java @@ -18,10 +18,7 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; @@ -40,18 +37,12 @@ class HiveParallelismInference { private int parallelism; - HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) { + HiveParallelismInference( + ObjectPath tablePath, boolean infer, int inferMaxParallelism, int parallelism) { this.tablePath = tablePath; - this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM); - this.inferMaxParallelism = - flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); - Preconditions.checkArgument( - inferMaxParallelism >= 1, - HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() - + " cannot be less than 1"); - - this.parallelism = - flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + this.infer = infer; + this.inferMaxParallelism = inferMaxParallelism; + this.parallelism = parallelism; } /** @@ -73,7 +64,8 @@ int limit(Long limit) { /** * Infer parallelism by number of files and number of splits. If {@link - * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing. + * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is false or {@link + * HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE} is none, this method does nothing. */ HiveParallelismInference infer( SupplierWithException numFiles, @@ -113,4 +105,15 @@ private int logRunningTime( result); return result; } + + /** Factory for the {@code HiveParallelismInference}. */ + interface Provider { + + /** + * Creates a new {@code HiveParallelismInference}. + * + * @return a new {@code HiveParallelismInference} with designated factors. 
+ */ + HiveParallelismInference create(); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java index 5b768b0320aebe..6c423e747c848c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.DynamicFilteringInfo; +import org.apache.flink.api.connector.source.DynamicParallelismInference; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.file.src.AbstractFileSource; @@ -29,10 +31,12 @@ import org.apache.flink.connector.file.src.enumerate.FileEnumerator; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.table.ContinuousPartitionFetcher; +import org.apache.flink.connector.file.table.LimitableBulkFormat; import org.apache.flink.connectors.hive.read.HiveSourceSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -55,7 +59,8 @@ * @param the type of record returned by this source */ @PublicEvolving -public class HiveSource extends AbstractFileSource { +public class HiveSource extends AbstractFileSource + implements DynamicParallelismInference { private static final long serialVersionUID = 1L; @@ -68,6 +73,7 @@ public class HiveSource extends AbstractFileSource { private final ContinuousPartitionFetcher fetcher; private final HiveTableSource.HiveContinuousPartitionFetcherContext fetcherContext; private final ObjectPath tablePath; + private Long limit = null; HiveSource( Path[] inputPaths, @@ -97,6 +103,9 @@ public class HiveSource extends AbstractFileSource { this.partitionBytes = partitionBytes; this.fetcher = fetcher; this.fetcherContext = fetcherContext; + if (readerFormat instanceof LimitableBulkFormat) { + limit = ((LimitableBulkFormat) readerFormat).getLimit(); + } } @Override @@ -186,4 +195,48 @@ private boolean continuousPartitionedEnumerator() { jobConfWrapper), getAssignerFactory()); } + + @Override + public int inferParallelism(Context dynamicParallelismContext) { + FileEnumerator fileEnumerator; + List partitions; + if (dynamicFilterPartitionKeys != null) { + fileEnumerator = + new HiveSourceDynamicFileEnumerator.Provider( + tablePath.getFullName(), + dynamicFilterPartitionKeys, + partitionBytes, + hiveVersion, + jobConfWrapper) + .create(); + if (dynamicParallelismContext.getDynamicFilteringInfo().isPresent()) { + DynamicFilteringInfo dynamicFilteringInfo = + dynamicParallelismContext.getDynamicFilteringInfo().get(); + if (dynamicFilteringInfo instanceof DynamicFilteringEvent) { + ((HiveSourceDynamicFileEnumerator) fileEnumerator) + .setDynamicFilteringData( + ((DynamicFilteringEvent) dynamicFilteringInfo).getData()); + } + } + partitions = ((HiveSourceDynamicFileEnumerator) fileEnumerator).getFinalPartitions(); + } else { + fileEnumerator = 
getEnumeratorFactory().create(); + partitions = ((HiveSourceFileEnumerator) fileEnumerator).getPartitions(); + } + + return new HiveDynamicParallelismInferenceFactory( + tablePath, + jobConfWrapper.conf(), + dynamicParallelismContext.getParallelismInferenceUpperBound()) + .create() + .infer( + () -> + HiveSourceFileEnumerator.getNumFiles( + partitions, jobConfWrapper.conf()), + () -> + HiveSourceFileEnumerator.createInputSplits( + 0, partitions, jobConfWrapper.conf(), true) + .size()) + .limit(limit); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java index 4d84fd5e048746..a1a62c813b5245 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java @@ -178,7 +178,7 @@ public Collection enumerateSplits(Path[] paths, int minDesiredS } @VisibleForTesting - List getFinalPartitions() { + public List getFinalPartitions() { return finalPartitions; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java index c30cd1c8e1b979..500fe54d97981b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java @@ -202,6 +202,10 @@ public static int getNumFiles(List partitions, JobConf jobCo return numFiles; } + public List getPartitions() { + return this.partitions; + } + private static long getSplitMaxSize(JobConf jobConf) { return jobConf.getLong( HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES.key(), diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java new file mode 100644 index 00000000000000..0b7c2da968ff20 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveStaticParallelismInferenceFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.util.Preconditions; + +/** + * The factory class for {@link HiveParallelismInference} to support Hive source static parallelism + * inference. + */ +class HiveStaticParallelismInferenceFactory implements HiveParallelismInference.Provider { + + private final ObjectPath tablePath; + private final ReadableConfig flinkConf; + + HiveStaticParallelismInferenceFactory(ObjectPath tablePath, ReadableConfig flinkConf) { + this.tablePath = tablePath; + this.flinkConf = flinkConf; + } + + @Override + public HiveParallelismInference create() { + boolean inferEnabled = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM); + HiveOptions.InferMode inferMode = + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE); + // This logic should be fixed if config option `table.exec.hive.infer-source-parallelism` + // is deprecated. + boolean infer = inferEnabled && inferMode == HiveOptions.InferMode.STATIC; + int inferMaxParallelism = + flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); + Preconditions.checkArgument( + inferMaxParallelism >= 1, + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + + " cannot be less than 1"); + int parallelism = + flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); + // Keeping the parallelism unset is a prerequisite for dynamic parallelism inference. + if (inferEnabled && inferMode == HiveOptions.InferMode.DYNAMIC) { + parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + } + + return new HiveParallelismInference(tablePath, infer, inferMaxParallelism, parallelism); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 919313002176f2..f174d107cb06e1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -171,7 +171,8 @@ protected DataStream getDataStream( remainingPartitions); int parallelism = - new HiveParallelismInference(tablePath, flinkConf) + new HiveStaticParallelismInferenceFactory(tablePath, flinkConf) + .create() .infer( () -> HiveSourceFileEnumerator.getNumFiles( diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java index 93ddb42b33939b..58c7b0b2170922 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java @@ -18,10 +18,8 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; -import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.connector.source.DynamicFilteringData; import org.apache.flink.table.data.GenericRowData; @@ -47,8 +45,6 @@ import org.apache.hadoop.mapred.JobConf; import org.junit.jupiter.api.Test; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -60,6 +56,7 @@ import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.flink.table.catalog.hive.HiveTestUtils.serialize; import static org.apache.flink.table.data.StringData.fromString; import static org.assertj.core.api.Assertions.assertThat; @@ -215,16 +212,4 @@ private HiveSourceDynamicFileEnumerator createTestEnumerator( HiveShimLoader.getHiveVersion(), new JobConf()); } - - private byte[] serialize(TypeInformation typeInfo, RowData row) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - typeInfo.createSerializer(new SerializerConfigImpl()) - .serialize(row, new DataOutputViewStreamWrapper(baos)); - } catch (IOException e) { - // throw as RuntimeException so the function can use in lambda - throw new RuntimeException(e); - } - return baos.toByteArray(); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java index 8cb63de76b063e..1fb37bacddbb14 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceITCase.java @@ -50,7 +50,7 @@ import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link HiveSource}. */ +/** IT case for {@link HiveSource}. */ public class HiveSourceITCase { private static HiveCatalog hiveCatalog; diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java new file mode 100644 index 00000000000000..3fb91c45035034 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.DynamicFilteringInfo; +import org.apache.flink.api.connector.source.DynamicParallelismInference; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.mapred.JobConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.hive.util.Constants.IDENTIFIER; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HiveSource}. 
*/ +public class HiveSourceTest { + + private static HiveCatalog hiveCatalog; + + private static final List keys = Collections.singletonList("p"); + + private static final List> partitionSpecs = + Arrays.asList( + Collections.singletonMap("p", "1"), + Collections.singletonMap("p", "2"), + Collections.singletonMap("p", "3")); + + @BeforeClass + public static void setup() throws Exception { + hiveCatalog = HiveTestUtils.createHiveCatalog(); + hiveCatalog.open(); + } + + @AfterClass + public static void tearDown() { + if (hiveCatalog != null) { + hiveCatalog.close(); + } + } + + @Test + public void testDynamicParallelismInference() throws Exception { + // test non-partitioned table + ObjectPath tablePath1 = new ObjectPath("default", "hiveNonPartTbl"); + createTable(tablePath1, hiveCatalog, false); + + HiveSource hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath1.getDatabaseName(), + tablePath1.getObjectName(), + Collections.emptyMap()) + .buildWithDefaultBulkFormat(); + + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(1); + + hiveCatalog.dropTable(tablePath1, false); + + // test partitioned table + ObjectPath tablePath2 = new ObjectPath("default", "hiveTbl1"); + createTable(tablePath2, hiveCatalog, true); + + hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath2.getDatabaseName(), + tablePath2.getObjectName(), + Collections.emptyMap()) + .setPartitions( + partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath2.getDatabaseName(), + tablePath2.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .buildWithDefaultBulkFormat(); + + // test inferred parallelism less than maxParallelism + context = genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(3); + + // test inferred parallelism larger than maxParallelism + context = genDynamicParallelismContext(2, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(2); + + hiveCatalog.dropTable(tablePath2, false); + } + + @Test + public void testDynamicParallelismInferenceWithLimit() throws Exception { + ObjectPath tablePath = new ObjectPath("default", "hiveTbl2"); + createTable(tablePath, hiveCatalog, true); + + HiveSource hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + Collections.emptyMap()) + .setPartitions( + partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .setLimit(1L) + .buildWithDefaultBulkFormat(); + + // test inferred parallelism less than maxParallelism + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Collections.emptyList()); + assertThat(hiveSource.inferParallelism(context)).isEqualTo(1); + + hiveCatalog.dropTable(tablePath, false); + } + + @Test + public void testDynamicParallelismInferenceWithFiltering() throws 
Exception { + ObjectPath tablePath = new ObjectPath("default", "hiveTbl3"); + createTable(tablePath, hiveCatalog, true); + + HiveSource hiveSource = + new HiveSourceBuilder( + new JobConf(hiveCatalog.getHiveConf()), + new Configuration(), + HiveShimLoader.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + Collections.emptyMap()) + .setDynamicFilterPartitionKeys(keys) + .setPartitions( + partitionSpecs.stream() + .map( + spec -> + HiveTablePartition.ofPartition( + hiveCatalog.getHiveConf(), + hiveCatalog.getHiveVersion(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + new LinkedHashMap<>(spec))) + .collect(Collectors.toList())) + .buildWithDefaultBulkFormat(); + + DynamicParallelismInference.Context context = + genDynamicParallelismContext(10, Arrays.asList(1, 2)); + + assertThat(hiveSource.inferParallelism(context)).isEqualTo(2); + hiveCatalog.dropTable(tablePath, false); + } + + private void createTable(ObjectPath tablePath, HiveCatalog hiveCatalog, boolean isPartitioned) + throws Exception { + Map tableOptions = new HashMap<>(); + tableOptions.put(CONNECTOR.key(), IDENTIFIER); + + List partitionTableColumns = new ArrayList<>(); + partitionTableColumns.add(Column.physical("i", DataTypes.INT())); + if (isPartitioned) { + HiveSourceTest.keys.stream() + .map(key -> Column.physical(key, DataTypes.STRING())) + .forEach(partitionTableColumns::add); + } + ResolvedSchema partitionTableRSchema = ResolvedSchema.of(partitionTableColumns); + + hiveCatalog.createTable( + tablePath, + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder() + .fromResolvedSchema(partitionTableRSchema) + .build(), + null, + isPartitioned ? keys : Collections.emptyList(), + tableOptions), + partitionTableRSchema), + false); + + if (isPartitioned) { + HiveSourceTest.partitionSpecs.forEach( + spec -> { + try { + HiveTestUtils.createTextTableInserter( + hiveCatalog, + tablePath.getDatabaseName(), + tablePath.getObjectName()) + .addRow(new Object[] {1}) + .addRow(new Object[] {2}) + .commit( + spec.keySet().iterator().next() + + "='" + + spec.values().iterator().next() + + "'"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + HiveTestUtils.createTextTableInserter( + hiveCatalog, tablePath.getDatabaseName(), tablePath.getObjectName()) + .addRow(new Object[] {1}) + .addRow(new Object[] {2}) + .commit(); + } + } + + private DynamicParallelismInference.Context genDynamicParallelismContext( + int maxParallelism, List filteringPartitions) { + return new DynamicParallelismInference.Context() { + @Override + public Optional getDynamicFilteringInfo() { + RowType rowType = RowType.of(new IntType()); + TypeInformation rowTypeInfo = InternalTypeInfo.of(rowType); + if (!filteringPartitions.isEmpty()) { + List serializedRows = + filteringPartitions.stream() + .map( + key -> { + GenericRowData filteringRow = new GenericRowData(1); + filteringRow.setField(0, key); + return HiveTestUtils.serialize( + rowTypeInfo, filteringRow); + }) + .collect(Collectors.toList()); + + DynamicFilteringData data = + new DynamicFilteringData( + InternalTypeInfo.of(rowType), rowType, serializedRows, true); + return Optional.of(new DynamicFilteringEvent(data)); + } else { + return Optional.empty(); + } + } + + @Override + public int getParallelismInferenceUpperBound() { + return maxParallelism; + } + + @Override + public long getDataVolumePerTask() { + return 10L; + } + }; + } +} diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 2cefd641ae909d..1a9e06764487ba 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,10 +18,13 @@
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -33,6 +36,7 @@
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.types.DataType;
@@ -47,6 +51,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -204,6 +209,18 @@ public static TextTableInserter createTextTableInserter(
         return new TextTableInserter(hiveCatalog, dbName, tableName);
     }
 
+    public static byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            typeInfo.createSerializer(new SerializerConfigImpl())
+                    .serialize(row, new DataOutputViewStreamWrapper(baos));
+        } catch (IOException e) {
+            // throw as RuntimeException so the method can be used in a lambda
+            throw new RuntimeException(e);
+        }
+        return baos.toByteArray();
+    }
+
     /** insert table operation. */
     public static class TextTableInserter {
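
Taken together, the documentation row and the `HiveOptions` descriptions above define how the new mode interacts with the older options. A minimal sketch of how these options could be set through `TableConfig` in a batch job (the class name and the chosen values are illustrative, not part of this change):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelismConfigSketch {
    public static void main(String[] args) {
        // Dynamic inference is aimed at batch execution, so use batch mode here.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Select the inference mode introduced above: 'static', 'dynamic' (default) or 'none'.
        tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.mode", "dynamic");

        // The deprecated boolean switch still gates inference and must remain true
        // for either 'static' or 'dynamic' mode to take effect.
        tEnv.getConfig().set("table.exec.hive.infer-source-parallelism", "true");

        // Upper bound for the inferred parallelism; the default of 1000 is only
        // effective in static mode.
        tEnv.getConfig().set("table.exec.hive.infer-source-parallelism.max", "200");

        // The SQL client equivalent would be, e.g.:
        //   SET 'table.exec.hive.infer-source-parallelism.mode' = 'static';
    }
}
```

In dynamic mode the source parallelism is decided at runtime through `DynamicParallelismInference#inferParallelism` (implemented by `HiveSource` in this change), which is why `HiveStaticParallelismInferenceFactory` leaves the parallelism at `ExecutionConfig.PARALLELISM_DEFAULT` when dynamic inference is selected.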