diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index c23c68664f3ab..90aa855e17dec 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; /** Utilities to {@link TableSchema}. */ @@ -48,13 +49,30 @@ public class TableSchemaUtils { * additional columns. */ public static TableSchema getPhysicalSchema(TableSchema tableSchema) { + return getTableSchema(tableSchema, TableColumn::isPhysical); + } + + /** + * Return {@link TableSchema} which consists of all persisted columns. That means, the virtual + * computed columns and metadata columns are filtered out. + * + *

Its difference from {@link TableSchemaUtils#getPhysicalSchema(TableSchema)} is that it + * includes of all physical columns and metadata columns without virtual keyword. + */ + public static TableSchema getPersistedSchema(TableSchema tableSchema) { + return getTableSchema(tableSchema, TableColumn::isPersisted); + } + + /** Build a {@link TableSchema} with columns filtered by a given columnFilter. */ + private static TableSchema getTableSchema( + TableSchema tableSchema, Function columnFilter) { Preconditions.checkNotNull(tableSchema); TableSchema.Builder builder = new TableSchema.Builder(); tableSchema .getTableColumns() .forEach( tableColumn -> { - if (tableColumn.isPhysical()) { + if (columnFilter.apply(tableColumn)) { builder.field(tableColumn.getName(), tableColumn.getType()); } }); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala index 20f70c089e211..e7cfb468a601f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala @@ -284,6 +284,18 @@ class FlinkTypeFactory( buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema)) } + /** + * Creats a struct type with the persisted columns using FlinkTypeFactory + * + * @param tableSchema + * schema to convert to Calcite's specific one + * @return + * a struct type with the input fieldsNames, input fieldTypes. + */ + def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = { + buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema)) + } + /** * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory. * diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala index 86e922b568da8..ea53f092469e9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala @@ -371,10 +371,10 @@ object PreValidateReWriter { table.unwrap(classOf[FlinkPreparingTableBase]) match { case t: CatalogSourceTable => val schema = t.getCatalogTable.getSchema - typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema) + typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema) case t: LegacyCatalogSourceTable[_] => val schema = t.catalogTable.getSchema - typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema) + typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema) case _ => table.getRowType } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml index 1ede5abe8397a..4bbe9087b9260 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml @@ -536,4 +536,42 @@ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, ]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala index e9b8aea3207d2..a6f3edcfd5b6c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala @@ -59,6 +59,21 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase { |) |""".stripMargin) + util.tableEnv.executeSql(s"""create table metadata_sink ( + | `a` INT, + | `b` STRING, + | `c` STRING, + | `d` STRING, + | `e` DOUBLE, + | `f` BIGINT METADATA, + | `g` INT METADATA VIRTUAL, + | `h` AS `a` + 1 + |) with ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false', + | 'writable-metadata' = 'f:BIGINT, g:INT' + |)""".stripMargin) + @Test def testPartialInsertWithComplexReorder(): Unit = { util.verifyRelPlanInsert( @@ -118,6 +133,36 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase { "INSERT INTO partitioned_sink (e,a,g,f,c,d) " + "SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d") } + + @Test + def testPartialInsertWithPersistedMetadata(): Unit = { + util.verifyRelPlanInsert( + "INSERT INTO metadata_sink (a,b,c,d,e,f) " + + "SELECT a,b,c,d,e,123 FROM MyTable" + ) + } + + @Test + def testPartialInsertWithVirtualMetaDataColumn(): Unit = { + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage( + "SQL validation failed. At line 1, column 38: Unknown target column 'g'") + util.verifyRelPlanInsert( + "INSERT INTO metadata_sink (a,b,c,d,e,g) " + + "SELECT a,b,c,d,e,123 FROM MyTable" + ) + } + + @Test + def testPartialInsertWithComputedColumn(): Unit = { + expectedException.expect(classOf[ValidationException]) + expectedException.expectMessage( + "SQL validation failed. At line 1, column 38: Unknown target column 'h'") + util.verifyRelPlanInsert( + "INSERT INTO metadata_sink (a,b,c,d,e,h) " + + "SELECT a,b,c,d,e,123 FROM MyTable" + ) + } } object PartialInsertTest {