diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/connectors.TableConnectorFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 100% rename from flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/connectors.TableConnectorFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 100% rename from flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/connectors.TableConnectorFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 100% rename from flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/connectors.TableConnectorFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 100% rename from flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java index 64199a0df238a4..cbe0211f99c18e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java @@ -21,12 +21,12 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.connectors.TableConnectorFactory; +import org.apache.flink.table.connectors.TableFactoryDiscoverable; +import org.apache.flink.table.connectors.TableSourceFactory; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.FormatDescriptorValidator; import org.apache.flink.table.descriptors.KafkaValidator; import org.apache.flink.table.descriptors.SchemaValidator; -import 
org.apache.flink.table.descriptors.TableDescriptorValidator; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; import org.apache.flink.types.Row; @@ -69,12 +69,8 @@ /** * Factory for creating configured instances of {@link KafkaJsonTableSource}. */ -abstract class KafkaTableSourceFactory implements TableConnectorFactory> { - - @Override - public String getType() { - return TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE(); - } +abstract class KafkaTableSourceFactory + implements TableSourceFactory, TableFactoryDiscoverable { @Override public Map requiredContext() { @@ -125,7 +121,7 @@ public List supportedProperties() { } @Override - public TableSource create(Map properties) { + public TableSource createTableSource(Map properties) { final DescriptorProperties params = new DescriptorProperties(true); params.putProperties(properties); @@ -188,7 +184,7 @@ public TableSource create(Map properties) { }); // schema - final TableSchema schema = params.getTableSchema(SCHEMA()); + final TableSchema schema = SchemaValidator.deriveTableSourceSchema(params); builder.withSchema(schema); // proctime diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java index a02413074db22b..5055acda7b638f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceFactoryTestBase.java @@ -22,8 +22,10 @@ import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.connectors.TableSourceFactoryService; +import org.apache.flink.table.connectors.TableFactoryService; +import org.apache.flink.table.connectors.TableSourceFactory; import org.apache.flink.table.descriptors.Avro; +import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.FormatDescriptor; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; @@ -116,8 +118,11 @@ private void testTableSource(FormatDescriptor format) { .field("zip", Types.STRING) .field("proctime", Types.SQL_TIMESTAMP).proctime()); + DescriptorProperties properties = new DescriptorProperties(true); + testDesc.addProperties(properties); final TableSource factorySource = - (TableSource) TableSourceFactoryService.findAndCreateTableConnector(testDesc); + ((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc)) + .createTableSource(properties.asMap()); assertEquals(builderSource, factorySource); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java index ccc899b291764c..964ce6756e9654 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java @@ -22,7 +22,9 @@ import org.apache.flink.formats.json.JsonSchemaConverter; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.connectors.TableSourceFactoryService; +import org.apache.flink.table.connectors.TableFactoryService; +import org.apache.flink.table.connectors.TableSourceFactory; +import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.FormatDescriptor; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; @@ -146,8 +148,11 @@ private void testTableSource(FormatDescriptor format) { new Rowtime().timestampsFromField("time").watermarksFromSource()) .field("proc-time", Types.SQL_TIMESTAMP).proctime()); + DescriptorProperties properties = new DescriptorProperties(true); + testDesc.addProperties(properties); final TableSource factorySource = - (TableSource) TableSourceFactoryService.findAndCreateTableConnector(testDesc); + ((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc)) + .createTableSource(properties.asMap()); assertEquals(builderSource, factorySource); } diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml index abc0ab48fbe39a..8843f136041e94 100644 --- a/flink-libraries/flink-sql-client/pom.xml +++ b/flink-libraries/flink-sql-client/pom.xml @@ -159,7 +159,7 @@ under the License. org.codehaus.commons.compiler.properties org/codehaus/janino/** org/codehaus/commons/** - META-INF/services/org.apache.flink.table.connectors.TableConnectorFactory + META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable org/jline/** com/fasterxml/jackson/** diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 5076b6d82476c9..233e49b6b0f35e 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -210,9 +210,6 @@ public void open() { case SELECT: callSelect(cmdCall); break; - case INSERT: - callInsert(cmdCall); - break; case SOURCE: callSource(cmdCall); break; @@ -338,10 +335,6 @@ private void callSelect(SqlCommandCall cmdCall) { } } - private void callInsert(SqlCommandCall cmdCall) { - executor.executeUpdate(context, cmdCall.operands[0]); - } - private void callSource(SqlCommandCall cmdCall) { final String pathString = cmdCall.operands[0]; diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index 81009c5c8e9782..214a17df835b56 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -69,7 +69,6 @@ private static String[] splitOperands(SqlCommand cmd, String originalCall, Strin return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)}; } case SELECT: - case INSERT: return new String[] 
{originalCall}; default: return new String[] {operands}; } } @@ -90,7 +89,6 @@ enum SqlCommand { DESCRIBE("describe"), EXPLAIN("explain"), SELECT("select"), - INSERT("insert"), SET("set"), RESET("reset"), SOURCE("source"); diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index 8a03e67a6cb626..1415f2dcdf3145 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -90,7 +90,7 @@ public void setTables(List> tables) { TableDescriptor tableDescriptor = create(tableName, properties); if (null == tableDescriptor) { throw new SqlClientException( - "Invalid table 'type' attribute value, only 'source' or 'sink' is supported"); + "Invalid table 'type' attribute value, only 'source', 'sink', or 'both' is supported"); } if (this.tables.containsKey(tableName)) { throw new SqlClientException("Duplicate table name '" + tableName + "'."); } diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java index 1358db1d17f9e8..c64887e3535b2d 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java @@ -18,6 +18,7 @@ package org.apache.flink.table.client.config; +import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.TableDescriptor; import org.apache.flink.table.descriptors.TableDescriptorValidator; @@ -44,6 +45,11 @@ public Map getProperties() { return properties; } + @Override + public void addProperties(DescriptorProperties properties) { + this.properties.forEach(properties::putString); + } + public Source toSource() { final Map newProperties = new HashMap<>(properties); newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index bb234024e79747..34dd259e55c47f 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -61,11 +61,6 @@ public interface Executor { */ ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException; - /** - * Submits a Flink SQL update job (detached). - */ - void executeUpdate(SessionContext session, String query) throws SqlExecutionException; - /** * Asks for the next changelog results (non-blocking). 
*/ diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index a51c3e02d81084..ffd94936dc8e0b 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -49,8 +49,10 @@ import org.apache.flink.table.client.config.SourceSink; import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.SqlExecutionException; -import org.apache.flink.table.connectors.TableSinkFactoryService; -import org.apache.flink.table.connectors.TableSourceFactoryService; +import org.apache.flink.table.connectors.TableFactoryService; +import org.apache.flink.table.connectors.TableSinkFactory; +import org.apache.flink.table.connectors.TableSourceFactory; +import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.util.FlinkException; @@ -102,20 +104,30 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo tableSinks = new HashMap<>(); mergedEnv.getTables().forEach((name, descriptor) -> { if (descriptor instanceof Source) { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); tableSources.put(name, - (TableSource) TableSourceFactoryService.findAndCreateTableConnector( - descriptor, classLoader)); + ((TableSourceFactory) TableFactoryService.find( + TableSourceFactory.class, descriptor, classLoader)) + .createTableSource(properties.asMap())); } else if (descriptor instanceof Sink) { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); tableSinks.put(name, - (TableSink) TableSinkFactoryService.findAndCreateTableConnector( - descriptor, classLoader)); + ((TableSinkFactory) TableFactoryService.find( + TableSinkFactory.class, descriptor, classLoader)) + .createTableSink(properties.asMap())); } else if (descriptor instanceof SourceSink) { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); tableSources.put(name, - (TableSource) TableSourceFactoryService.findAndCreateTableConnector( - ((SourceSink) descriptor).toSource(), classLoader)); + ((TableSourceFactory) TableFactoryService.find( + TableSourceFactory.class, descriptor, classLoader)) + .createTableSource(properties.asMap())); tableSinks.put(name, - (TableSink) TableSinkFactoryService.findAndCreateTableConnector( - ((SourceSink) descriptor).toSink(), classLoader)); + ((TableSinkFactory) TableFactoryService.find( + TableSinkFactory.class, descriptor, classLoader)) + .createTableSink(properties.asMap())); } }); diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 130c76d943ffea..b0ea718f3e920e 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -225,12 +225,6 @@ public 
ResultDescriptor executeQuery(SessionContext session, String query) throw return executeQueryInternal(context, query); } - @Override - public void executeUpdate(SessionContext session, String query) throws SqlExecutionException { - final ExecutionContext context = getOrCreateExecutionContext(session); - executeUpdateInternal(context, query); - } - @Override public TypedResult>> retrieveResultChanges(SessionContext session, String resultId) throws SqlExecutionException { diff --git a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml index 86e327ae307bcd..22570f2d5cae77 100644 --- a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml +++ b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml @@ -42,7 +42,7 @@ under the License. src/test/resources/test-factory-services-file META-INF/services - org.apache.flink.table.connectors.TableConnectorFactory + org.apache.flink.table.connectors.TableFactoryDiscoverable 0755 diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 0f0f2c84dc766a..0f892303b9792b 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -46,7 +46,7 @@ public class DependencyTest { "table-connector-factory-test-jar.jar"; @Test - public void testTableConnectorFactoryDiscovery() throws Exception { + public void testTableFactoryDiscovery() throws Exception { // create environment final Map replaceVars = new HashMap<>(); replaceVars.put("$VAR_0", "test-connector"); diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index 98a8328b26cd54..ef1ca8863b9b6f 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -50,7 +50,6 @@ public void testMerging() throws Exception { final Set tables = new HashSet<>(); tables.add("TableNumber1"); tables.add("TableNumber2"); - tables.add("TableNumber3"); tables.add("NewTable"); assertEquals(tables, merged.getTables().keySet()); diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 95f651c05f6750..0144db126440a1 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -43,13 +43,9 @@ import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.io.File; import java.net.URL; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collections; @@ -58,7 +54,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; @@ -74,10 +69,6 @@ public class LocalExecutorITCase extends TestLogger { private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( new MiniClusterResourceConfiguration.Builder() @@ -109,8 +100,7 @@ public void testListTables() throws Exception { final List actualTables = executor.listTables(session); - final List expectedTables = - Arrays.asList("TableNumber1", "TableNumber2", "TableNumber3"); + final List expectedTables = Arrays.asList("TableNumber1", "TableNumber2"); assertEquals(expectedTables, actualTables); } @@ -157,48 +147,15 @@ public void testTableSchema() throws Exception { assertEquals(expectedTableSchema, actualTableSchema); } - @Test(timeout = 30_000L) - public void testExecuteUpdate() throws Exception { - final URL url = getClass().getClassLoader().getResource("test-data.csv"); - Objects.requireNonNull(url); - File outputFile = folder.newFile("output.csv"); - final Map replaceVars = new HashMap<>(); - replaceVars.put("$VAR_0", url.getPath()); - replaceVars.put("$VAR_1", "/"); - replaceVars.put("$VAR_2", outputFile.getAbsolutePath()); - replaceVars.put("$VAR_3", "streaming"); - replaceVars.put("$VAR_4", "changelog"); - - final Executor executor = createModifiedExecutor(clusterClient, replaceVars); - final SessionContext session = new SessionContext("test-session", new Environment()); - - try { - executor.executeUpdate( - session, "INSERT INTO TableNumber3 SELECT * FROM TableNumber1"); - Thread.sleep(1000); - } finally { - executor.stop(session); - assertEquals("42,Hello World#" + - "22,Hello World#" + - "32,Hello World#" + - "32,Hello World#" + - "42,Hello World#" + - "52,Hello World!!!!", - Files.lines(outputFile.toPath()).collect(Collectors.joining("#"))); - } - } - @Test(timeout = 30_000L) public void testQueryExecutionChangelog() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); - File outputFile = folder.newFile("output.csv"); final Map replaceVars = new HashMap<>(); replaceVars.put("$VAR_0", url.getPath()); replaceVars.put("$VAR_1", "/"); - replaceVars.put("$VAR_2", outputFile.getAbsolutePath()); - replaceVars.put("$VAR_3", "streaming"); - replaceVars.put("$VAR_4", "changelog"); + replaceVars.put("$VAR_2", "streaming"); + replaceVars.put("$VAR_3", "changelog"); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -242,13 +199,11 @@ public void testQueryExecutionChangelog() throws Exception { public void testStreamQueryExecutionTable() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); - File outputFile = folder.newFile("output.csv"); final Map replaceVars = new HashMap<>(); replaceVars.put("$VAR_0", url.getPath()); replaceVars.put("$VAR_1", "/"); - replaceVars.put("$VAR_2", outputFile.getAbsolutePath()); - replaceVars.put("$VAR_3", "streaming"); - replaceVars.put("$VAR_4", "table"); + replaceVars.put("$VAR_2", "streaming"); + replaceVars.put("$VAR_3", "table"); final Executor executor = 
createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -279,13 +234,11 @@ public void testStreamQueryExecutionTable() throws Exception { public void testBatchQueryExecution() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); - File outputFile = folder.newFile("output.csv"); final Map replaceVars = new HashMap<>(); replaceVars.put("$VAR_0", url.getPath()); replaceVars.put("$VAR_1", "/"); - replaceVars.put("$VAR_2", outputFile.getAbsolutePath()); - replaceVars.put("$VAR_3", "batch"); - replaceVars.put("$VAR_4", "table"); + replaceVars.put("$VAR_2", "batch"); + replaceVars.put("$VAR_3", "table"); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java index 47d73e10e368ab..38d4a58c25aac7 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java @@ -23,9 +23,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.client.gateway.local.DependencyTest; -import org.apache.flink.table.connectors.TableConnectorFactory; +import org.apache.flink.table.connectors.TableFactoryDiscoverable; +import org.apache.flink.table.connectors.TableSinkFactory; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.TableDescriptorValidator; +import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; @@ -36,6 +37,7 @@ import java.util.Map; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM; import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; @@ -45,12 +47,7 @@ /** * Table sink factory for testing the classloading in {@link DependencyTest}. */ -public class TestTableSinkFactory implements TableConnectorFactory> { - - @Override - public String getType() { - return TableDescriptorValidator.TABLE_TYPE_VALUE_SINK(); - } +public class TestTableSinkFactory implements TableSinkFactory, TableFactoryDiscoverable { @Override public Map requiredContext() { @@ -66,17 +63,18 @@ public List supportedProperties() { properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM()); properties.add(SCHEMA() + ".#." 
+ ROWTIME_WATERMARKS_TYPE()); return properties; } @Override - public TableSink create(Map properties) { + public TableSink createTableSink(Map properties) { final DescriptorProperties params = new DescriptorProperties(true); params.putProperties(properties); return new TestTableSink( - params.getTableSchema(SCHEMA()), - properties.get("connector.test-property")); + SchemaValidator.deriveTableSinkSchema(params), + properties.get("connector.test-property")); } // -------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java index 99ae81fabd00ef..18b2c62ca94860 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java @@ -24,10 +24,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.client.gateway.local.DependencyTest; -import org.apache.flink.table.connectors.TableConnectorFactory; +import org.apache.flink.table.connectors.TableFactoryDiscoverable; +import org.apache.flink.table.connectors.TableSourceFactory; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.SchemaValidator; -import org.apache.flink.table.descriptors.TableDescriptorValidator; import org.apache.flink.table.sources.DefinedProctimeAttribute; import org.apache.flink.table.sources.DefinedRowtimeAttributes; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; @@ -42,6 +42,7 @@ import java.util.Optional; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM; import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; @@ -51,12 +52,7 @@ /** * Table source factory for testing the classloading in {@link DependencyTest}. */ -public class TestTableSourceFactory implements TableConnectorFactory> { - - @Override - public String getType() { - return TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE(); - } +public class TestTableSourceFactory implements TableSourceFactory, TableFactoryDiscoverable { @Override public Map requiredContext() { @@ -72,18 +68,19 @@ public List supportedProperties() { properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM()); properties.add(SCHEMA() + ".#." 
+ ROWTIME_WATERMARKS_TYPE()); return properties; } @Override - public TableSource create(Map properties) { + public TableSource createTableSource(Map properties) { final DescriptorProperties params = new DescriptorProperties(true); params.putProperties(properties); final Optional proctime = SchemaValidator.deriveProctimeAttribute(params); final List rowtime = SchemaValidator.deriveRowtimeAttributes(params); return new TestTableSource( - params.getTableSchema(SCHEMA()), + SchemaValidator.deriveTableSourceSchema(params), properties.get("connector.test-property"), proctime.orElse(null), rowtime); diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index 1da158eb79146f..a9b4161987122c 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -62,34 +62,16 @@ tables: type: VARCHAR line-delimiter: "\n" comment-prefix: "#" - - name: TableNumber3 - type: sink - schema: - - name: IntegerField1 - type: INT - - name: StringField1 - type: VARCHAR - connector: - type: filesystem - path: "$VAR_2" - num-files: 1 - format: - type: csv - fields: - - name: IntegerField1 - type: INT - - name: StringField1 - type: VARCHAR execution: - type: "$VAR_3" + type: "$VAR_2" time-characteristic: event-time periodic-watermarks-interval: 99 parallelism: 1 max-parallelism: 16 min-idle-state-retention: 0 max-idle-state-retention: 0 - result-mode: "$VAR_4" + result-mode: "$VAR_3" deployment: response-timeout: 5000 diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml index c0a15bf8454320..01ad63d9db0aec 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml @@ -35,7 +35,8 @@ tables: type: TIMESTAMP rowtime: timestamps: - type: from-source + type: from-field + from: rowtimeField watermarks: type: from-source connector: diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/connectors.TableConnectorFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 100% rename from flink-libraries/flink-table/src/main/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index bc26a1a073cb23..20f223dac130a9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -142,15 +142,15 @@ case class CatalogAlreadyExistException( } /** - * Exception for not finding a [[org.apache.flink.table.connectors.TableConnectorFactory]] for the - * given properties. + * Exception for not finding a [[org.apache.flink.table.connectors.TableFactoryDiscoverable]] for + * the given properties. 
* * @param properties properties that describe the table connector * @param cause the cause */ -case class NoMatchingTableConnectorException(properties: Map[String, String], cause: Throwable) +case class NoMatchingTableFactoryException(properties: Map[String, String], cause: Throwable) extends RuntimeException( - s"Could not find a table connector factory in the classpath satisfying the " + + s"Could not find a table factory in the classpath satisfying the " + s"following properties: \n" + s"${DescriptorProperties.toString(properties)}", cause) { @@ -159,15 +159,15 @@ case class NoMatchingTableConnectorException(properties: Map[String, String], ca } /** - * Exception for finding more than one [[org.apache.flink.table.connectors.TableConnectorFactory]] - * for the given properties. + * Exception for finding more than one + * [[org.apache.flink.table.connectors.TableFactoryDiscoverable]] for the given properties. * - * @param properties properties that describe the table connector + * @param properties properties that describe the table factory * @param cause the cause */ -case class AmbiguousTableConnectorException(properties: Map[String, String], cause: Throwable) +case class AmbiguousTableFactoryException(properties: Map[String, String], cause: Throwable) extends RuntimeException( - s"More than one table connector factory in the classpath satisfying the " + + s"More than one table factory in the classpath satisfying the " + s"following properties: \n" + s"${DescriptorProperties.toString(properties)}", cause) { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala index 486f7afb602c67..7e9ac21f0d519c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala @@ -19,12 +19,15 @@ package org.apache.flink.table.catalog import org.apache.flink.table.api._ -import org.apache.flink.table.connectors.TableSourceFactoryService +import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory} +import org.apache.flink.table.descriptors.DescriptorProperties import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource} import org.apache.flink.table.util.Logging +import _root_.scala.collection.JavaConverters._ + /** * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
*/ @@ -40,7 +43,11 @@ object ExternalTableSourceUtil extends Logging { tableEnv: TableEnvironment, externalCatalogTable: ExternalCatalogTable) : TableSourceSinkTable[_, _] = { - val source = TableSourceFactoryService.findAndCreateTableConnector(externalCatalogTable) + val properties = new DescriptorProperties() + externalCatalogTable.addProperties(properties) + val source = TableFactoryService.find(classOf[TableSourceFactory[_]], externalCatalogTable) + .asInstanceOf[TableSourceFactory[_]] + .createTableSource(properties.asMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryDiscoverable.scala similarity index 73% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactory.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryDiscoverable.scala index 356a0582d25701..d392de009db997 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryDiscoverable.scala @@ -20,19 +20,12 @@ package org.apache.flink.table.connectors import java.util -trait TableConnectorFactory[T] { +/** + * Common trait for all properties-based discoverable table factories. + */ +trait TableFactoryDiscoverable { /** - * Specify the type of the table connector, check - * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values. - * - * @return the table connector type. - */ - def getType() : String - - /** - * Specifies the context that this factory has been implemented for. The framework guarantees - * to only call the [[create()]] method of the factory if the specified set of properties and - * values are met. + * Specifies the context that this factory has been implemented for. The framework guarantees + * to only use this factory if the specified set of properties and values are met. * * Typical properties might be: * - connector.type * - format.type * * Specified property versions allow the framework to provide backwards compatible properties * in case of string format changes: * - connector.property-version * - format.property-version * * An empty context means that the factory matches for all requests. */ def requiredContext(): util.Map[String, String] /** * List of property keys that this factory can handle. This method will be used for validation. * If a property is passed that this factory cannot handle, an exception will be thrown. The * list must not contain the keys that are specified by the context. * * Example properties might be: * - format.line-delimiter * - format.ignore-parse-errors * - format.fields.#.type * - format.fields.#.name * * Note: Supported versions like "format.property-version" must not be part of the supported properties. */ def supportedProperties(): util.List[String] - - /** - * Creates and configures a [[org.apache.flink.table.sources.TableSource]] or - * [[org.apache.flink.table.sinks.TableSink]] using the given properties. - * - * @param properties normalized properties describing a table connector. 
- * @return the configured table connector */ - def create(properties: util.Map[String, String]): T } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala similarity index 54% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactoryService.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala index e90d063c74375b..39a1e7ef94dbf6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableConnectorFactoryService.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala @@ -26,95 +26,91 @@ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} -import org.apache.flink.table.sinks.TableSink -import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.Logging import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]] - * and [[org.apache.flink.table.sinks.TableSink]]. + * Unified interface to search for a TableFactoryDiscoverable of the provided type and properties. */ -class TableConnectorFactoryService[T] extends Logging { +object TableFactoryService extends Logging { - private lazy val defaultLoader = ServiceLoader.load(classOf[TableConnectorFactory[_]]) + private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactoryDiscoverable]) - def findAndCreateTableConnector(descriptor: TableDescriptor): T = { - findAndCreateTableConnector(descriptor, null) + def find(clz: Class[_], descriptor: TableDescriptor): TableFactoryDiscoverable = { + find(clz, descriptor, null) } - def findAndCreateTableConnector(descriptor: TableDescriptor, classLoader: ClassLoader) - : T = { + def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) : TableFactoryDiscoverable = { val properties = new DescriptorProperties() descriptor.addProperties(properties) - findAndCreateTableConnector(properties.asMap.asScala.toMap, classLoader) + find(clz, properties.asMap.asScala.toMap, classLoader) } - def findAndCreateTableConnector(properties: Map[String, String]): T = { - findAndCreateTableConnector(properties, null) + def find(clz: Class[_], properties: Map[String, String]): TableFactoryDiscoverable = { + find(clz, properties, null) } - def findAndCreateTableConnector(properties: Map[String, String], - classLoader: ClassLoader) - : T = { + def find(clz: Class[_], properties: Map[String, String], + classLoader: ClassLoader): TableFactoryDiscoverable = { - var matchingFactory: Option[(TableConnectorFactory[T], Seq[String])] = None + var matchingFactory: Option[(TableFactoryDiscoverable, Seq[String])] = None try { val iter = if (classLoader == null) { defaultLoader.iterator() } else { - val customLoader = ServiceLoader.load(classOf[TableConnectorFactory[_]], classLoader) + val customLoader = ServiceLoader.load(classOf[TableFactoryDiscoverable], classLoader) customLoader.iterator() } while (iter.hasNext) { val 
factory = iter.next() - val requiredContextJava = try { - factory.requiredContext() - } catch { - case t: Throwable => - throw new TableException( - s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.", - t) - } + if (clz.isAssignableFrom(factory.getClass)) { + val requiredContextJava = try { + factory.requiredContext() + } catch { + case t: Throwable => + throw new TableException( + s"Table factory '${factory.getClass.getCanonicalName}' caused an exception.", + t) + } - val requiredContext = if (requiredContextJava != null) { - // normalize properties - requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)) - } else { - Map[String, String]() - } + val requiredContext = if (requiredContextJava != null) { + // normalize properties + requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)) + } else { + Map[String, String]() + } - val plainContext = mutable.Map[String, String]() - plainContext ++= requiredContext - // we remove the versions for now until we have the first backwards compatibility case - // with the version we can provide mappings in case the format changes - plainContext.remove(CONNECTOR_PROPERTY_VERSION) - plainContext.remove(FORMAT_PROPERTY_VERSION) - plainContext.remove(METADATA_PROPERTY_VERSION) - plainContext.remove(STATISTICS_PROPERTY_VERSION) - - // check if required context is met - if (properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.getType()) && - plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) { - matchingFactory match { - case Some(_) => throw new AmbiguousTableConnectorException(properties) - case None => matchingFactory = - Some((factory.asInstanceOf[TableConnectorFactory[T]], requiredContext.keys.toSeq)) + val plainContext = mutable.Map[String, String]() + plainContext ++= requiredContext + // we remove the versions for now until we have the first backwards compatibility case + // with the version we can provide mappings in case the format changes + plainContext.remove(CONNECTOR_PROPERTY_VERSION) + plainContext.remove(FORMAT_PROPERTY_VERSION) + plainContext.remove(METADATA_PROPERTY_VERSION) + plainContext.remove(STATISTICS_PROPERTY_VERSION) + + if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) { + matchingFactory match { + case Some(_) => throw new AmbiguousTableFactoryException(properties) + case None => matchingFactory = + Some((factory.asInstanceOf[TableFactoryDiscoverable], requiredContext.keys.toSeq)) + } } } } } catch { case e: ServiceConfigurationError => - LOG.error("Could not load service provider for table source factories.", e) - throw new TableException("Could not load service provider for table source factories.", e) + LOG.error("Could not load service provider for table factories.", e) + throw new TableException("Could not load service provider for table factories.", e) } val (factory, context) = matchingFactory - .getOrElse(throw new NoMatchingTableConnectorException(properties)) + .getOrElse(throw new NoMatchingTableFactoryException(properties)) val plainProperties = mutable.ArrayBuffer[String]() properties.keys.foreach { k => @@ -153,7 +149,7 @@ // create the table connector try { - factory.create(properties.asJava) + factory } catch { case t: Throwable => throw new TableException( t) } } -} - -object TableSourceFactoryService extends 
TableConnectorFactoryService[TableSource[_]] - -object TableSinkFactoryService extends TableConnectorFactoryService[TableSink[_]] +} \ No newline at end of file diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala new file mode 100644 index 00000000000000..6346e389dfd448 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connectors + +import org.apache.flink.table.sinks.TableSink + +import java.util + +trait TableSinkFactory[T] { + /** + * Creates and configures a [[org.apache.flink.table.sinks.TableSink]] + * using the given properties. + * + * @param properties normalized properties describing a table sink. + * @return the configured table sink. + */ + def createTableSink(properties: util.Map[String, String]): TableSink[T] +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala new file mode 100644 index 00000000000000..bd3130a3f49bb6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connectors + +import org.apache.flink.table.sources.TableSource + +import java.util + +trait TableSourceFactory[T] { + /** + * Creates and configures a [[org.apache.flink.table.sources.TableSource]] + * using the given properties. + * + * @param properties normalized properties describing a table source. + * @return the configured table source. 
+ */ + def createTableSource(properties: util.Map[String, String]): TableSource[T] +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala index 19c6d89027b7f2..155153fd9ffd78 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.descriptors import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException, ValidationException} -import org.apache.flink.table.connectors.TableSourceFactoryService +import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory} import org.apache.flink.table.sources.{BatchTableSource, TableSource} class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor) @@ -44,7 +44,11 @@ class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: Con * Searches for the specified table source, configures it accordingly, and returns it. */ def toTableSource: TableSource[_] = { - val source = TableSourceFactoryService.findAndCreateTableConnector(this) + val properties = new DescriptorProperties() + addProperties(properties) + val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this) + .asInstanceOf[TableSourceFactory[_]] + .createTableSource(properties.asMap) source match { case _: BatchTableSource[_] => source case _ => throw new TableException( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala index 4b0b60d85d8ed9..388d87cfe20041 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -324,7 +324,6 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { if (fieldCount == 0) { return toJava(None) } - // validate fields and build schema val schemaBuilder = TableSchema.builder() for (i <- 0 until fieldCount) { @@ -336,7 +335,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { ), TypeStringUtils.readTypeInfo( properties.getOrElse(tpe, throw new ValidationException(s"Invalid table schema. 
" + - s"Could not find type for field '$key.$i'.")) + s"Could not find type for field '$key.$i'.")) ) ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala index 9cb3258d68abaa..4b18dfd110c27b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.descriptors import java.util import java.util.Optional -import org.apache.flink.table.api.{TableSchema, ValidationException} +import org.apache.flink.table.api.{TableException, TableSchema, ValidationException} import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala} import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_FROM, ROWTIME_TIMESTAMPS_TYPE, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD} import org.apache.flink.table.descriptors.SchemaValidator._ @@ -136,6 +136,43 @@ object SchemaValidator { attributes.asJava } + def deriveTableSourceSchema(properties: DescriptorProperties): TableSchema = { + properties.getTableSchema(SCHEMA) + } + + def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = { + val builder = TableSchema.builder() + + val schema = properties.getTableSchema(SCHEMA) + + schema.getColumnNames.zip(schema.getTypes).zipWithIndex.foreach { case ((n, t), i) => + val isProctime = properties + .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") + .orElse(false) + val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE" + val isRowtime = properties.containsKey(tsType) + if (!isProctime && !isRowtime) { + // check for aliasing + val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM") + .orElse(n) + builder.field(fieldName, t) + } + // only use the rowtime attribute if it references a field + else if (isRowtime) { + properties.getString(tsType) match { + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD => { + val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM") + builder.field(field, t) + } + case _ => throw new TableException(s"Unsupported rowtime type for sink table schema: " + + s"${properties.getString(tsType)}") + } + } + } + + builder.build() + } + /** * Finds a table source field mapping. 
*/ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala index 9c51ad91e82b5f..3e7c900befcffb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.descriptors import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException, ValidationException} -import org.apache.flink.table.connectors.TableSourceFactoryService +import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory} import org.apache.flink.table.sources.{StreamTableSource, TableSource} /** @@ -47,7 +47,11 @@ class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: C * Searches for the specified table source, configures it accordingly, and returns it. */ def toTableSource: TableSource[_] = { - val source = TableSourceFactoryService.findAndCreateTableConnector(this) + val properties = new DescriptorProperties() + addProperties(properties) + val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this) + .asInstanceOf[TableSourceFactory[_]] + .createTableSource(properties.asMap) source match { case _: StreamTableSource[_] => source case _ => throw new TableException( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala index d38705251d8ad4..f652274a570837 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala @@ -41,8 +41,8 @@ abstract class PhysicalTableSourceScan( val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { - case _: StreamTableSourceTable[_] => true - case _: BatchTableSourceTable[_] => false + case Some(_: StreamTableSourceTable[_]) => true + case Some(_: BatchTableSourceTable[_]) => false case _ => throw TableException(s"Unknown Table type ${t.getClass}.") } case t => throw TableException(s"Unknown Table type ${t.getClass}.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala index 5c53948dc98fa3..8d6ef3fdc579eb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.sinks import java.util import org.apache.flink.table.api.TableException -import org.apache.flink.table.connectors.TableConnectorFactory +import org.apache.flink.table.connectors.{TableFactoryDiscoverable, TableSinkFactory} import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.CsvValidator._ import 
org.apache.flink.table.descriptors.DescriptorProperties._ @@ -34,9 +34,7 @@ import org.apache.flink.types.Row /** * Factory for creating configured instances of [[CsvTableSink]]. */ -class CsvTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { - - override def getType(): String = TableDescriptorValidator.TABLE_TYPE_VALUE_SINK +class CsvTableSinkFactory extends TableSinkFactory[Row] with TableFactoryDiscoverable { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() @@ -64,7 +62,7 @@ class CsvTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { properties } - override def create(properties: util.Map[String, String]): TableSink[Row] = { + override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = { val params = new DescriptorProperties() params.putProperties(properties) @@ -77,7 +75,7 @@ class CsvTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { val csvTableSinkBuilder = new CsvTableSink.Builder val formatSchema = params.getTableSchema(FORMAT_FIELDS) - val tableSchema = params.getTableSchema(SCHEMA) + val tableSchema = SchemaValidator.deriveTableSinkSchema(params) if (!formatSchema.equals(tableSchema)) { throw new TableException( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala index 1a4c2b9b5ee0ba..c74d8a013a879b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.sources import java.util import org.apache.flink.table.api.TableException -import org.apache.flink.table.connectors.TableConnectorFactory +import org.apache.flink.table.connectors.{TableFactoryDiscoverable, TableSourceFactory} import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} import org.apache.flink.table.descriptors.CsvValidator._ import org.apache.flink.table.descriptors.DescriptorProperties.toScala @@ -34,9 +34,7 @@ import org.apache.flink.types.Row /** * Factory for creating configured instances of [[CsvTableSource]]. 
*/ -class CsvTableSourceFactory extends TableConnectorFactory[TableSource[Row]] { - - override def getType: String = TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE +class CsvTableSourceFactory extends TableSourceFactory[Row] with TableFactoryDiscoverable { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() @@ -67,7 +65,7 @@ class CsvTableSourceFactory extends TableConnectorFactory[TableSource[Row]] { properties } - override def create(properties: util.Map[String, String]): TableSource[Row] = { + override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = { val params = new DescriptorProperties() params.putProperties(properties) diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/connectors.TableConnectorFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable similarity index 87% rename from flink-libraries/flink-table/src/test/resources/META-INF/services/connectors.TableConnectorFactory rename to flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable index 332cc3517072a8..e25e93de7743c4 100644 --- a/flink-libraries/flink-table/src/test/resources/META-INF/services/connectors.TableConnectorFactory +++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -connectors.TestTableSinkFactory -connectors.TestTableSourceFactory \ No newline at end of file +org.apache.flink.table.connectors.TestTableSinkFactory +org.apache.flink.table.connectors.TestTableSourceFactory \ No newline at end of file diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala index fbbb3330cad316..4617f2a19ad159 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSinkFactoryServiceTest.scala @@ -18,27 +18,28 @@ package org.apache.flink.table.connectors -import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException} +import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException} import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.TableDescriptorValidator import org.junit.Assert._ import org.junit.Test +import scala.collection.JavaConverters._ import scala.collection.mutable class TableSinkFactoryServiceTest { @Test def testValidProperties(): Unit = { val props = properties() - assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null) + assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null) } - @Test(expected = classOf[NoMatchingTableConnectorException]) + @Test(expected = classOf[NoMatchingTableFactoryException]) def testInvalidContext(): Unit = { val props = properties() props.put(CONNECTOR_TYPE, "FAIL") - TableSinkFactoryService.findAndCreateTableConnector(props.toMap) + 
TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) } @Test @@ -46,21 +47,22 @@ class TableSinkFactoryServiceTest { val props = properties() props.put(CONNECTOR_PROPERTY_VERSION, "2") // the table source should still be found - assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != null) + assertTrue(TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) != null) } @Test(expected = classOf[ValidationException]) def testUnsupportedProperty(): Unit = { val props = properties() props.put("format.path_new", "/new/path") - TableSinkFactoryService.findAndCreateTableConnector(props.toMap) + TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[IllegalArgumentException]) def testFailingFactory(): Unit = { val props = properties() props.put("failing", "true") - TableSinkFactoryService.findAndCreateTableConnector(props.toMap) + TableFactoryService.find(classOf[TableSinkFactory[_]], props.toMap) + .asInstanceOf[TableSinkFactory[_]].createTableSink(props.asJava) } private def properties(): mutable.Map[String, String] = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala index 247c3c4aadb68f..1838587b827663 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TableSourceFactoryServiceTest.scala @@ -18,13 +18,14 @@ package org.apache.flink.table.connectors -import org.apache.flink.table.api.{NoMatchingTableConnectorException, TableException, ValidationException} +import org.apache.flink.table.api.{NoMatchingTableFactoryException, ValidationException} import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} import org.apache.flink.table.descriptors.TableDescriptorValidator import org.junit.Assert.assertTrue import org.junit.Test +import scala.collection.JavaConverters._ import scala.collection.mutable class TableSourceFactoryServiceTest { @@ -32,14 +33,14 @@ class TableSourceFactoryServiceTest { @Test def testValidProperties(): Unit = { val props = properties() - assertTrue(TableSourceFactoryService.findAndCreateTableConnector(props.toMap) != null) + assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null) } - @Test(expected = classOf[NoMatchingTableConnectorException]) + @Test(expected = classOf[NoMatchingTableFactoryException]) def testInvalidContext(): Unit = { val props = properties() props.put(CONNECTOR_TYPE, "FAIL") - TableSourceFactoryService.findAndCreateTableConnector(props.toMap) + TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) } @Test @@ -47,21 +48,23 @@ class TableSourceFactoryServiceTest { val props = properties() props.put(CONNECTOR_PROPERTY_VERSION, "2") // the table source should still be found - assertTrue(TableSourceFactoryService.findAndCreateTableConnector(props.toMap) != null) + assertTrue(TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) != null) } @Test(expected = classOf[ValidationException]) def testUnsupportedProperty(): Unit = { val props = properties() 
props.put("format.path_new", "/new/path") - TableSourceFactoryService.findAndCreateTableConnector(props.toMap) + TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[IllegalArgumentException]) def testFailingFactory(): Unit = { val props = properties() props.put("failing", "true") - TableSourceFactoryService.findAndCreateTableConnector(props.toMap) + TableFactoryService.find(classOf[TableSourceFactory[_]], props.toMap) + .asInstanceOf[TableSourceFactory[_]] + .createTableSource(props.asJava) } private def properties(): mutable.Map[String, String] = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala index 8981c59d4fe4c5..5d89317b458f02 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSinkFactory.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.descriptors.TableDescriptorValidator import org.apache.flink.table.sinks.TableSink import org.apache.flink.types.Row -class TestTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { +class TestTableSinkFactory extends TableSinkFactory[Row] with TableFactoryDiscoverable { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() @@ -48,7 +48,7 @@ class TestTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { properties } - override def create(properties: util.Map[String, String]): TableSink[Row] = { + override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = { if (properties.get("failing") == "true") { throw new IllegalArgumentException("Error in this factory.") } @@ -65,7 +65,5 @@ class TestTableSinkFactory extends TableConnectorFactory[TableSink[Row]] { throw new UnsupportedOperationException() } } - - override def getType(): String = TableDescriptorValidator.TABLE_TYPE_VALUE_SINK } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala index ac7c7f98a8d3c5..656f2424da0278 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connectors/TestTableSourceFactory.scala @@ -24,14 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableSchema import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} -import org.apache.flink.table.descriptors.TableDescriptorValidator import org.apache.flink.table.sources.TableSource import org.apache.flink.types.Row /** * Table source factory for testing. 
*/ -class TestTableSourceFactory extends TableConnectorFactory[TableSource[Row]] { +class TestTableSourceFactory extends TableSourceFactory[Row] with TableFactoryDiscoverable { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() @@ -52,7 +51,7 @@ class TestTableSourceFactory extends TableConnectorFactory[TableSource[Row]] { properties } - override def create(properties: util.Map[String, String]): TableSource[Row] = { + override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = { if (properties.get("failing") == "true") { throw new IllegalArgumentException("Error in this factory.") } @@ -62,6 +61,4 @@ class TestTableSourceFactory extends TableConnectorFactory[TableSource[Row]] { override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException() } } - - override def getType(): String = TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala index bf7b84b8e769d3..f40c19ca4f7d13 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.descriptors import java.util.Optional -import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.api.{TableException, TableSchema, Types} import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp} import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks import org.junit.Assert.{assertEquals, assertTrue} @@ -78,6 +78,40 @@ class SchemaValidatorTest { assertEquals(expectedFormatSchema, formatSchema) } + @Test(expected = classOf[TableException]) + def testDeriveTableSinkSchemaWithRowtimeFromSource(): Unit = { + val desc1 = Schema() + .field("otherField", Types.STRING).from("csvField") + .field("abcField", Types.STRING) + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP).rowtime( + Rowtime().timestampsFromSource().watermarksFromSource()) + val props = new DescriptorProperties() + desc1.addProperties(props) + + SchemaValidator.deriveTableSinkSchema(props) + } + + @Test + def testDeriveTableSinkSchemaWithRowtimeFromField(): Unit = { + val desc1 = Schema() + .field("otherField", Types.STRING).from("csvField") + .field("abcField", Types.STRING) + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP).rowtime( + Rowtime().timestampsFromField("myTime").watermarksFromSource()) + val props = new DescriptorProperties() + desc1.addProperties(props) + + val expectedTableSinkSchema = TableSchema.builder() + .field("csvField", Types.STRING) // aliased + .field("abcField", Types.STRING) + .field("myTime", Types.SQL_TIMESTAMP) + .build() + + assertEquals(expectedTableSinkSchema, SchemaValidator.deriveTableSinkSchema(props)) + } + @Test def testSchemaWithRowtimeFromField(): Unit = { val desc1 = Schema() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index d60b11d74e3026..3a7b04d13b1683 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -27,13 +27,14 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types} import org.apache.flink.table.api.scala._ +import org.apache.flink.table.descriptors.{DescriptorProperties, Rowtime, Schema} import org.apache.flink.table.expressions.utils.SplitUDF import org.apache.flink.table.expressions.utils.Func15 import org.apache.flink.table.runtime.stream.sql.SqlITCase.TimestampAndWatermarkWithOffset import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.utils.{JavaUserDefinedTableFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row -import org.apache.flink.table.utils.MemoryTableSourceSinkUtil +import org.apache.flink.table.utils.{InMemoryTableFactory, MemoryTableSourceSinkUtil} import org.junit.Assert._ import org.junit._ @@ -722,37 +723,35 @@ class SqlITCase extends StreamingWithStateTestBase { var tEnv = TableEnvironment.getTableEnvironment(env) MemoryTableSourceSinkUtil.clear + val desc = Schema() + .field("a", Types.INT) + .field("e", Types.LONG) + .field("f", Types.STRING) + .field("t", Types.SQL_TIMESTAMP) + .rowtime(Rowtime().timestampsFromField("t").watermarksPeriodicAscending()) + .field("proctime", Types.SQL_TIMESTAMP).proctime() + val props = new DescriptorProperties() + desc.addProperties(props) + val t = StreamTestData.getSmall3TupleDataStream(env) .assignAscendingTimestamps(x => x._2) .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("sourceTable", t) - val fieldNames = Array("a", "e", "f", "t") - val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP) - .asInstanceOf[Array[TypeInformation[_]]] - - val tableSchema = new TableSchema( - Array("a", "e", "f", "t", "rowtime", "proctime"), - Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP, - Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)) - val returnType = new RowTypeInfo( - Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP) - .asInstanceOf[Array[TypeInformation[_]]], - Array("a", "e", "f", "t")) - tEnv.registerTableSource("targetTable", new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource( - tableSchema, returnType, "rowtime", 3)) + tEnv.registerTableSource("targetTable", + new InMemoryTableFactory().createTableSource(props.asMap)) tEnv.registerTableSink("targetTable", - new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, fieldTypes)) + new InMemoryTableFactory().createTableSink(props.asMap)) tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable") - tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable") + tEnv.sqlQuery("SELECT a, e, f, t from targetTable") .addSink(new StreamITCase.StringSink[Row]) env.execute() val expected = List( - "1,1,Hi,1970-01-01 00:00:00.001,1970-01-01 00:00:00.001", - "2,2,Hello,1970-01-01 00:00:00.002,1970-01-01 00:00:00.002", - "3,2,Hello world,1970-01-01 00:00:00.002,1970-01-01 00:00:00.002") + "1,1,Hi,1970-01-01 00:00:00.001", + "2,2,Hello,1970-01-01 00:00:00.002", + "3,2,Hello world,1970-01-01 00:00:00.002") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala new file mode 100644 index 00000000000000..a2e2d14e65c4ac --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import java.util + +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.connectors.{TableFactoryDiscoverable, TableSinkFactory, TableSourceFactory} +import org.apache.flink.table.sources.TableSource +import org.apache.flink.types.Row +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE +import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator} +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE +import org.apache.flink.table.sinks.TableSink + +/** + * Table factory for testing. Creates in-memory table sources and sinks from the + * given schema properties. + */ +class InMemoryTableFactory extends TableSourceFactory[Row] + with TableSinkFactory[Row] with TableFactoryDiscoverable { + override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = { + val params: DescriptorProperties = new DescriptorProperties(true) + params.putProperties(properties) + + // validate + new SchemaValidator(true).validate(params) + + val tableSchema = SchemaValidator.deriveTableSinkSchema(params) + + new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink() + .configure(tableSchema.getColumnNames, tableSchema.getTypes) + } + + override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = { + 
val params: DescriptorProperties = new DescriptorProperties(true) + params.putProperties(properties) + + // validate + new SchemaValidator(true).validate(params) + + val tableSchema = SchemaValidator.deriveTableSourceSchema(params) + + // proctime + val proctimeAttributeOpt = SchemaValidator.deriveProctimeAttribute(params) + + // this test factory assumes that a proctime attribute is always defined + val (names, types) = tableSchema.getColumnNames.zip(tableSchema.getTypes) + .filter(_._1 != proctimeAttributeOpt.get()).unzip + // rowtime + val rowtimeDescriptors = SchemaValidator.deriveRowtimeAttributes(params) + new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource( + tableSchema, + new RowTypeInfo(types, names), + rowtimeDescriptors, + proctimeAttributeOpt.get(), + 3) + } + + override def requiredContext(): util.Map[String, String] = { + val context: util.Map[String, String] = new util.HashMap[String, String] + context.put(CONNECTOR_TYPE, "memory") + context.put(CONNECTOR_PROPERTY_VERSION, "1") // backwards compatibility + + context + } + + override def supportedProperties(): util.List[String] = { + val properties = new util.ArrayList[String]() + + // schema + properties.add(SCHEMA + ".#." + SCHEMA_TYPE) + properties.add(SCHEMA + ".#." + SCHEMA_NAME) + properties.add(SCHEMA + ".#." + SCHEMA_FROM) + + // time attributes + properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME) + properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE) + properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM) + properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS) + properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED) + properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE) + properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS) + properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED) + properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY) + + properties + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala index bbfa43056a29b2..511fedf868de75 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala @@ -50,10 +50,12 @@ object MemoryTableSourceSinkUtil { MemoryTableSourceSinkUtil.tableData.clear() } - class UnsafeMemoryTableSource(tableSchema: TableSchema, - returnType: TypeInformation[Row], - rowtime: String, - val rowCount: Integer) + class UnsafeMemoryTableSource( + tableSchema: TableSchema, + returnType: TypeInformation[Row], + rowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor], + proctime: String, + val rowCount: Integer) extends BatchTableSource[Row] with StreamTableSource[Row] with DefinedProctimeAttribute with DefinedRowtimeAttributes { @@ -73,7 +75,7 @@ object MemoryTableSourceSinkUtil { tableData.synchronized { if (tableData.size > 0) { val r = tableData.remove(0) - ctx.collectWithTimestamp(r, r.getField(3).asInstanceOf[Timestamp].getTime) + ctx.collect(r) count -= 1 } } @@ -85,18 +87,10 @@ object MemoryTableSourceSinkUtil { execEnv.addSource(new InMemorySourceFunction, returnType) } - override def getProctimeAttribute: String = "proctime" + override def getProctimeAttribute: String = proctime override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { - // return a RowtimeAttributeDescriptor if rowtime attribute is defined - if (rowtime != null) { - 
Collections.singletonList(new RowtimeAttributeDescriptor( - rowtime, - new StreamRecordTimestamp, - new AscendingTimestamps)) - } else { - Collections.EMPTY_LIST.asInstanceOf[util.List[RowtimeAttributeDescriptor]] - } + rowtimeAttributeDescriptors } }
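For reference, a minimal usage sketch of the lookup-then-create flow this patch introduces. Only TableFactoryService.find, TableSourceFactory.createTableSource, and the "memory" context of InMemoryTableFactory are taken from the patch itself; the literal property keys and type strings below are illustrative assumptions mirroring the descriptor-generated properties in the tests, and the factory must be listed in META-INF/services/org.apache.flink.table.connectors.TableFactoryDiscoverable to be discoverable:

import scala.collection.JavaConverters._

import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory}

object FactoryDiscoveryExample {
  def main(args: Array[String]): Unit = {
    // Flat property map as produced by Descriptor#addProperties; the keys assume
    // the "schema.#.name" / "schema.#.type" layout checked by SchemaValidator and
    // the context required by InMemoryTableFactory above.
    val props = Map(
      "connector.type" -> "memory",
      "connector.property-version" -> "1",
      "schema.0.name" -> "a",
      "schema.0.type" -> "INT",
      "schema.1.name" -> "proctime", // InMemoryTableFactory expects a proctime field
      "schema.1.type" -> "TIMESTAMP",
      "schema.1.proctime" -> "true")

    // find(...) matches requiredContext() against the properties and rejects keys
    // outside supportedProperties(); instantiation is now an explicit second step.
    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], props)
      .asInstanceOf[TableSourceFactory[_]]
      .createTableSource(props.asJava)

    println(source.getTableSchema)
  }
}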