From 85b23a48dcbab1b9ac78200a6271924ce23a16e0 Mon Sep 17 00:00:00 2001
From: shuiqiangchen
Date: Thu, 9 Feb 2023 10:50:33 +0800
Subject: [PATCH 1/2] [FLINK-30922] Apply persisted columns when doing appendPartitionAndNullsProjects

---
 .../flink/table/utils/TableSchemaUtils.java   | 30 +++++++++++++++
 .../planner/calcite/FlinkTypeFactory.scala    | 12 ++++++
 .../planner/calcite/PreValidateReWriter.scala |  4 +-
 .../planner/plan/common/PartialInsertTest.xml | 38 +++++++++++++++++++
 .../plan/common/PartialInsertTest.scala       | 33 ++++++++++++++++
 5 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index c23c68664f3ab..44c52cc511a3c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -68,6 +68,36 @@ public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
         return builder.build();
     }
 
+    /**
+     * Return {@link TableSchema} which consists of all persisted columns. That means, the virtual
+     * computed columns and metadata columns are filterd out.
+     *
+     * <p>Readers(or writers) such as {@link TableSource} and {@link TableSink} should use this
+     * persisted schema to generate {@link TableSource#getProducedDataType()} and {@link
+     * TableSource#getTableSchema()} rather than using the raw TableSchema which may contains
+     * additional columns.
+     */
+    public static TableSchema getPersistedSchema(TableSchema tableSchema) {
+        Preconditions.checkNotNull(tableSchema);
+        TableSchema.Builder builder = new TableSchema.Builder();
+        tableSchema
+                .getTableColumns()
+                .forEach(
+                        tableColumn -> {
+                            if (tableColumn.isPersisted()) {
+                                builder.field(tableColumn.getName(), tableColumn.getType());
+                            }
+                        });
+        tableSchema
+                .getPrimaryKey()
+                .ifPresent(
+                        uniqueConstraint ->
+                                builder.primaryKey(
+                                        uniqueConstraint.getName(),
+                                        uniqueConstraint.getColumns().toArray(new String[0])));
+        return builder.build();
+    }
+
     /** Returns true if there are only physical columns in the given {@link TableSchema}. */
     public static boolean containsPhysicalColumnsOnly(TableSchema schema) {
         Preconditions.checkNotNull(schema);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 20f70c089e211..e7cfb468a601f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -284,6 +284,18 @@ class FlinkTypeFactory(
     buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
   }
 
+  /**
+   * Creats a struct type with the persisted columns using FlinkTypeFactory
+   *
+   * @param tableSchema
+   *   schema to convert to Calcite's specific one
+   * @return
+   *   a struct type with the input fieldsNames, input fieldTypes.
+   */
+  def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
+    buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
+  }
+
   /**
    * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory.
   *
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 86e922b568da8..ea53f092469e9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -371,10 +371,10 @@ object PreValidateReWriter {
     table.unwrap(classOf[FlinkPreparingTableBase]) match {
       case t: CatalogSourceTable =>
         val schema = t.getCatalogTable.getSchema
-        typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+        typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
       case t: LegacyCatalogSourceTable[_] =>
         val schema = t.catalogTable.getSchema
-        typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+        typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
       case _ => table.getRowType
     }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 1ede5abe8397a..4bbe9087b9260 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -536,4 +536,42 @@ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
 ]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index e9b8aea3207d2..fef12dfc02efd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -59,6 +59,20 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
                               |)
                               |""".stripMargin)
 
+  util.tableEnv.executeSql(s"""create table metadata_sink (
+                              | `a` INT,
+                              | `b` STRING,
+                              | `c` STRING,
+                              | `d` STRING,
+                              | `e` DOUBLE,
+                              | `f` BIGINT METADATA,
+                              | `g` INT METADATA VIRTUAL
+                              |) with (
+                              | 'connector' = 'values',
+                              | 'sink-insert-only' = 'false',
+                              | 'writable-metadata' = 'f:BIGINT, g:INT'
+                              |)""".stripMargin)
+
   @Test
   def testPartialInsertWithComplexReorder(): Unit = {
     util.verifyRelPlanInsert(
@@ -118,6 +132,25 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
       "INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
         "SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d")
   }
+
+  @Test
+  def testPartialInsertWithPersistedMetadata(): Unit = {
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,f) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
+
+  @Test
+  def testPartialInsertWithVirtualMetaData(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "SQL validation failed. At line 1, column 38: Unknown target column 'g'")
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,g) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
 }
 
 object PartialInsertTest {

From d6ddc18a025098031361045cfa08031400352a4f Mon Sep 17 00:00:00 2001
From: shuiqiangchen
Date: Mon, 6 Mar 2023 19:14:52 +0800
Subject: [PATCH 2/2] [FLINK-30922][table-planner] Address review comments and add tests

---
 .../flink/table/utils/TableSchemaUtils.java | 36 +++++++-------------
 .../plan/common/PartialInsertTest.scala     | 16 +++++++--
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 44c52cc511a3c..90aa855e17dec 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,7 @@
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** Utilities to {@link TableSchema}. */
@@ -48,43 +49,30 @@ public class TableSchemaUtils {
      * additional columns.
      */
     public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
-        Preconditions.checkNotNull(tableSchema);
-        TableSchema.Builder builder = new TableSchema.Builder();
-        tableSchema
-                .getTableColumns()
-                .forEach(
-                        tableColumn -> {
-                            if (tableColumn.isPhysical()) {
-                                builder.field(tableColumn.getName(), tableColumn.getType());
-                            }
-                        });
-        tableSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        uniqueConstraint ->
-                                builder.primaryKey(
-                                        uniqueConstraint.getName(),
-                                        uniqueConstraint.getColumns().toArray(new String[0])));
-        return builder.build();
+        return getTableSchema(tableSchema, TableColumn::isPhysical);
     }
 
     /**
      * Return {@link TableSchema} which consists of all persisted columns. That means, the virtual
-     * computed columns and metadata columns are filterd out.
+     * computed columns and metadata columns are filtered out.
      *
-     * <p>Readers(or writers) such as {@link TableSource} and {@link TableSink} should use this
-     * persisted schema to generate {@link TableSource#getProducedDataType()} and {@link
-     * TableSource#getTableSchema()} rather than using the raw TableSchema which may contains
-     * additional columns.
+     * <p>Its difference from {@link TableSchemaUtils#getPhysicalSchema(TableSchema)} is that it
+     * includes of all physical columns and metadata columns without virtual keyword.
      */
     public static TableSchema getPersistedSchema(TableSchema tableSchema) {
+        return getTableSchema(tableSchema, TableColumn::isPersisted);
+    }
+
+    /** Build a {@link TableSchema} with columns filtered by a given columnFilter. */
+    private static TableSchema getTableSchema(
+            TableSchema tableSchema, Function<TableColumn, Boolean> columnFilter) {
         Preconditions.checkNotNull(tableSchema);
         TableSchema.Builder builder = new TableSchema.Builder();
         tableSchema
                 .getTableColumns()
                 .forEach(
                         tableColumn -> {
-                            if (tableColumn.isPersisted()) {
+                            if (columnFilter.apply(tableColumn)) {
                                 builder.field(tableColumn.getName(), tableColumn.getType());
                             }
                         });
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index fef12dfc02efd..a6f3edcfd5b6c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -66,7 +66,8 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
                               | `d` STRING,
                               | `e` DOUBLE,
                               | `f` BIGINT METADATA,
-                              | `g` INT METADATA VIRTUAL
+                              | `g` INT METADATA VIRTUAL,
+                              | `h` AS `a` + 1
                               |) with (
                               | 'connector' = 'values',
                               | 'sink-insert-only' = 'false',
                               | 'writable-metadata' = 'f:BIGINT, g:INT'
                               |)""".stripMargin)
@@ -142,7 +143,7 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
   }
 
   @Test
-  def testPartialInsertWithVirtualMetaData(): Unit = {
+  def testPartialInsertWithVirtualMetaDataColumn(): Unit = {
     expectedException.expect(classOf[ValidationException])
     expectedException.expectMessage(
       "SQL validation failed. At line 1, column 38: Unknown target column 'g'")
     util.verifyRelPlanInsert(
       "INSERT INTO metadata_sink (a,b,c,d,e,g) " +
         "SELECT a,b,c,d,e,123 FROM MyTable"
     )
   }
+
+  @Test
+  def testPartialInsertWithComputedColumn(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "SQL validation failed. At line 1, column 38: Unknown target column 'h'")
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,h) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
 }
 
 object PartialInsertTest {
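
To make the column classification behind these two commits concrete, the sketch below builds a TableSchema shaped like the metadata_sink table from PartialInsertTest and prints which columns getPhysicalSchema and the new getPersistedSchema keep. This is an illustration only, not part of the patch; it assumes the TableColumn.physical/metadata/computed factory methods and TableSchema.Builder#add from the deprecated TableSchema API that TableSchemaUtils already operates on.

    import java.util.Arrays;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableColumn;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    public class PersistedSchemaExample {
        public static void main(String[] args) {
            // Hypothetical schema mirroring metadata_sink: one physical column,
            // one writable metadata column, one virtual metadata column, one computed column.
            TableSchema schema =
                    TableSchema.builder()
                            .add(TableColumn.physical("a", DataTypes.INT()))
                            // METADATA column: persisted, it is written to the sink.
                            .add(TableColumn.metadata("f", DataTypes.BIGINT()))
                            // METADATA VIRTUAL column: readable only, not persisted.
                            .add(TableColumn.metadata("g", DataTypes.INT(), true))
                            // Computed column: neither physical nor persisted.
                            .add(TableColumn.computed("h", DataTypes.INT(), "`a` + 1"))
                            .build();

            // Physical columns only: [a]
            System.out.println(
                    Arrays.toString(TableSchemaUtils.getPhysicalSchema(schema).getFieldNames()));

            // Physical plus writable metadata columns: [a, f]. This is the row type
            // PreValidateReWriter now builds for resolving INSERT target columns, which is
            // why 'f' is a legal partial-insert target while 'g' and 'h' are rejected.
            System.out.println(
                    Arrays.toString(TableSchemaUtils.getPersistedSchema(schema).getFieldNames()));
        }
    }

If the sketch holds, it prints [a] for the physical schema and [a, f] for the persisted schema, which matches the tests above: inserting into f succeeds, while targeting g or h fails validation with "Unknown target column".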