diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java index 96f30455b5d68..763c9cbc07e13 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connectors.hive.util.JobConfUtils; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.hive.HiveCatalog; @@ -65,82 +66,72 @@ public Set> optionalOptions() { @Override public DynamicTableSink createDynamicTableSink(Context context) { - boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); + final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); // we don't support temporary hive tables yet - if (isHiveTable && !context.isTemporary()) { - Integer configuredParallelism = - Configuration.fromMap(context.getCatalogTable().getOptions()) - .get(FileSystemConnectorOptions.SINK_PARALLELISM); - JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf); - return new HiveTableSink( - context.getConfiguration(), - jobConf, - context.getObjectIdentifier(), - context.getCatalogTable(), - configuredParallelism); - } else { - return FactoryUtil.createTableSink( - null, // we already in the factory of catalog + if (!isHiveTable || context.isTemporary()) { + return FactoryUtil.createDynamicTableSink( + null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary()); } + + final Integer configuredParallelism = + Configuration.fromMap(context.getCatalogTable().getOptions()) + .get(FileSystemConnectorOptions.SINK_PARALLELISM); + final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf); + return new HiveTableSink( + context.getConfiguration(), + jobConf, + context.getObjectIdentifier(), + context.getCatalogTable(), + configuredParallelism); } @Override public DynamicTableSource createDynamicTableSource(Context context) { - boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); + final ReadableConfig configuration = + Configuration.fromMap(context.getCatalogTable().getOptions()); - // we don't support temporary hive tables yet - if (isHiveTable && !context.isTemporary()) { - CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable()); - - boolean isStreamingSource = - Boolean.parseBoolean( - catalogTable - .getOptions() - .getOrDefault( - STREAMING_SOURCE_ENABLE.key(), - STREAMING_SOURCE_ENABLE.defaultValue().toString())); - - boolean includeAllPartition = - STREAMING_SOURCE_PARTITION_INCLUDE - .defaultValue() - .equals( - catalogTable - .getOptions() - .getOrDefault( - STREAMING_SOURCE_PARTITION_INCLUDE.key(), - STREAMING_SOURCE_PARTITION_INCLUDE - .defaultValue())); - JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf); - // hive table source that has not lookup ability - if (isStreamingSource && includeAllPartition) { - return new HiveTableSource( - jobConf, - context.getConfiguration(), - context.getObjectIdentifier().toObjectPath(), - catalogTable); - } else { - // hive table source that has scan and lookup ability - return new HiveLookupTableSource( - jobConf, - context.getConfiguration(), - context.getObjectIdentifier().toObjectPath(), - catalogTable); - } + final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions()); - } else { - return FactoryUtil.createTableSource( - null, // we already in the factory of catalog + // we don't support temporary hive tables yet + if (!isHiveTable || context.isTemporary()) { + return FactoryUtil.createDynamicTableSource( + null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary()); } + + final CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable()); + + final boolean isStreamingSource = configuration.get(STREAMING_SOURCE_ENABLE); + final boolean includeAllPartition = + STREAMING_SOURCE_PARTITION_INCLUDE + .defaultValue() + .equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE)); + final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf); + + // hive table source that has not lookup ability + if (isStreamingSource && includeAllPartition) { + return new HiveTableSource( + jobConf, + context.getConfiguration(), + context.getObjectIdentifier().toObjectPath(), + catalogTable); + } else { + // hive table source that has scan and lookup ability + return new HiveLookupTableSource( + jobConf, + context.getConfiguration(), + context.getObjectIdentifier().toObjectPath(), + catalogTable); + } } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java index bbd839b4da38e..f7d9dae316532 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java @@ -29,6 +29,8 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder; import org.apache.flink.table.filesystem.FileSystemLookupFunction; @@ -282,8 +284,9 @@ private DynamicTableSource getTableSource(String tableName) throws Exception { ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName); CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath()); - return FactoryUtil.createTableSource( - hiveCatalog, + return FactoryUtil.createDynamicTableSource( + (DynamicTableSourceFactory) + hiveCatalog.getFactory().orElseThrow(IllegalStateException::new), tableIdentifier, tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), tableEnv.getConfig().getConfiguration(), @@ -297,8 +300,9 @@ private DynamicTableSink getTableSink(String tableName) throws Exception { ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName); CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath()); - return FactoryUtil.createTableSink( - hiveCatalog, + return FactoryUtil.createDynamicTableSink( + (DynamicTableSinkFactory) + hiveCatalog.getFactory().orElseThrow(IllegalStateException::new), tableIdentifier, tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), tableEnv.getConfig().getConfiguration(), diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java index dd4c82bd11885..e82444e7578a4 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.filesystem.FileSystemConnectorOptions; import org.apache.flink.table.filesystem.FileSystemLookupFunction; @@ -360,8 +361,11 @@ private FileSystemLookupFunction getLookupFunction(String ta (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath()); HiveLookupTableSource hiveTableSource = (HiveLookupTableSource) - FactoryUtil.createTableSource( - hiveCatalog, + FactoryUtil.createDynamicTableSource( + (DynamicTableSourceFactory) + hiveCatalog + .getFactory() + .orElseThrow(IllegalStateException::new), tableIdentifier, tableEnvInternal .getCatalogManager() diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java index 592e1d25708c1..506dafbc491bd 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java @@ -34,6 +34,8 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.factories.TableSinkFactoryContextImpl; @@ -47,6 +49,7 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -70,23 +73,24 @@ public static void close() { @Test public void testGenericTable() throws Exception { - TableSchema schema = + final TableSchema schema = TableSchema.builder() .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .build(); - Map properties = new HashMap<>(); - properties.put(FactoryUtil.CONNECTOR.key(), "COLLECTION"); - catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true); - ObjectPath path = new ObjectPath("mydb", "mytable"); - CatalogTable table = new CatalogTableImpl(schema, properties, "csv table"); - catalog.createTable(path, table, true); - Optional opt = catalog.getTableFactory(); - assertTrue(opt.isPresent()); - HiveTableFactory tableFactory = (HiveTableFactory) opt.get(); - TableSource tableSource = + + final Map options = + Collections.singletonMap(FactoryUtil.CONNECTOR.key(), "COLLECTION"); + final CatalogTable table = new CatalogTableImpl(schema, options, "csv table"); + catalog.createTable(new ObjectPath("mydb", "mytable"), table, true); + + final Optional tableFactoryOpt = catalog.getTableFactory(); + assertTrue(tableFactoryOpt.isPresent()); + final HiveTableFactory tableFactory = (HiveTableFactory) tableFactoryOpt.get(); + + final TableSource tableSource = tableFactory.createTableSource( new TableSourceFactoryContextImpl( ObjectIdentifier.of("mycatalog", "mydb", "mytable"), @@ -94,7 +98,8 @@ public void testGenericTable() throws Exception { new Configuration(), false)); assertTrue(tableSource instanceof StreamTableSource); - TableSink tableSink = + + final TableSink tableSink = tableFactory.createTableSink( new TableSinkFactoryContextImpl( ObjectIdentifier.of("mycatalog", "mydb", "mytable"), @@ -107,24 +112,24 @@ public void testGenericTable() throws Exception { @Test public void testHiveTable() throws Exception { - ResolvedSchema schema = + final ResolvedSchema schema = ResolvedSchema.of( Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())); - Map properties = new HashMap<>(); - properties.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); - catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true); - ObjectPath path = new ObjectPath("mydb", "mytable"); - CatalogTable table = - new CatalogTableImpl( - TableSchema.fromResolvedSchema(schema), properties, "hive table"); - catalog.createTable(path, table, true); - - DynamicTableSource tableSource = - FactoryUtil.createTableSource( - catalog, + + final Map options = + Collections.singletonMap( + FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); + final CatalogTable table = + new CatalogTableImpl(TableSchema.fromResolvedSchema(schema), options, "hive table"); + catalog.createTable(new ObjectPath("mydb", "mytable"), table, true); + + final DynamicTableSource tableSource = + FactoryUtil.createDynamicTableSource( + (DynamicTableSourceFactory) + catalog.getFactory().orElseThrow(IllegalStateException::new), ObjectIdentifier.of("mycatalog", "mydb", "mytable"), new ResolvedCatalogTable(table, schema), new Configuration(), @@ -132,9 +137,10 @@ public void testHiveTable() throws Exception { false); assertTrue(tableSource instanceof HiveTableSource); - DynamicTableSink tableSink = - FactoryUtil.createTableSink( - catalog, + final DynamicTableSink tableSink = + FactoryUtil.createDynamicTableSink( + (DynamicTableSinkFactory) + catalog.getFactory().orElseThrow(IllegalStateException::new), ObjectIdentifier.of("mycatalog", "mydb", "mytable"), new ResolvedCatalogTable(table, schema), new Configuration(), diff --git a/flink-core/src/main/java/org/apache/flink/util/OptionalUtils.java b/flink-core/src/main/java/org/apache/flink/util/OptionalUtils.java new file mode 100644 index 0000000000000..29862537b1a15 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/OptionalUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Internal; + +import java.util.Optional; +import java.util.stream.Stream; + +/** Utilities for working with {@link Optional}. */ +@Internal +public class OptionalUtils { + + /** + * Converts the given {@link Optional} into a {@link Stream}. + * + *

This is akin to {@code Optional#stream} available in JDK9+. + */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static Stream stream(Optional opt) { + return opt.map(Stream::of).orElseGet(Stream::empty); + } + + /** Returns the first {@link Optional} which is present. */ + @SafeVarargs + public static Optional firstPresent(Optional... opts) { + for (Optional opt : opts) { + if (opt.isPresent()) { + return opt; + } + } + + return Optional.empty(); + } + + private OptionalUtils() {} +} diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java index 84557f6c2453e..49e8853cc6798 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java @@ -135,7 +135,12 @@ private StreamTableEnvironment createStreamTableEnvironment( final Planner planner = PlannerFactoryUtil.createPlanner( - settings.getPlanner(), executor, config, catalogManager, functionCatalog); + settings.getPlanner(), + executor, + config, + moduleManager, + catalogManager, + functionCatalog); return new StreamTableEnvironmentImpl( catalogManager, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index 02f78bd67800f..cbd4840151d6d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -144,7 +144,7 @@ public void testErrorMessage() throws Exception { String[] errorStack = new String[] { "at org.apache.flink.table.factories.FactoryUtil.discoverFactory", - "at org.apache.flink.table.factories.FactoryUtil.createTableSource" + "at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource" }; for (String stack : errorStack) { assertThat(output, not(containsString(stack))); @@ -165,7 +165,7 @@ public void testVerboseErrorMessage() throws Exception { new String[] { "org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'", "at org.apache.flink.table.factories.FactoryUtil.discoverFactory", - "at org.apache.flink.table.factories.FactoryUtil.createTableSource" + "at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource" }; for (String error : errors) { assertThat(output, containsString(error)); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 589abfeb101fb..72a92d0dfd664 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -152,6 +152,7 @@ public static StreamTableEnvironment create( settings.getPlanner(), executor, tableConfig, + moduleManager, catalogManager, functionCatalog); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index b72413a14d7cd..037843d0ded65 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -303,6 +303,7 @@ private static TableEnvironmentImpl create( settings.getPlanner(), executor, tableConfig, + moduleManager, catalogManager, functionCatalog); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java index bc4c44f479a51..de4b552c3b543 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.module.ModuleManager; /** * Factory that creates {@link Planner}. @@ -50,6 +51,9 @@ interface Context { /** The configuration of the planner to use. */ TableConfig getTableConfig(); + /** The module manager. */ + ModuleManager getModuleManager(); + /** The catalog manager to look up tables and views. */ CatalogManager getCatalogManager(); @@ -61,16 +65,19 @@ interface Context { class DefaultPlannerContext implements Context { private final Executor executor; private final TableConfig tableConfig; + private final ModuleManager moduleManager; private final CatalogManager catalogManager; private final FunctionCatalog functionCatalog; public DefaultPlannerContext( Executor executor, TableConfig tableConfig, + ModuleManager moduleManager, CatalogManager catalogManager, FunctionCatalog functionCatalog) { this.executor = executor; this.tableConfig = tableConfig; + this.moduleManager = moduleManager; this.catalogManager = catalogManager; this.functionCatalog = functionCatalog; } @@ -85,6 +92,11 @@ public TableConfig getTableConfig() { return tableConfig; } + @Override + public ModuleManager getModuleManager() { + return moduleManager; + } + @Override public CatalogManager getCatalogManager() { return catalogManager; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java index 50966f9cfa291..ef64f32bdb984 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java @@ -27,6 +27,7 @@ import org.apache.flink.table.delegation.PlannerFactory; import org.apache.flink.table.delegation.PlannerFactory.Context; import org.apache.flink.table.delegation.PlannerFactory.DefaultPlannerContext; +import org.apache.flink.table.module.ModuleManager; /** Utility for discovering and instantiating {@link PlannerFactory}. */ @Internal @@ -37,6 +38,7 @@ public static Planner createPlanner( String plannerIdentifier, Executor executor, TableConfig tableConfig, + ModuleManager moduleManager, CatalogManager catalogManager, FunctionCatalog functionCatalog) { final PlannerFactory plannerFactory = @@ -46,7 +48,8 @@ public static Planner createPlanner( plannerIdentifier); final Context context = - new DefaultPlannerContext(executor, tableConfig, catalogManager, functionCatalog); + new DefaultPlannerContext( + executor, tableConfig, moduleManager, catalogManager, functionCatalog); return plannerFactory.create(context); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java index 4379a1166262d..3c44a3f077da1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java @@ -18,8 +18,10 @@ package org.apache.flink.table.module; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.Factory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.util.StringUtils; @@ -35,6 +37,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -44,15 +47,16 @@ * Responsible for loading/unloading modules, managing their life cycles, and resolving module * objects. */ +@Internal public class ModuleManager { private static final Logger LOG = LoggerFactory.getLogger(ModuleManager.class); /** To keep {@link #listFullModules()} deterministic. */ - private LinkedHashMap loadedModules; + private final LinkedHashMap loadedModules; /** Keep tracking used modules with resolution order. */ - private List usedModules; + private final List usedModules; public ModuleManager() { this.loadedModules = new LinkedHashMap<>(); @@ -184,6 +188,25 @@ public Optional getFunctionDefinition(String name) { return Optional.empty(); } + /** + * Returns the first factory found in the loaded modules given a selector. + * + *

Modules are checked in the order in which they have been loaded. The first factory + * returned by a module will be used. If no loaded module provides a factory, {@link + * Optional#empty()} is returned. + */ + @SuppressWarnings("unchecked") + public Optional getFactory(Function> selector) { + for (final String moduleName : usedModules) { + final Optional factory = selector.apply(loadedModules.get(moduleName)); + if (factory.isPresent()) { + return factory; + } + } + + return Optional.empty(); + } + @VisibleForTesting List getUsedModules() { return usedModules; diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index f416fb8d5ff22..8fd5e28eeb4f2 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -492,7 +492,7 @@ object StreamTableEnvironmentImpl { val executor = lookupExecutor(classLoader, settings.getExecutor, executionEnvironment) val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig, - catalogManager, functionCatalog) + moduleManager, catalogManager, functionCatalog) new StreamTableEnvironmentImpl( catalogManager, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 6e02a90d0f8cf..f49144fa68998 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -127,10 +127,12 @@ public final class FactoryUtil { /** * Creates a {@link DynamicTableSource} from a {@link CatalogTable}. * - *

It considers {@link Catalog#getFactory()} if provided. + *

If {@param preferredFactory} is passed, the table source is created from that factory. + * Otherwise, an attempt is made to discover a matching factory using Java SPI (see {@link + * Factory} for details). */ - public static DynamicTableSource createTableSource( - @Nullable Catalog catalog, + public static DynamicTableSource createDynamicTableSource( + @Nullable DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, @@ -141,7 +143,9 @@ public static DynamicTableSource createTableSource( objectIdentifier, catalogTable, configuration, classLoader, isTemporary); try { final DynamicTableSourceFactory factory = - getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context); + preferredFactory != null + ? preferredFactory + : discoverTableFactory(DynamicTableSourceFactory.class, context); return factory.createDynamicTableSource(context); } catch (Throwable t) { throw new ValidationException( @@ -159,11 +163,15 @@ public static DynamicTableSource createTableSource( } /** - * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * Creates a {@link DynamicTableSource} from a {@link CatalogTable}. * *

It considers {@link Catalog#getFactory()} if provided. + * + * @deprecated Use {@link #createDynamicTableSource(DynamicTableSourceFactory, ObjectIdentifier, + * ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)} instead. */ - public static DynamicTableSink createTableSink( + @Deprecated + public static DynamicTableSource createTableSource( @Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, @@ -173,9 +181,40 @@ public static DynamicTableSink createTableSink( final DefaultDynamicTableContext context = new DefaultDynamicTableContext( objectIdentifier, catalogTable, configuration, classLoader, isTemporary); + + return createDynamicTableSource( + getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context), + objectIdentifier, + catalogTable, + configuration, + classLoader, + isTemporary); + } + + /** + * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * + *

If {@param preferredFactory} is passed, the table sink is created from that factory. + * Otherwise, an attempt is made to discover a matching factory using Java SPI (see {@link + * Factory} for details). + */ + public static DynamicTableSink createDynamicTableSink( + @Nullable DynamicTableSinkFactory preferredFactory, + ObjectIdentifier objectIdentifier, + ResolvedCatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader, + boolean isTemporary) { + final DefaultDynamicTableContext context = + new DefaultDynamicTableContext( + objectIdentifier, catalogTable, configuration, classLoader, isTemporary); + try { final DynamicTableSinkFactory factory = - getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context); + preferredFactory != null + ? preferredFactory + : discoverTableFactory(DynamicTableSinkFactory.class, context); + return factory.createDynamicTableSink(context); } catch (Throwable t) { throw new ValidationException( @@ -192,6 +231,35 @@ public static DynamicTableSink createTableSink( } } + /** + * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * + *

It considers {@link Catalog#getFactory()} if provided. + * + * @deprecated Use {@link #createDynamicTableSink(DynamicTableSinkFactory, ObjectIdentifier, + * ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)} instead. + */ + @Deprecated + public static DynamicTableSink createTableSink( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + ResolvedCatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader, + boolean isTemporary) { + final DefaultDynamicTableContext context = + new DefaultDynamicTableContext( + objectIdentifier, catalogTable, configuration, classLoader, isTemporary); + + return createDynamicTableSink( + getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context), + objectIdentifier, + catalogTable, + configuration, + classLoader, + isTemporary); + } + /** * Creates a utility that helps validating options for a {@link CatalogFactory}. * @@ -533,19 +601,24 @@ public static String getFormatPrefix( @SuppressWarnings("unchecked") private static T getDynamicTableFactory( - Class factoryClass, @Nullable Catalog catalog, DefaultDynamicTableContext context) { - // catalog factory has highest precedence - if (catalog != null) { - final Factory factory = - catalog.getFactory() - .filter(f -> factoryClass.isAssignableFrom(f.getClass())) - .orElse(null); - if (factory != null) { - return (T) factory; - } + Class factoryClass, @Nullable Catalog catalog, DynamicTableFactory.Context context) { + return getDynamicTableFactory(factoryClass, catalog) + .orElseGet(() -> discoverTableFactory(factoryClass, context)); + } + + @SuppressWarnings("unchecked") + private static Optional getDynamicTableFactory( + Class factoryClass, @Nullable Catalog catalog) { + if (catalog == null) { + return Optional.empty(); } - // fallback to factory discovery + return catalog.getFactory() + .map(f -> factoryClass.isAssignableFrom(f.getClass()) ? (T) f : null); + } + + private static T discoverTableFactory( + Class factoryClass, DynamicTableFactory.Context context) { final String connectorOption = context.getCatalogTable().getOptions().get(CONNECTOR.key()); if (connectorOption == null) { throw new ValidationException( @@ -574,7 +647,7 @@ private static CatalogFactory getCatalogFactory(CatalogFactory.Context context) } private static ValidationException enrichNoMatchingConnectorError( - Class factoryClass, DefaultDynamicTableContext context, String connectorOption) { + Class factoryClass, DynamicTableFactory.Context context, String connectorOption) { final DynamicTableFactory factory; try { factory = diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java index bab881fef1d61..6d578cc561542 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java @@ -19,6 +19,8 @@ package org.apache.flink.table.module; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.functions.FunctionDefinition; import java.util.Collections; @@ -52,5 +54,47 @@ default Optional getFunctionDefinition(String name) { return Optional.empty(); } + /** + * Returns a {@link DynamicTableSourceFactory} for creating source tables. + * + *

A factory is determined with the following precedence rule: + * + *

    + *
  • 1. Factory provided by the corresponding catalog of a persisted table. + *
  • 2. Factory provided by a module. + *
  • 3. Factory discovered using Java SPI. + *
+ * + *

This will be called on loaded modules in the order in which they have been loaded. The + * first factory returned will be used. + * + *

This method can be useful to disable Java SPI completely or influence how temporary table + * sources should be created without a corresponding catalog. + */ + default Optional getTableSourceFactory() { + return Optional.empty(); + } + + /** + * Returns a {@link DynamicTableSinkFactory} for creating sink tables. + * + *

A factory is determined with the following precedence rule: + * + *

    + *
  • 1. Factory provided by the corresponding catalog of a persisted table. + *
  • 2. Factory provided by a module. + *
  • 3. Factory discovered using Java SPI. + *
+ * + *

This will be called on loaded modules in the order in which they have been loaded. The + * first factory returned will be used. + * + *

This method can be useful to disable Java SPI completely or influence how temporary table + * sinks should be created without a corresponding catalog. + */ + default Optional getTableSinkFactory() { + return Optional.empty(); + } + // user defined types, operators, rules, etc } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java index b11cca8dcb83e..dee33bca94bb6 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java @@ -55,7 +55,7 @@ public final class FactoryMocks { public static DynamicTableSource createTableSource( ResolvedSchema schema, Map options) { - return FactoryUtil.createTableSource( + return FactoryUtil.createDynamicTableSource( null, IDENTIFIER, new ResolvedCatalogTable( @@ -77,7 +77,7 @@ public static DynamicTableSink createTableSink( public static DynamicTableSink createTableSink( ResolvedSchema schema, List partitionKeys, Map options) { - return FactoryUtil.createTableSink( + return FactoryUtil.createDynamicTableSink( null, IDENTIFIER, new ResolvedCatalogTable( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java index fe298272a6181..4476a7a7517f9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java @@ -57,12 +57,14 @@ public Planner create(Context context) { return new StreamPlanner( context.getExecutor(), context.getTableConfig(), + context.getModuleManager(), context.getFunctionCatalog(), context.getCatalogManager()); case BATCH: return new BatchPlanner( context.getExecutor(), context.getTableConfig(), + context.getModuleManager(), context.getFunctionCatalog(), context.getCatalogManager()); default: diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 8ff2429e93c9b..1ec3c824c809e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.CalciteConfig; import org.apache.flink.table.planner.calcite.CalciteConfig$; import org.apache.flink.table.planner.calcite.FlinkContext; @@ -104,6 +105,7 @@ public class PlannerContext { public PlannerContext( boolean isBatchMode, TableConfig tableConfig, + ModuleManager moduleManager, FunctionCatalog functionCatalog, CatalogManager catalogManager, CalciteSchema rootSchema, @@ -114,6 +116,7 @@ public PlannerContext( new FlinkContextImpl( isBatchMode, tableConfig, + moduleManager, functionCatalog, catalogManager, rexConverterFactory); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java index ccb772e73cf56..1fbb61a468f88 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilityContext.java @@ -22,6 +22,7 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory; @@ -71,6 +72,11 @@ public CatalogManager getCatalogManager() { return context.getCatalogManager(); } + @Override + public ModuleManager getModuleManager() { + return context.getModuleManager(); + } + @Override public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() { return context.getSqlExprToRexConverterFactory(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index ae9b7036fa77b..3633628bda6b0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -43,7 +43,7 @@ public BatchExecSink( String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(ChangelogMode.insertOnly()), + ChangelogMode.insertOnly(), true, // isBounded getNewNodeId(), Collections.singletonList(inputProperty), @@ -56,7 +56,6 @@ public BatchExecSink( protected Transformation translateToPlanInternal(PlannerBase planner) { final Transformation inputTransform = (Transformation) getInputEdges().get(0).translateToPlan(planner); - return createSinkTransformation( - planner.getExecEnv(), planner.getTableConfig(), inputTransform, -1, false); + return createSinkTransformation(planner, inputTransform, -1, false); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index b0f34906421a1..2ead9c9185512 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -48,6 +48,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import org.apache.flink.table.planner.connectors.TransformationSinkProvider; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; @@ -109,12 +110,12 @@ public DynamicTableSinkSpec getTableSinkSpec() { @SuppressWarnings("unchecked") protected Transformation createSinkTransformation( - StreamExecutionEnvironment env, - TableConfig tableConfig, + PlannerBase planner, Transformation inputTransform, int rowtimeFieldIndex, boolean upsertMaterialize) { - final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); + final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner); + final ChangelogMode inputChangelogMode = tableSink.getChangelogMode(changelogMode); final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema(); final SinkRuntimeProvider runtimeProvider = @@ -127,9 +128,15 @@ protected Transformation createSinkTransformation( final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider); Transformation sinkTransform = - applyNotNullEnforcer(inputTransform, tableConfig, physicalRowType); + applyNotNullEnforcer(inputTransform, planner.getTableConfig(), physicalRowType); - sinkTransform = applyKeyBy(sinkTransform, primaryKeys, sinkParallelism, upsertMaterialize); + sinkTransform = + applyKeyBy( + inputChangelogMode, + sinkTransform, + primaryKeys, + sinkParallelism, + upsertMaterialize); if (upsertMaterialize) { sinkTransform = @@ -137,13 +144,17 @@ protected Transformation createSinkTransformation( sinkTransform, primaryKeys, sinkParallelism, - tableConfig, + planner.getTableConfig(), physicalRowType); } return (Transformation) applySinkProvider( - sinkTransform, env, runtimeProvider, rowtimeFieldIndex, sinkParallelism); + sinkTransform, + planner.getExecEnv(), + runtimeProvider, + rowtimeFieldIndex, + sinkParallelism); } /** @@ -220,12 +231,13 @@ private int deriveSinkParallelism( * messages. */ private Transformation applyKeyBy( + ChangelogMode inputChangelogMode, Transformation inputTransform, int[] primaryKeys, int sinkParallelism, boolean upsertMaterialize) { final int inputParallelism = inputTransform.getParallelism(); - if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) + if ((inputParallelism == sinkParallelism || inputChangelogMode.containsOnly(RowKind.INSERT)) && !upsertMaterialize) { return inputTransform; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java index 4e11544a234a5..2c522eb64b812 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java @@ -21,7 +21,10 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -59,11 +62,17 @@ public DynamicTableSinkSpec( this.sinkAbilitySpecs = sinkAbilitySpecs; } - public DynamicTableSink getTableSink() { + public DynamicTableSink getTableSink(PlannerBase planner) { if (tableSink == null) { + final DynamicTableSinkFactory factory = + planner.getFlinkContext() + .getModuleManager() + .getFactory(Module::getTableSinkFactory) + .orElse(null); + tableSink = - FactoryUtil.createTableSink( - null, // catalog, TODO support create Factory from catalog + FactoryUtil.createDynamicTableSink( + factory, objectIdentifier, catalogTable, configuration, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java index 6475c14942b6e..a36fd249f0299 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java @@ -24,7 +24,9 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.module.Module; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; @@ -71,9 +73,16 @@ public DynamicTableSourceSpec( private DynamicTableSource getTableSource(PlannerBase planner) { checkNotNull(configuration); if (tableSource == null) { + final DynamicTableSourceFactory factory = + planner.getFlinkContext() + .getModuleManager() + .getFactory(Module::getTableSourceFactory) + .orElse(null); + tableSource = - FactoryUtil.createTableSource( - null, // catalog, TODO support create Factory from catalog + FactoryUtil.createDynamicTableSource( + // TODO Support creating from a catalog + factory, objectIdentifier, catalogTable, configuration, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index f57eacf776e44..c145b5969e3c7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -75,7 +75,7 @@ public StreamExecSink( String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode), + inputChangelogMode, false, // isBounded getNewNodeId(), Collections.singletonList(inputProperty), @@ -96,7 +96,7 @@ public StreamExecSink( @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode), + inputChangelogMode, false, // isBounded id, inputProperties, @@ -138,10 +138,6 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { } return createSinkTransformation( - planner.getExecEnv(), - planner.getTableConfig(), - inputTransform, - rowtimeFieldIndex, - upsertMaterialize); + planner, inputTransform, rowtimeFieldIndex, upsertMaterialize); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java index d23b2f45824c3..b7ba15f1f7917 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java @@ -21,16 +21,20 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.module.Module; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.catalog.CatalogSchemaTable; import org.apache.flink.table.planner.connectors.DynamicSourceUtils; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.util.OptionalUtils; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptSchema; @@ -41,6 +45,10 @@ import java.util.List; import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.OptionalUtils.firstPresent; +import static org.apache.flink.util.OptionalUtils.stream; /** * A {@link FlinkPreparingTableBase} implementation which defines the interfaces required to @@ -113,8 +121,28 @@ private ResolvedCatalogTable createFinalCatalogTable( private DynamicTableSource createDynamicTableSource( FlinkContext context, ResolvedCatalogTable catalogTable) { final ReadableConfig config = context.getTableConfig().getConfiguration(); - return FactoryUtil.createTableSource( - schemaTable.getCatalog().orElse(null), + + final Optional factoryFromCatalog = + stream(schemaTable.getCatalog()) + .map(Catalog::getFactory) + .flatMap(OptionalUtils::stream) + .map( + f -> + f instanceof DynamicTableSourceFactory + ? (DynamicTableSourceFactory) f + : null) + .findFirst(); + + final Optional factoryFromModule = + context.getModuleManager().getFactory(Module::getTableSourceFactory); + + // Since the catalog is more specific, we give it precedence over a factory provided by any + // modules. + final DynamicTableSourceFactory factory = + firstPresent(factoryFromCatalog, factoryFromModule).orElse(null); + + return FactoryUtil.createDynamicTableSource( + factory, schemaTable.getTableIdentifier(), catalogTable, config, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala index bb8ed527a6991..4d29cc1c1ea6b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContext.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.module.ModuleManager import org.apache.calcite.plan.Context @@ -28,29 +29,22 @@ import org.apache.calcite.plan.Context */ trait FlinkContext extends Context { - /** - * Returns whether the planner runs in batch mode. - */ + /** Returns whether the planner is in batch mode. */ def isBatchMode: Boolean - /** - * Gets [[TableConfig]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. - */ + /** Returns the [[TableConfig]] defined in [[org.apache.flink.table.api.TableEnvironment]]. */ def getTableConfig: TableConfig - /** - * Gets [[FunctionCatalog]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. - */ + /** Returns the [[FunctionCatalog]] defined in [[org.apache.flink.table.api.TableEnvironment]]. */ def getFunctionCatalog: FunctionCatalog - /** - * Gets [[CatalogManager]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. - */ + /** Returns the [[CatalogManager]] defined in [[org.apache.flink.table.api.TableEnvironment]]. */ def getCatalogManager: CatalogManager - /** - * Gets [[SqlExprToRexConverterFactory]] instance to convert sql expression to rex node. - */ + /** Returns the [[ModuleManager]] defined in [[org.apache.flink.table.api.TableEnvironment]]. */ + def getModuleManager: ModuleManager + + /** Returns the [[SqlExprToRexConverterFactory]] to convert SQL expressions to rex nodes. */ def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory override def unwrap[C](clazz: Class[C]): C = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala index 0850099d75cd7..70e9b78a3927b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkContextImpl.scala @@ -20,10 +20,12 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.module.ModuleManager class FlinkContextImpl( inBatchMode: Boolean, tableConfig: TableConfig, + moduleManager: ModuleManager, functionCatalog: FunctionCatalog, catalogManager: CatalogManager, toRexFactory: SqlExprToRexConverterFactory) @@ -33,6 +35,8 @@ class FlinkContextImpl( override def getTableConfig: TableConfig = tableConfig + override def getModuleManager: ModuleManager = moduleManager + override def getFunctionCatalog: FunctionCatalog = functionCatalog override def getCatalogManager: CatalogManager = catalogManager diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index f1a07c8078d37..e5825197e9d60 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} import org.apache.flink.table.planner.operations.PlannerQueryOperation import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef @@ -49,9 +50,11 @@ import scala.collection.JavaConversions._ class BatchPlanner( executor: Executor, config: TableConfig, + moduleManager: ModuleManager, functionCatalog: FunctionCatalog, catalogManager: CatalogManager) - extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) { + extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager, + isStreamingMode = false) { override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = { Array( @@ -158,7 +161,7 @@ class BatchPlanner( private def createDummyPlanner(): BatchPlanner = { val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv) val executor = new DefaultExecutor(dummyExecEnv) - new BatchPlanner(executor, config, functionCatalog, catalogManager) + new BatchPlanner(executor, config, moduleManager, functionCatalog, catalogManager) } override def explainJsonPlan(jsonPlan: String, extraDetails: ExplainDetail*): String = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 1bee1f6d6f98b..ee038f5049de3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -28,7 +28,8 @@ import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.delegation.{Executor, Parser, Planner} import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties} -import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil} +import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil} +import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ import org.apache.flink.table.planner.JMap @@ -50,7 +51,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink} import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME} -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter @@ -75,6 +76,7 @@ import _root_.scala.collection.JavaConversions._ * [[StreamExecutionEnvironment]] for * [[org.apache.flink.table.sources.StreamTableSource.getDataStream]] * @param config mutable configuration passed from corresponding [[TableEnvironment]] + * @param moduleManager manager for modules * @param functionCatalog catalog of functions * @param catalogManager manager of catalog meta objects such as tables, views, databases etc. * @param isStreamingMode Determines if the planner should work in a batch (false}) or @@ -83,6 +85,7 @@ import _root_.scala.collection.JavaConversions._ abstract class PlannerBase( executor: Executor, config: TableConfig, + val moduleManager: ModuleManager, val functionCatalog: FunctionCatalog, val catalogManager: CatalogManager, isStreamingMode: Boolean) @@ -103,6 +106,7 @@ abstract class PlannerBase( new PlannerContext( !isStreamingMode, config, + moduleManager, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), @@ -353,7 +357,7 @@ abstract class PlannerBase( dynamicOptions: JMap[String, String]) : Option[(ResolvedCatalogTable, Any)] = { val optionalLookupResult = - JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) + toScala(catalogManager.getTable(objectIdentifier)) if (optionalLookupResult.isEmpty) { return None } @@ -361,7 +365,7 @@ abstract class PlannerBase( lookupResult.getTable match { case connectorTable: ConnectorCatalogTable[_, _] => val resolvedTable = lookupResult.getResolvedTable.asInstanceOf[ResolvedCatalogTable] - JavaScalaConversionUtil.toScala(connectorTable.getTableSink) match { + toScala(connectorTable.getTableSink) match { case Some(sink) => Some(resolvedTable, sink) case None => None } @@ -373,11 +377,11 @@ abstract class PlannerBase( } else { resolvedTable } - val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName) + val catalog = toScala(catalogManager.getCatalog(objectIdentifier.getCatalogName)) val isTemporary = lookupResult.isTemporary if (isLegacyConnectorOptions(objectIdentifier, resolvedTable.getOrigin, isTemporary)) { val tableSink = TableFactoryUtil.findAndCreateTableSink( - catalog.orElse(null), + catalog.orNull, objectIdentifier, tableToFind.getOrigin, getTableConfig.getConfiguration, @@ -385,8 +389,20 @@ abstract class PlannerBase( isTemporary) Option(resolvedTable, tableSink) } else { - val tableSink = FactoryUtil.createTableSink( - catalog.orElse(null), + val factoryFromCatalog = catalog.flatMap(f => toScala(f.getFactory)) match { + case Some(f: DynamicTableSinkFactory) => Some(f) + case _ => None + } + + val factoryFromModule = toScala(plannerContext.getFlinkContext.getModuleManager + .getFactory(toJava((m: Module) => m.getTableSinkFactory))) + + // Since the catalog is more specific, we give it precedence over a factory provided by + // any modules. + val factory = factoryFromCatalog.orElse(factoryFromModule).orNull + + val tableSink = FactoryUtil.createDynamicTableSink( + factory, objectIdentifier, tableToFind, getTableConfig.getConfiguration, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index eb808101b2441..672e8c61292be 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} import org.apache.flink.table.planner.operations.PlannerQueryOperation import org.apache.flink.table.planner.plan.`trait`._ @@ -47,9 +48,11 @@ import _root_.scala.collection.JavaConversions._ class StreamPlanner( executor: Executor, config: TableConfig, + moduleManager: ModuleManager, functionCatalog: FunctionCatalog, catalogManager: CatalogManager) - extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) { + extends PlannerBase(executor, config, moduleManager, functionCatalog, catalogManager, + isStreamingMode = true) { override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = { Array( @@ -151,7 +154,7 @@ class StreamPlanner( private def createDummyPlanner(): StreamPlanner = { val dummyExecEnv = new DummyStreamExecutionEnvironment(getExecEnv) val executor = new DefaultExecutor(dummyExecEnv) - new StreamPlanner(executor, config, functionCatalog, catalogManager) + new StreamPlanner(executor, config, moduleManager, functionCatalog, catalogManager) } override def explainJsonPlan(jsonPlan: String, extraDetails: ExplainDetail*): String = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala index b838fa6f0230e..0866e73aa2e04 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize import org.apache.flink.table.api.TableConfig import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.planner.calcite.{FlinkContext, SqlExprToRexConverterFactory} import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink} @@ -94,6 +95,8 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner) override def getCatalogManager: CatalogManager = planner.catalogManager + override def getModuleManager: ModuleManager = planner.moduleManager + override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory = context.getSqlExprToRexConverterFactory diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala index fb9e10ffc0ca4..47c9cead00eb4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.optimize import org.apache.flink.table.api.TableConfig import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.planner.calcite.{FlinkContext, SqlExprToRexConverterFactory} import org.apache.flink.table.planner.delegation.StreamPlanner import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef} @@ -39,6 +40,7 @@ import org.apache.calcite.rex.RexBuilder import java.util import java.util.Collections + import scala.collection.JavaConversions._ /** @@ -170,6 +172,8 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) override def getCatalogManager: CatalogManager = planner.catalogManager + override def getModuleManager: ModuleManager = planner.moduleManager + override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory = context.getSqlExprToRexConverterFactory diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java index 619cf450cb6a8..c78243361aade 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java @@ -61,6 +61,7 @@ public class ParserImplTest { new PlannerContext( !isStreamingMode, tableConfig, + moduleManager, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java index 7e3eba6b2bea1..17695b4256cdb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java @@ -60,11 +60,13 @@ public class ExpressionConverterTest { private final TableConfig tableConfig = new TableConfig(); private final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager(); + private final ModuleManager moduleManager = new ModuleManager(); private final PlannerContext plannerContext = new PlannerContext( false, tableConfig, - new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), + moduleManager, + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(MetadataTestUtil.initRootSchema()), Arrays.asList( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 576bb44952f49..bcf33a0a33cf2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -140,6 +140,7 @@ public class SqlToOperationConverterTest { new PlannerContext( false, tableConfig, + moduleManager, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index df606babde9d6..98824578113bc 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -20,14 +20,19 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec; import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec; @@ -71,6 +76,7 @@ public void testDynamicTableSinkSpecSerde() throws IOException { new FlinkContextImpl( false, TableConfig.getDefault(), + new ModuleManager(), null, CatalogManagerMocks.createEmptyCatalogManager(), null), @@ -89,7 +95,10 @@ public void testDynamicTableSinkSpecSerde() throws IOException { actual.setClassLoader(classLoader); assertNull(actual.getReadableConfig()); actual.setReadableConfig(serdeCtx.getConfiguration()); - assertNotNull(actual.getTableSink()); + TableEnvironmentImpl tableEnv = + (TableEnvironmentImpl) + TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + assertNotNull(actual.getTableSink((PlannerBase) tableEnv.getPlanner())); } @Parameterized.Parameters(name = "{0}") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index 4a0ed34e59cf5..9c851757ed460 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.delegation.PlannerBase; @@ -91,6 +92,7 @@ public void testDynamicTableSourceSpecSerde() throws IOException { new FlinkContextImpl( false, TableConfig.getDefault(), + new ModuleManager(), null, CatalogManagerMocks.createEmptyCatalogManager(), null), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java index 7e2306adad1ff..31da6c01fc54c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.api.dataview.MapView; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.expressions.TimeIntervalUnit; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; @@ -95,7 +96,13 @@ public class LogicalTypeSerdeTest { public void testLogicalTypeSerde() throws IOException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), + new FlinkContextImpl( + false, + TableConfig.getDefault(), + new ModuleManager(), + null, + null, + null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java index 864f037cf5adb..e78ee8aab4895 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowSerdeTest.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.expressions.PlannerWindowReference; @@ -115,7 +116,13 @@ public static List testData() { public void testLogicalWindowSerde() throws JsonProcessingException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), + new FlinkContextImpl( + false, + TableConfig.getDefault(), + new ModuleManager(), + null, + null, + null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java index 6738b98d8b236..27595cba4bd2c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupKeySerdeTest.java @@ -48,6 +48,7 @@ public class LookupKeySerdeTest { @Test public void testLookupKey() throws JsonProcessingException { TableConfig tableConfig = TableConfig.getDefault(); + ModuleManager moduleManager = new ModuleManager(); CatalogManager catalogManager = CatalogManager.newBuilder() .classLoader(Thread.currentThread().getContextClassLoader()) @@ -58,7 +59,8 @@ public void testLookupKey() throws JsonProcessingException { new FlinkContextImpl( false, tableConfig, - new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), + moduleManager, + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, null); SerdeContext serdeCtx = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java index d60ed37c2e658..579e815073094 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeSerdeTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; @@ -259,7 +260,13 @@ public static Collection parameters() { public void testTypeSerde() throws Exception { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), + new FlinkContextImpl( + false, + TableConfig.getDefault(), + new ModuleManager(), + null, + null, + null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java index 40623da962b0e..fdef991dc8733 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java @@ -78,6 +78,7 @@ public class RexNodeSerdeTest { @Parameterized.Parameters(name = "{0}") public static Object[][] parameters() { TableConfig tableConfig = TableConfig.getDefault(); + ModuleManager moduleManager = new ModuleManager(); CatalogManager catalogManager = CatalogManager.newBuilder() .classLoader(Thread.currentThread().getContextClassLoader()) @@ -88,7 +89,8 @@ public static Object[][] parameters() { new FlinkContextImpl( false, tableConfig, - new FunctionCatalog(tableConfig, catalogManager, new ModuleManager()), + moduleManager, + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, null); // toRexFactory RexBuilder rexBuilder = new RexBuilder(FACTORY); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java index 70d6b673a38ae..046f24feb86e7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; @@ -43,7 +44,13 @@ public class RexWindowBoundSerdeTest { public void testSerde() throws JsonProcessingException { SerdeContext serdeCtx = new SerdeContext( - new FlinkContextImpl(false, TableConfig.getDefault(), null, null, null), + new FlinkContextImpl( + false, + TableConfig.getDefault(), + new ModuleManager(), + null, + null, + null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java index 731c13a8f6eb9..e3dec81df3bc0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -64,6 +65,7 @@ public class TemporalTableSourceSpecSerdeTest { new FlinkContextImpl( false, TableConfig.getDefault(), + new ModuleManager(), null, CatalogManagerMocks.createEmptyCatalogManager(), null); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java new file mode 100644 index 0000000000000..bdd8f63851b01 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.module; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.factories.TableFactoryHarness; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; + +import org.junit.Test; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; + +/** Tests for modules. */ +public class ModuleITCase extends StreamingTestBase { + + @Test + public void testTableSourceFactory() { + tEnv().createTemporaryTable( + "T", + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().build()) + .source( + new TableFactoryHarness.ScanSourceBase() { + @Override + public ScanRuntimeProvider getScanRuntimeProvider( + ScanContext runtimeProviderContext) { + throw new UnsupportedOperationException( + "Discovered factory should not be used"); + } + }) + .build()); + + final Table table = tEnv().from("T"); + + // Sanity check: without our module loaded, the factory discovery process is used. + assertThrows( + "Discovered factory should not be used", + UnsupportedOperationException.class, + table::explain); + + // The module has precedence over factory discovery. + tEnv().loadModule("M", new SourceSinkFactoryOverwriteModule()); + table.explain(); + } + + @Test + public void testTableSinkFactory() { + tEnv().createTemporaryTable( + "T", + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build()) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public SinkRuntimeProvider getSinkRuntimeProvider( + Context context) { + throw new UnsupportedOperationException( + "Discovered factory should not be used"); + } + }) + .build()); + + // Sanity check: without our module loaded, the factory discovery process is used. + assertThrows( + "Discovered factory should not be used", + UnsupportedOperationException.class, + () -> tEnv().explainSql("INSERT INTO T SELECT 1")); + + // The module has precedence over factory discovery. + tEnv().loadModule("M", new SourceSinkFactoryOverwriteModule()); + tEnv().explainSql("INSERT INTO T SELECT 1"); + } + + // --------------------------------------------------------------------------------------------- + + private static class SourceSinkFactoryOverwriteModule implements Module { + @Override + public Optional getTableSourceFactory() { + return Optional.of(new SourceFactory()); + } + + @Override + public Optional getTableSinkFactory() { + return Optional.of(new SinkFactory()); + } + } + + private static class SourceFactory extends FactoryBase implements DynamicTableSourceFactory { + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + return new TableFactoryHarness.ScanSourceBase() {}; + } + } + + private static class SinkFactory extends FactoryBase implements DynamicTableSinkFactory { + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + return new TableFactoryHarness.SinkBase() {}; + } + } + + private static class FactoryBase implements Factory { + @Override + public String factoryIdentifier() { + throw new UnsupportedOperationException(); + } + + @Override + public Set> requiredOptions() { + throw new UnsupportedOperationException(); + } + + @Override + public Set> optionalOptions() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java index 83faafcc6c495..c2f95e07a3d48 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java @@ -46,6 +46,7 @@ public static FlinkPlannerImpl createDefaultPlanner() { new PlannerContext( false, tableConfig, + moduleManager, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true)), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala index 5ede86eda74bf..3dbb790357228 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala @@ -58,11 +58,13 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: Boolean) { // mock FlinkPlannerImpl to avoid discovering TableEnvironment and Executor. val config = new TableConfig + val moduleManager = new ModuleManager val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager() - val functionCatalog = new FunctionCatalog(config, catalogManager, new ModuleManager) + val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager) val plannerContext = new PlannerContext( false, config, + moduleManager, functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala index 13c7566ef09cd..93f3db56de101 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala @@ -78,6 +78,7 @@ class AggCallSelectivityEstimatorTest { private def mockScan( statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): TableScan = { val tableConfig = new TableConfig + val moduleManager = new ModuleManager val catalogManager = CatalogManagerMocks.createEmptyCatalogManager() val rootSchema = CalciteSchema.createRootSchema(true, false).plus() val table = new MockMetaTable(relDataType, statistic) @@ -86,7 +87,8 @@ class AggCallSelectivityEstimatorTest { new PlannerContext( false, tableConfig, - new FunctionCatalog(tableConfig, catalogManager, new ModuleManager), + moduleManager, + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(rootSchema), util.Arrays.asList( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 7f457e239766a..c62cf95227830 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -90,6 +90,7 @@ class FlinkRelMdHandlerTestBase { new PlannerContext( false, tableConfig, + moduleManager, new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(rootSchema), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala index fcee5498cd123..72dab24f619ec 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api.{DataTypes, TableConfig, TableException, Table import org.apache.flink.table.catalog.{CatalogTable, Column, ObjectIdentifier, ResolvedCatalogTable, ResolvedSchema, UniqueConstraint} import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSource} +import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable} @@ -251,6 +252,7 @@ object MetadataTestUtil { private val flinkContext = new FlinkContextImpl( false, TableConfig.getDefault, + new ModuleManager, null, CatalogManagerMocks.createEmptyCatalogManager, null) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala index 5ee1481b20348..25797a28ff81a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala @@ -79,6 +79,7 @@ class SelectivityEstimatorTest { private def mockScan( statistic: FlinkStatistic = FlinkStatistic.UNKNOWN, tableConfig: TableConfig = TableConfig.getDefault): TableScan = { + val moduleManager = new ModuleManager val catalogManager = CatalogManagerMocks.createEmptyCatalogManager() val rootSchema = CalciteSchema.createRootSchema(true, false).plus() val table = new MockMetaTable(relDataType, statistic) @@ -87,7 +88,8 @@ class SelectivityEstimatorTest { new PlannerContext( false, tableConfig, - new FunctionCatalog(tableConfig, catalogManager, new ModuleManager), + moduleManager, + new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(rootSchema), util.Arrays.asList( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala index fb680a61ac129..743ed16da284e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/nodes/calcite/RelNodeTestBase.scala @@ -53,6 +53,7 @@ class RelNodeTestBase { val plannerContext: PlannerContext = new PlannerContext( false, tableConfig, + moduleManager, new FunctionCatalog(tableConfig, catalogManager, moduleManager), catalogManager, CalciteSchema.from(rootSchema), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 66452d8ec2988..d27af9c1c6697 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1550,7 +1550,7 @@ object TestingTableEnvironment { val executor = executorFactory.create(tableConfig.getConfiguration) val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig, - catalogMgr, functionCatalog).asInstanceOf[PlannerBase] + moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase] new TestingTableEnvironment( catalogMgr,