From 6f69a241ae555398580857e8b1aca5522a284ac4 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Tue, 24 Jul 2018 11:40:36 +0200 Subject: [PATCH] [FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory --- .../KafkaTableSourceSinkFactoryBase.java | 14 ++++++++----- .../KafkaTableSourceSinkFactoryTestBase.java | 5 +++++ .../table/descriptors/SchemaValidator.scala | 21 ++++++++++++------- .../descriptors/SchemaValidatorTest.scala | 4 ++-- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java index 3307994bb0545..27b2e67ce0bfe 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -132,18 +132,20 @@ public List supportedProperties() { public StreamTableSource createStreamTableSource(Map properties) { final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA()); final String topic = descriptorProperties.getString(CONNECTOR_TOPIC); + final DeserializationSchema deserializationSchema = getDeserializationSchema(properties); final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic); return createKafkaTableSource( - schema, + descriptorProperties.getTableSchema(SCHEMA()), SchemaValidator.deriveProctimeAttribute(descriptorProperties), SchemaValidator.deriveRowtimeAttributes(descriptorProperties), - SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)), + SchemaValidator.deriveFieldMapping( + descriptorProperties, + Optional.of(deserializationSchema.getProducedType())), topic, getKafkaProperties(descriptorProperties), - getDeserializationSchema(properties), + deserializationSchema, startupOptions.startupMode, startupOptions.specificOffsets); } @@ -318,7 +320,9 @@ private FlinkKafkaPartitioner getFlinkKafkaPartitioner() { } private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) { - final Map fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)); + final Map fieldMapping = SchemaValidator.deriveFieldMapping( + descriptorProperties, + Optional.of(schema.toRowType())); // until FLINK-9870 is fixed we assume that the table schema is the output type return fieldMapping.size() != schema.getColumnNames().length || !fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue())); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index d8e8f7d37c3c7..504bed16a74db 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -50,6 +50,7 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceUtil; import org.apache.flink.table.sources.tsextractors.ExistingField; import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; import org.apache.flink.types.Row; @@ -115,7 +116,9 @@ public void testTableSource() { final Map fieldMapping = new HashMap<>(); fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(NAME, NAME); fieldMapping.put(COUNT, COUNT); + fieldMapping.put(TIME, TIME); final Map specificOffsets = new HashMap<>(); specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); @@ -141,6 +144,8 @@ public void testTableSource() { StartupMode.SPECIFIC_OFFSETS, specificOffsets); + TableSourceUtil.validateTableSource(expected); + // construct table source using descriptors and table source factory final TestTableDescriptor testDesc = new TestTableDescriptor( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala index af2baba25eac4..f6cbb2b423b65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala @@ -21,6 +21,8 @@ package org.apache.flink.table.descriptors import java.util import java.util.Optional +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{TableException, TableSchema, ValidationException} import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala} import org.apache.flink.table.descriptors.RowtimeValidator._ @@ -222,23 +224,28 @@ object SchemaValidator { /** * Finds a table source field mapping. + * + * @param properties The properties describing a schema. + * @param inputType The input type that a connector and/or format produces. This parameter + * can be used to resolve a rowtime field against an input field. */ def deriveFieldMapping( properties: DescriptorProperties, - sourceSchema: Optional[TableSchema]) + inputType: Optional[TypeInformation[_]]) : util.Map[String, String] = { val mapping = mutable.Map[String, String]() val schema = properties.getTableSchema(SCHEMA) - // add all source fields first because rowtime might reference one of them - toScala(sourceSchema).map(_.getColumnNames).foreach { names => - names.foreach { name => - mapping.put(name, name) - } + val columnNames = toScala(inputType) match { + case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq + case _ => Seq[String]() } + // add all source fields first because rowtime might reference one of them + columnNames.foreach(name => mapping.put(name, name)) + // add all schema fields first for implicit mappings schema.getColumnNames.foreach { name => mapping.put(name, name) @@ -266,7 +273,7 @@ object SchemaValidator { mapping.remove(name) } // check for invalid fields - else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) { + else if (!columnNames.contains(name)) { throw new ValidationException(s"Could not map the schema field '$name' to a field " + s"from source. Please specify the source field from which it can be derived.") } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala index c55805705868a..a2eec4c8e6b99 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala @@ -67,7 +67,7 @@ class SchemaValidatorTest { "myField" -> "myField").asJava assertEquals( expectedMapping, - SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema))) + SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType))) // test field format val formatSchema = SchemaValidator.deriveFormatFields(props) @@ -148,7 +148,7 @@ class SchemaValidatorTest { "myTime" -> "myTime").asJava assertEquals( expectedMapping, - SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema))) + SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType))) // test field format val formatSchema = SchemaValidator.deriveFormatFields(props)