Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Utilities to {@link TableSchema}. */
Expand All @@ -48,13 +49,30 @@ public class TableSchemaUtils {
* additional columns.
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
return getTableSchema(tableSchema, TableColumn::isPhysical);
}

/**
* Return {@link TableSchema} which consists of all persisted columns. That means, the virtual
* computed columns and metadata columns are filtered out.
*
* <p>Its difference from {@link TableSchemaUtils#getPhysicalSchema(TableSchema)} is that it
* includes of all physical columns and metadata columns without virtual keyword.
*/
public static TableSchema getPersistedSchema(TableSchema tableSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd better extract a common method for this and getPhysicalSchema since only one condition differs

return getTableSchema(tableSchema, TableColumn::isPersisted);
}

/** Build a {@link TableSchema} with columns filtered by a given columnFilter. */
private static TableSchema getTableSchema(
TableSchema tableSchema, Function<TableColumn, Boolean> columnFilter) {
Preconditions.checkNotNull(tableSchema);
TableSchema.Builder builder = new TableSchema.Builder();
tableSchema
.getTableColumns()
.forEach(
tableColumn -> {
if (tableColumn.isPhysical()) {
if (columnFilter.apply(tableColumn)) {
builder.field(tableColumn.getName(), tableColumn.getType());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,18 @@ class FlinkTypeFactory(
buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
}

/**
* Creats a struct type with the persisted columns using FlinkTypeFactory
*
* @param tableSchema
* schema to convert to Calcite's specific one
* @return
* a struct type with the input fieldsNames, input fieldTypes.
*/
def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
}

/**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ object PreValidateReWriter {
table.unwrap(classOf[FlinkPreparingTableBase]) match {
case t: CatalogSourceTable =>
val schema = t.getCatalogTable.getSchema
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
case t: LegacyCatalogSourceTable[_] =>
val schema = t.catalogTable.getSchema
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
case _ =>
table.getRowType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,42 @@ Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
]]>
</Resource>
</TestCase>
<TestCase name="testPartialInsertWithPersistedMetadata[isBatch: true]">
<Resource name="sql">
<![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123 FROM MyTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, d, e, f])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[CAST(123:BIGINT):BIGINT])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, d, e, f])
+- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
<TestCase name="testPartialInsertWithPersistedMetadata[isBatch: false]">
<Resource name="sql">
<![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123 FROM MyTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, d, e, f])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[CAST(123:BIGINT):BIGINT])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, d, e, f])
+- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
|)
|""".stripMargin)

util.tableEnv.executeSql(s"""create table metadata_sink (
| `a` INT,
| `b` STRING,
| `c` STRING,
| `d` STRING,
| `e` DOUBLE,
| `f` BIGINT METADATA,
| `g` INT METADATA VIRTUAL,
| `h` AS `a` + 1
|) with (
| 'connector' = 'values',
| 'sink-insert-only' = 'false',
| 'writable-metadata' = 'f:BIGINT, g:INT'
|)""".stripMargin)

@Test
def testPartialInsertWithComplexReorder(): Unit = {
util.verifyRelPlanInsert(
Expand Down Expand Up @@ -118,6 +133,36 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
"INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
"SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d")
}

@Test
def testPartialInsertWithPersistedMetadata(): Unit = {
util.verifyRelPlanInsert(
"INSERT INTO metadata_sink (a,b,c,d,e,f) " +
"SELECT a,b,c,d,e,123 FROM MyTable"
)
}

@Test
def testPartialInsertWithVirtualMetaDataColumn(): Unit = {
expectedException.expect(classOf[ValidationException])
expectedException.expectMessage(
"SQL validation failed. At line 1, column 38: Unknown target column 'g'")
util.verifyRelPlanInsert(
"INSERT INTO metadata_sink (a,b,c,d,e,g) " +
"SELECT a,b,c,d,e,123 FROM MyTable"
)
}

@Test
def testPartialInsertWithComputedColumn(): Unit = {
expectedException.expect(classOf[ValidationException])
expectedException.expectMessage(
"SQL validation failed. At line 1, column 38: Unknown target column 'h'")
util.verifyRelPlanInsert(
"INSERT INTO metadata_sink (a,b,c,d,e,h) " +
"SELECT a,b,c,d,e,123 FROM MyTable"
)
}
}

object PartialInsertTest {
Expand Down