Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 9, 2023
1 parent a6d8d40 commit 5d00968
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ void testGenericTableSchema() throws Exception {
try {
catalog.createTable(tablePath, new ResolvedCatalogTable(origin, resolvedSchema), false);

assertThat(catalog.getTable(tablePath).getUnresolvedSchema()).isEqualTo(schema);
assertThat(catalog.getTable(tablePath).getUnresolvedSchema())
.isEqualTo(fromResolvedSchema(resolvedSchema));
} finally {
catalog.dropTable(tablePath, true);
}
Expand Down Expand Up @@ -147,7 +148,7 @@ void testTableSchemaCompatibility() throws Exception {
null);

assertThat(catalogBaseTable.getUnresolvedSchema())
.isEqualTo(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build());
.isEqualTo(fromResolvedSchema(resolvedSchema));

// table with character types
tablePath = new ObjectPath(db1, "generic2");
Expand Down Expand Up @@ -200,7 +201,7 @@ void testTableSchemaCompatibility() throws Exception {
new ArrayList<>(),
null);
assertThat(catalogBaseTable.getUnresolvedSchema())
.isEqualTo(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build());
.isEqualTo(fromResolvedSchema(resolvedSchema));

// table with date/time types
tablePath = new ObjectPath(db1, "generic3");
Expand Down Expand Up @@ -251,7 +252,7 @@ void testTableSchemaCompatibility() throws Exception {
DataTypes.INT(), () -> "ts"))),
null);
assertThat(catalogBaseTable.getUnresolvedSchema())
.isEqualTo(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build());
.isEqualTo(fromResolvedSchema(resolvedSchema));

// table with complex/misc types
tablePath = new ObjectPath(db1, "generic4");
Expand Down Expand Up @@ -321,7 +322,7 @@ void testTableSchemaCompatibility() throws Exception {
() -> "`ts` - INTERVAL '5' SECOND"))),
null);
assertThat(catalogBaseTable.getUnresolvedSchema())
.isEqualTo(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build());
.isEqualTo(fromResolvedSchema(resolvedSchema));
} finally {
catalog.dropDatabase(db1, true, true);
}
Expand Down Expand Up @@ -373,7 +374,7 @@ void testGenericTableWithoutConnectorProp() throws Exception {
catalog.createTable(path1, catalogTable, false);
CatalogTable retrievedTable = (CatalogTable) catalog.getTable(path1);
assertThat(retrievedTable.getUnresolvedSchema())
.isEqualTo(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build());
.isEqualTo(fromResolvedSchema(resolvedSchema));
assertThat(retrievedTable.getOptions()).isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

package org.apache.flink.table.catalog.hive;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.SqlCallExpression;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -60,4 +65,50 @@ public void testCreateFunctionCaseInsensitive() throws Exception {
protected CatalogFunction createPythonFunction() {
return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
}

/**
* Convert the {@link ResolvedSchema} to a {@link Schema}, and also convert the {@link
* ResolvedExpression} back to its Unresolved state. This will enable direct comparison of the
* schema.
*/
public static Schema fromResolvedSchema(ResolvedSchema resolvedSchema) {
Schema tmp = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();
Schema.Builder builder = Schema.newBuilder();
if (!tmp.getWatermarkSpecs().isEmpty()) {
Schema.UnresolvedWatermarkSpec spec = tmp.getWatermarkSpecs().get(0);
if (spec.getWatermarkExpression() instanceof ResolvedExpression) {
String expression =
((ResolvedExpression) spec.getWatermarkExpression()).asSerializableString();
builder.watermark(spec.getColumnName(), new SqlCallExpression(expression));
} else {
builder.watermark(spec.getColumnName(), spec.getWatermarkExpression());
}
}

for (Schema.UnresolvedColumn column : tmp.getColumns()) {
if (column instanceof Schema.UnresolvedComputedColumn) {
Schema.UnresolvedComputedColumn computedColumn =
(Schema.UnresolvedComputedColumn) column;
Expression expression = computedColumn.getExpression();
if (computedColumn.getExpression() instanceof ResolvedExpression) {
expression =
new SqlCallExpression(
((ResolvedExpression) computedColumn.getExpression())
.asSerializableString());
}
builder.columnByExpression(column.getName(), expression);
} else if (column instanceof Schema.UnresolvedPhysicalColumn) {
Schema.UnresolvedPhysicalColumn pc = (Schema.UnresolvedPhysicalColumn) column;
builder.column(column.getName(), pc.getDataType());
} else if (column instanceof Schema.UnresolvedMetadataColumn) {
Schema.UnresolvedMetadataColumn mc = (Schema.UnresolvedMetadataColumn) column;
builder.columnByMetadata(
column.getName(), mc.getDataType(), mc.getMetadataKey(), mc.isVirtual());
}
}

tmp.getPrimaryKey()
.map(u -> builder.primaryKeyNamed(u.getConstraintName(), u.getColumnNames()));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,7 @@ private void addResolvedColumns(List<Column> columns) {
} else if (c instanceof ComputedColumn) {
final ComputedColumn computedColumn = (ComputedColumn) c;
columnByExpression(
computedColumn.getName(),
new SqlCallExpression(
computedColumn.getExpression().asSerializableString()));
computedColumn.getName(), computedColumn.getExpression());
} else if (c instanceof MetadataColumn) {
final MetadataColumn metadataColumn = (MetadataColumn) c;
columnByMetadata(
Expand All @@ -630,10 +628,7 @@ private void addResolvedWatermarkSpec(List<WatermarkSpec> specs) {
s ->
watermarkSpecs.add(
new UnresolvedWatermarkSpec(
s.getRowtimeAttribute(),
new SqlCallExpression(
s.getWatermarkExpression()
.asSerializableString()))));
s.getRowtimeAttribute(), s.getWatermarkExpression())));
}

private void addResolvedConstraint(UniqueConstraint constraint) {
Expand Down

0 comments on commit 5d00968

Please sign in to comment.