@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -65,82 +66,72 @@ public Set<ConfigOption<?>> optionalOptions() {

     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
-        boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
+        final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
 
         // we don't support temporary hive tables yet
-        if (isHiveTable && !context.isTemporary()) {
-            Integer configuredParallelism =
-                    Configuration.fromMap(context.getCatalogTable().getOptions())
-                            .get(FileSystemConnectorOptions.SINK_PARALLELISM);
-            JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
-            return new HiveTableSink(
-                    context.getConfiguration(),
-                    jobConf,
-                    context.getObjectIdentifier(),
-                    context.getCatalogTable(),
-                    configuredParallelism);
-        } else {
-            return FactoryUtil.createTableSink(
-                    null, // we already in the factory of catalog
+        if (!isHiveTable || context.isTemporary()) {
+            return FactoryUtil.createDynamicTableSink(
+                    null,
                     context.getObjectIdentifier(),
                     context.getCatalogTable(),
                     context.getConfiguration(),
                     context.getClassLoader(),
                     context.isTemporary());
         }
+
+        final Integer configuredParallelism =
+                Configuration.fromMap(context.getCatalogTable().getOptions())
+                        .get(FileSystemConnectorOptions.SINK_PARALLELISM);
+        final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
+        return new HiveTableSink(
+                context.getConfiguration(),
+                jobConf,
+                context.getObjectIdentifier(),
+                context.getCatalogTable(),
+                configuredParallelism);
     }

     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
+        final ReadableConfig configuration =
+                Configuration.fromMap(context.getCatalogTable().getOptions());
 
-        // we don't support temporary hive tables yet
-        if (isHiveTable && !context.isTemporary()) {
-            CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());
-
-            boolean isStreamingSource =
-                    Boolean.parseBoolean(
-                            catalogTable
-                                    .getOptions()
-                                    .getOrDefault(
-                                            STREAMING_SOURCE_ENABLE.key(),
-                                            STREAMING_SOURCE_ENABLE.defaultValue().toString()));
-
-            boolean includeAllPartition =
-                    STREAMING_SOURCE_PARTITION_INCLUDE
-                            .defaultValue()
-                            .equals(
-                                    catalogTable
-                                            .getOptions()
-                                            .getOrDefault(
-                                                    STREAMING_SOURCE_PARTITION_INCLUDE.key(),
-                                                    STREAMING_SOURCE_PARTITION_INCLUDE
-                                                            .defaultValue()));
-            JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
-            // hive table source that has not lookup ability
-            if (isStreamingSource && includeAllPartition) {
-                return new HiveTableSource(
-                        jobConf,
-                        context.getConfiguration(),
-                        context.getObjectIdentifier().toObjectPath(),
-                        catalogTable);
-            } else {
-                // hive table source that has scan and lookup ability
-                return new HiveLookupTableSource(
-                        jobConf,
-                        context.getConfiguration(),
-                        context.getObjectIdentifier().toObjectPath(),
-                        catalogTable);
-            }
+        final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
 
-        } else {
-            return FactoryUtil.createTableSource(
-                    null, // we already in the factory of catalog
+        // we don't support temporary hive tables yet
+        if (!isHiveTable || context.isTemporary()) {
+            return FactoryUtil.createDynamicTableSource(
+                    null,
                     context.getObjectIdentifier(),
                     context.getCatalogTable(),
                     context.getConfiguration(),
                     context.getClassLoader(),
                     context.isTemporary());
         }
+
+        final CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());
+
+        final boolean isStreamingSource = configuration.get(STREAMING_SOURCE_ENABLE);
+        final boolean includeAllPartition =
+                STREAMING_SOURCE_PARTITION_INCLUDE
+                        .defaultValue()
+                        .equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE));
+        final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
+
+        // hive table source that has not lookup ability
+        if (isStreamingSource && includeAllPartition) {
+            return new HiveTableSource(
+                    jobConf,
+                    context.getConfiguration(),
+                    context.getObjectIdentifier().toObjectPath(),
+                    catalogTable);
+        } else {
+            // hive table source that has scan and lookup ability
+            return new HiveLookupTableSource(
+                    jobConf,
+                    context.getConfiguration(),
+                    context.getObjectIdentifier().toObjectPath(),
+                    catalogTable);
+        }
     }
 }
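
A minimal sketch (not part of the diff) of how a caller outside the catalog now resolves the factory explicitly and invokes the renamed FactoryUtil entry point; `hiveCatalog`, `tableIdentifier`, and `resolvedTable` are assumed to be in scope, mirroring the test helpers further down:

    final DynamicTableSource source =
            FactoryUtil.createDynamicTableSource(
                    (DynamicTableSourceFactory)
                            hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
                    tableIdentifier,   // ObjectIdentifier of the table
                    resolvedTable,     // ResolvedCatalogTable from the catalog manager
                    new Configuration(),
                    Thread.currentThread().getContextClassLoader(),
                    false);            // isTemporary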
----------------------------------------------------------------------

@@ -29,6 +29,8 @@
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder;
 import org.apache.flink.table.filesystem.FileSystemLookupFunction;
@@ -282,8 +284,9 @@ private DynamicTableSource getTableSource(String tableName) throws Exception {
                 ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName);
         CatalogTable catalogTable =
                 (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
-        return FactoryUtil.createTableSource(
-                hiveCatalog,
+        return FactoryUtil.createDynamicTableSource(
+                (DynamicTableSourceFactory)
+                        hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
                 tableIdentifier,
                 tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
                 tableEnv.getConfig().getConfiguration(),
@@ -297,8 +300,9 @@ private DynamicTableSink getTableSink(String tableName) throws Exception {
                 ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName);
         CatalogTable catalogTable =
                 (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
-        return FactoryUtil.createTableSink(
-                hiveCatalog,
+        return FactoryUtil.createDynamicTableSink(
+                (DynamicTableSinkFactory)
+                        hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
                 tableIdentifier,
                 tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
                 tableEnv.getConfig().getConfiguration(),
----------------------------------------------------------------------

@@ -30,6 +30,7 @@
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
 import org.apache.flink.table.filesystem.FileSystemLookupFunction;
@@ -360,8 +361,11 @@ private FileSystemLookupFunction<HiveTablePartition> getLookupFunction(String ta
                 (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
         HiveLookupTableSource hiveTableSource =
                 (HiveLookupTableSource)
-                        FactoryUtil.createTableSource(
-                                hiveCatalog,
+                        FactoryUtil.createDynamicTableSource(
+                                (DynamicTableSourceFactory)
+                                        hiveCatalog
+                                                .getFactory()
+                                                .orElseThrow(IllegalStateException::new),
                                 tableIdentifier,
                                 tableEnvInternal
                                         .getCatalogManager()
----------------------------------------------------------------------

@@ -34,6 +34,8 @@
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
@@ -47,6 +49,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -70,31 +73,33 @@ public static void close() {

     @Test
     public void testGenericTable() throws Exception {
-        TableSchema schema =
+        final TableSchema schema =
                 TableSchema.builder()
                         .field("name", DataTypes.STRING())
                         .field("age", DataTypes.INT())
                         .build();
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put(FactoryUtil.CONNECTOR.key(), "COLLECTION");
-
         catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
-        ObjectPath path = new ObjectPath("mydb", "mytable");
-        CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
-        catalog.createTable(path, table, true);
-        Optional<TableFactory> opt = catalog.getTableFactory();
-        assertTrue(opt.isPresent());
-        HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-        TableSource tableSource =
+
+        final Map<String, String> options =
+                Collections.singletonMap(FactoryUtil.CONNECTOR.key(), "COLLECTION");
+        final CatalogTable table = new CatalogTableImpl(schema, options, "csv table");
+        catalog.createTable(new ObjectPath("mydb", "mytable"), table, true);
+
+        final Optional<TableFactory> tableFactoryOpt = catalog.getTableFactory();
+        assertTrue(tableFactoryOpt.isPresent());
+        final HiveTableFactory tableFactory = (HiveTableFactory) tableFactoryOpt.get();
+
+        final TableSource tableSource =
                 tableFactory.createTableSource(
                         new TableSourceFactoryContextImpl(
                                 ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
                                 table,
                                 new Configuration(),
                                 false));
         assertTrue(tableSource instanceof StreamTableSource);
-        TableSink tableSink =
+
+        final TableSink tableSink =
                 tableFactory.createTableSink(
                         new TableSinkFactoryContextImpl(
                                 ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
@@ -107,34 +112,35 @@ public void testGenericTable() throws Exception {

     @Test
     public void testHiveTable() throws Exception {
-        ResolvedSchema schema =
+        final ResolvedSchema schema =
                 ResolvedSchema.of(
                         Column.physical("name", DataTypes.STRING()),
                         Column.physical("age", DataTypes.INT()));
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER);
-
         catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
-        ObjectPath path = new ObjectPath("mydb", "mytable");
-        CatalogTable table =
-                new CatalogTableImpl(
-                        TableSchema.fromResolvedSchema(schema), properties, "hive table");
-        catalog.createTable(path, table, true);
-
-        DynamicTableSource tableSource =
-                FactoryUtil.createTableSource(
-                        catalog,
+
+        final Map<String, String> options =
+                Collections.singletonMap(
+                        FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER);
+        final CatalogTable table =
+                new CatalogTableImpl(TableSchema.fromResolvedSchema(schema), options, "hive table");
+        catalog.createTable(new ObjectPath("mydb", "mytable"), table, true);
+
+        final DynamicTableSource tableSource =
+                FactoryUtil.createDynamicTableSource(
+                        (DynamicTableSourceFactory)
+                                catalog.getFactory().orElseThrow(IllegalStateException::new),
                         ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
                         new ResolvedCatalogTable(table, schema),
                         new Configuration(),
                         Thread.currentThread().getContextClassLoader(),
                         false);
         assertTrue(tableSource instanceof HiveTableSource);
 
-        DynamicTableSink tableSink =
-                FactoryUtil.createTableSink(
-                        catalog,
+        final DynamicTableSink tableSink =
+                FactoryUtil.createDynamicTableSink(
+                        (DynamicTableSinkFactory)
+                                catalog.getFactory().orElseThrow(IllegalStateException::new),
                         ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
                         new ResolvedCatalogTable(table, schema),
                         new Configuration(),
----------------------------------------------------------------------

flink-core/src/main/java/org/apache/flink/util/OptionalUtils.java (new file, 53 additions, 0 deletions)
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/** Utilities for working with {@link Optional}. */
+@Internal
+public class OptionalUtils {
+
+    /**
+     * Converts the given {@link Optional} into a {@link Stream}.
+     *
+     * <p>This is akin to {@code Optional#stream} available in JDK9+.
+     */
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    public static <T> Stream<T> stream(Optional<T> opt) {
+        return opt.map(Stream::of).orElseGet(Stream::empty);
+    }
+
+    /** Returns the first {@link Optional} which is present. */
+    @SafeVarargs
+    public static <T> Optional<T> firstPresent(Optional<T>... opts) {
+        for (Optional<T> opt : opts) {
+            if (opt.isPresent()) {
+                return opt;
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private OptionalUtils() {}
+}
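
A minimal usage sketch (not part of the diff) for the new helpers; the class name `OptionalUtilsExample` is illustrative only:

    import java.util.Optional;

    import org.apache.flink.util.OptionalUtils;

    public class OptionalUtilsExample {
        public static void main(String[] args) {
            Optional<String> fromConfig = Optional.empty();
            Optional<String> fromDefault = Optional.of("collection");

            // firstPresent(...) returns the first non-empty Optional.
            System.out.println(
                    OptionalUtils.firstPresent(fromConfig, fromDefault)); // Optional[collection]

            // stream(...) adapts an Optional into a 0- or 1-element Stream,
            // like JDK 9's Optional#stream, which is handy in flatMap chains.
            System.out.println(OptionalUtils.stream(fromDefault).count()); // 1
        }
    }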
----------------------------------------------------------------------

@@ -135,7 +135,12 @@ private StreamTableEnvironment createStreamTableEnvironment(
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
-                        settings.getPlanner(), executor, config, catalogManager, functionCatalog);
+                        settings.getPlanner(),
+                        executor,
+                        config,
+                        moduleManager,
+                        catalogManager,
+                        functionCatalog);
 
         return new StreamTableEnvironmentImpl(
                 catalogManager,
----------------------------------------------------------------------

@@ -144,7 +144,7 @@ public void testErrorMessage() throws Exception {
         String[] errorStack =
                 new String[] {
                     "at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
-                    "at org.apache.flink.table.factories.FactoryUtil.createTableSource"
+                    "at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource"
                 };
         for (String stack : errorStack) {
             assertThat(output, not(containsString(stack)));
@@ -165,7 +165,7 @@ public void testVerboseErrorMessage() throws Exception {
                 new String[] {
                     "org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'",
                     "at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
-                    "at org.apache.flink.table.factories.FactoryUtil.createTableSource"
+                    "at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource"
                 };
         for (String error : errors) {
             assertThat(output, containsString(error));