Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,15 @@ public void testTableSchema() throws Exception {
assertEquals(
tableResult.getTableSchema(),
TableSchema.builder().fields(
new String[] { "name", "type", "null", "key", "computed column", "watermark" },
new String[] { "name", "type", "null", "key", "computed column", "watermark" ,"comment"},
new DataType[] { DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(),
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING() }
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}
).build()
);
List<Row> schemaData = Arrays.asList(
Row.of("IntegerField2", "INT", true, null, null, null),
Row.of("StringField2", "STRING", true, null, null, null),
Row.of("TimestampField2", "TIMESTAMP(3)", true, null, null, null)
Row.of("IntegerField2", "INT", true, null, null, null, null),
Row.of("StringField2", "STRING", true, null, null, null, null),
Row.of("TimestampField2", "TIMESTAMP(3)", true, null, null, null, null)
);
assertEquals(schemaData, CollectionUtil.iteratorToList(tableResult.collect()));
executor.closeSession(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public TableSchema resolve(TableSchema tableSchema) {

String[] fieldNames = tableSchema.getFieldNames();
DataType[] fieldTypes = tableSchema.getFieldDataTypes();
String[] fieldComments = tableSchema.getFieldComments();

TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
Expand All @@ -98,9 +99,9 @@ public TableSchema resolve(TableSchema tableSchema) {
}

if (tableColumn.isGenerated()) {
builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
builder.fieldWithComment(fieldNames[i], fieldType, tableColumn.getExpr().get(), fieldComments[i]);
} else {
builder.field(fieldNames[i], fieldType);
builder.fieldWithComment(fieldNames[i], fieldType, fieldComments[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,12 +1118,13 @@ private TableResult buildDescribeResult(TableSchema schema) {
logicalType.isNullable(),
fieldToPrimaryKey.getOrDefault(c.getName(), null),
c.getExpr().orElse(null),
fieldToWatermark.getOrDefault(c.getName(), null)};
fieldToWatermark.getOrDefault(c.getName(), null),
c.getComment().orElse(null)};
}).toArray(Object[][]::new);

return buildResult(
new String[]{"name", "type", "null", "key", "computed column", "watermark"},
new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
new String[]{"name", "type", "null", "key", "computed column", "watermark", "comment"},
new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
rows);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class TableColumn {
private final DataType type;
@Nullable
private final String expr;
@Nullable
private final String comment;

//~ Constructors -----------------------------------------------------------

Expand All @@ -49,14 +51,17 @@ public class TableColumn {
* @param name Column name
* @param type Column data type
* @param expr Column computation expression if it is a computed column
* @param comment Column comment
*/
private TableColumn(
String name,
DataType type,
@Nullable String expr) {
String name,
DataType type,
@Nullable String expr,
@Nullable String comment) {
this.name = name;
this.type = type;
this.expr = expr;
this.comment = comment;
}

//~ Methods ----------------------------------------------------------------
Expand All @@ -67,7 +72,16 @@ private TableColumn(
public static TableColumn of(String name, DataType type) {
Preconditions.checkNotNull(name, "Column name can not be null!");
Preconditions.checkNotNull(type, "Column type can not be null!");
return new TableColumn(name, type, null);
return new TableColumn(name, type, null, null);
}

/**
* Creates a table column from given name , data type and comment.
*/
public static TableColumn ofWithComment(String name, DataType type, String comment) {
Preconditions.checkNotNull(name, "Column name can not be null!");
Preconditions.checkNotNull(type, "Column type can not be null!");
return new TableColumn(name, type, null, comment);
}

/**
Expand All @@ -80,7 +94,20 @@ public static TableColumn of(String name, DataType type, String expression) {
Preconditions.checkNotNull(name, "Column name can not be null!");
Preconditions.checkNotNull(type, "Column type can not be null!");
Preconditions.checkNotNull(expression, "Column expression can not be null!");
return new TableColumn(name, type, expression);
return new TableColumn(name, type, expression, null);
}

/**
* Creates a table column from given name , data type , computation expression and comment.
*
* @param name Name of the column
* @param expression SQL-style expression
*/
public static TableColumn ofWithComment(String name, DataType type, String expression, String comment) {
Preconditions.checkNotNull(name, "Column name can not be null!");
Preconditions.checkNotNull(type, "Column type can not be null!");
Preconditions.checkNotNull(expression, "Column expression can not be null!");
return new TableColumn(name, type, expression, comment);
}

@Override
Expand All @@ -94,12 +121,13 @@ public boolean equals(Object o) {
TableColumn that = (TableColumn) o;
return Objects.equals(this.name, that.name)
&& Objects.equals(this.type, that.type)
&& Objects.equals(this.expr, that.expr);
&& Objects.equals(this.expr, that.expr)
&& Objects.equals(this.comment, that.comment);
}

@Override
public int hashCode() {
return Objects.hash(this.name, this.type, this.expr);
return Objects.hash(this.name, this.type, this.expr, this.comment);
}

//~ Getter/Setter ----------------------------------------------------------
Expand All @@ -120,6 +148,12 @@ public Optional<String> getExpr() {
return Optional.ofNullable(this.expr);
}

/** Returns comment of this column. Or empty if this column
* has no comment. */
public Optional<String> getComment() {
return Optional.ofNullable(this.comment);
}

/**
* Returns if this column is a computed column that is generated from an expression.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,15 @@ public Optional<String> getFieldName(int fieldIndex) {
return Optional.of(this.columns.get(fieldIndex).getName());
}

/**
* Returns all field comments as an array.
*/
public String[] getFieldComments() {
return this.columns.stream()
.map((f) -> f.getComment().isPresent() ? f.getComment().get() : null)
.toArray(String[]::new);
}

/**
* Returns the {@link TableColumn} instance for the given field index.
*
Expand Down Expand Up @@ -298,6 +307,9 @@ public String toString() {
if (column.getExpr().isPresent()) {
sb.append(" AS ").append(column.getExpr().get());
}
if (column.getComment().isPresent()) {
sb.append(" COMMENT ").append(column.getComment().get());
}
sb.append('\n');
}
if (!watermarkSpecs.isEmpty()) {
Expand Down Expand Up @@ -529,6 +541,19 @@ public Builder field(String name, DataType dataType) {
return this;
}

/**
* Add a field with name , data type and comment.
*
* <p>The call order of this method determines the order of fields in the schema.
*/
public Builder fieldWithComment(String name, DataType dataType, String comment) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dataType);
columns.add(TableColumn.ofWithComment(name, dataType, comment));
return this;
}


/**
* Add a computed field which is generated by the given expression.
* This also defines the field name and the data type.
Expand Down Expand Up @@ -561,6 +586,39 @@ public Builder field(String name, DataType dataType, String expression) {
return this;
}

/**
* Add a computed field which is generated by the given expression.
* This also defines the field name and the data type.
*
* <p>The call order of this method determines the order of fields in the schema.
*
* @param name Field name
* @param dataType Field data type
* @param expression Computed column expression, it should be a SQL-style expression whose
* identifiers should be all quoted and expanded.
* @param comment Field comment
*
* It should be expanded because this expression may be persisted
* then deserialized from the catalog, an expanded identifier would
* avoid the ambiguity if there are same name UDF referenced from
* different paths. For example, if there is a UDF named "my_udf" from
* path "my_catalog.my_database", you could pass in an expression like
* "`my_catalog`.`my_database`.`my_udf`(`f0`) + 1";
*
* It should be quoted because user could use a reserved keyword as the
* identifier, and we have no idea if it is quoted when deserialize from
* the catalog, so we force to use quoted identifier here. But framework
* will not check whether it is qualified and quoted or not.
*
*/
public Builder fieldWithComment(String name, DataType dataType, String expression, String comment) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dataType);
Preconditions.checkNotNull(expression);
columns.add(TableColumn.ofWithComment(name, dataType, expression, comment));
return this;
}

/**
* Adds a {@link TableColumn} to this builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class DescriptorProperties {

public static final String EXPR = "expr";

public static final String COMMENT = "comment";

public static final String PARTITION_KEYS = "partition.keys";

public static final String WATERMARK = "watermark";
Expand Down Expand Up @@ -214,19 +216,23 @@ public void putTableSchema(String key, TableSchema schema) {
final String[] fieldExpressions = schema.getTableColumns().stream()
.map(column -> column.getExpr().orElse(null))
.toArray(String[]::new);
final String[] fieldComments = schema.getTableColumns().stream()
.map(column -> column.getComment().orElse(null))
.toArray(String[]::new);

final List<List<String>> values = new ArrayList<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
values.add(
Arrays.asList(
fieldNames[i],
fieldTypes[i].getLogicalType().asSerializableString(),
fieldExpressions[i]));
fieldExpressions[i],
fieldComments[i]));
}

putIndexedOptionalProperties(
key,
Arrays.asList(NAME, DATA_TYPE, EXPR),
Arrays.asList(NAME, DATA_TYPE, EXPR, COMMENT),
values);

if (!schema.getWatermarkSpecs().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,15 +398,18 @@ private void appendDerivedColumns(
call.operand(0),
physicalFieldNamesToTypes);
final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
column = TableColumn.of(
String comment = call.getOperandList().size() > 2 ? call.getOperandList().get(2).toString() : null;
column = TableColumn.ofWithComment(
fieldName,
fromLogicalToDataType(toLogicalType(validatedType)),
escapeExpressions.apply(validatedExpr));
escapeExpressions.apply(validatedExpr), comment);
computedFieldNamesToTypes.put(fieldName, validatedType);
} else {
String name = ((SqlTableColumn) derivedColumn).getName().getSimple();
SqlTableColumn sqlTableColumn = ((SqlTableColumn) derivedColumn);
String name = sqlTableColumn.getName().getSimple();
String comment = sqlTableColumn.getComment().isPresent() ? sqlTableColumn.getComment().get().getValue().toString() : null;
LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
column = TableColumn.ofWithComment(name, TypeConversions.fromLogicalToDataType(logicalType), comment);
}
columns.put(column.getName(), column);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,9 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
.deriveType(
flinkPlanner.getOrCreateSqlValidator(),
column.getType().getNullable());
builder.field(column.getName().getSimple(),
TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)));
builder.fieldWithComment(column.getName().getSimple(),
TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)),
column.getComment().isPresent() ? column.getComment().get().getValue().toString() : null);
physicalSchema = builder.build();
}
assert physicalSchema != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ abstract class TableEnvImpl(
val columns = schema.getPrimaryKey.get.getColumns.asScala
columns.foreach(c => fieldToPrimaryKey.put(c, s"PRI(${columns.mkString(", ")})"))
}
val data = Array.ofDim[Object](schema.getFieldCount, 6)
val data = Array.ofDim[Object](schema.getFieldCount, 7)
schema.getTableColumns.asScala.zipWithIndex.foreach {
case (c, i) => {
val logicalType = c.getType.getLogicalType
Expand All @@ -832,12 +832,13 @@ abstract class TableEnvImpl(
data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, null)
data(i)(4) = c.getExpr.orElse(null)
data(i)(5) = fieldToWatermark.getOrDefault(c.getName, null)
data(i)(6) = c.getComment.orElse(null)
}
}
buildResult(
Array("name", "type", "null", "key", "computed column", "watermark"),
Array("name", "type", "null", "key", "computed column", "watermark", "comment"),
Array(DataTypes.STRING, DataTypes.STRING, DataTypes.BOOLEAN, DataTypes.STRING,
DataTypes.STRING, DataTypes.STRING),
DataTypes.STRING, DataTypes.STRING, DataTypes.STRING),
data)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,37 @@ class BatchTableEnvironmentTest extends TableTestBase {
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
checkData(
java.util.Arrays.asList(
Row.of("a", "BIGINT", Boolean.box(true), null, null, null),
Row.of("b", "INT", Boolean.box(true), null, null, null),
Row.of("c", "STRING", Boolean.box(true), null, null, null)
Row.of("a", "BIGINT", Boolean.box(true), null, null, null, null),
Row.of("b", "INT", Boolean.box(true), null, null, null, null),
Row.of("c", "STRING", Boolean.box(true), null, null, null, null)
).iterator(),
tableResult2.collect())
}

@Test
def testExecuteSqlWithDescribeForComment(): Unit = {
val testUtil = batchTestUtil()
val createTableStmt =
"""
|CREATE TABLE tbl1 (
| a bigint COMMENT 'c1',
| b int COMMENT 'c2',
| c varchar COMMENT 'c3'
|) with (
| 'connector' = 'COLLECTION',
| 'is-bounded' = 'false'
|)
""".stripMargin
val tableResult1 = testUtil.tableEnv.executeSql(createTableStmt)
assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)

val tableResult2 = testUtil.tableEnv.executeSql("DESCRIBE tbl1")
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
checkData(
java.util.Arrays.asList(
Row.of("a", "BIGINT", Boolean.box(true), null, null, null, "\'c1\'"),
Row.of("b", "INT", Boolean.box(true), null, null, null, "\'c2\'"),
Row.of("c", "STRING", Boolean.box(true), null, null, null, "\'c3\'")
).iterator(),
tableResult2.collect())
}
Expand Down