From 394a442776e662b7a5e2ad955a1c54c2cd1ce525 Mon Sep 17 00:00:00 2001 From: Jiatao Tao <245915794@qq.com> Date: Thu, 29 Oct 2020 15:15:07 +0800 Subject: [PATCH] [FLINK-18325] [table-common] fix potential NPE when calling SqlDataTypeSpec#getNullable. --- .../flink/sql/parser/hive/ddl/HiveDDLUtils.java | 3 ++- .../hive/type/ExtendedHiveStructTypeNameSpec.java | 2 +- .../apache/flink/sql/parser/ddl/SqlTableColumn.java | 4 ++-- .../sql/parser/type/ExtendedSqlRowTypeNameSpec.java | 2 +- .../flink/sql/parser/type/SqlMapTypeNameSpec.java | 4 ++-- .../apache/flink/sql/parser/FlinkDDLDataTypeTest.java | 2 +- .../table/planner/operations/MergeTableLikeUtil.java | 11 +++++++---- .../table/planner/utils/OperationConverterUtils.java | 3 ++- .../flink/table/sqlexec/SqlToOperationConverter.java | 5 ++++- 9 files changed, 22 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java index 93e4c742519c41..be834c0653c7fd 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java @@ -181,7 +181,8 @@ private static SqlDataTypeSpec convertDataTypes(SqlDataTypeSpec typeSpec) throws SqlTypeNameSpec nameSpec = typeSpec.getTypeNameSpec(); SqlTypeNameSpec convertedNameSpec = convertDataTypes(nameSpec); if (nameSpec != convertedNameSpec) { - typeSpec = new SqlDataTypeSpec(convertedNameSpec, typeSpec.getTimeZone(), typeSpec.getNullable(), + boolean nullable = typeSpec.getNullable() == null ? true : typeSpec.getNullable(); + typeSpec = new SqlDataTypeSpec(convertedNameSpec, typeSpec.getTimeZone(), nullable, typeSpec.getParserPosition()); } return typeSpec; diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java index 2af2ce4edc87f9..d3bdb764b4a74c 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java @@ -55,7 +55,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.sep(",", false); p.left.unparse(writer, 0, 0); p.right.unparse(writer, leftPrec, rightPrec); - if (!p.right.getNullable()) { + if (p.right.getNullable() != null && !p.right.getNullable()) { writer.keyword("NOT NULL"); } if (getComments().get(i) != null) { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java index 3878fa09d27f3d..5a3c4166e93a93 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java @@ -124,7 +124,7 @@ public Optional getConstraint() { @Override protected void unparseColumn(SqlWriter writer, int leftPrec, int rightPrec) { type.unparse(writer, leftPrec, rightPrec); - if (!type.getNullable()) { + if (this.type.getNullable() != null && !this.type.getNullable()) { // Default is nullable. writer.keyword("NOT NULL"); } @@ -179,7 +179,7 @@ public boolean isVirtual() { @Override protected void unparseColumn(SqlWriter writer, int leftPrec, int rightPrec) { type.unparse(writer, leftPrec, rightPrec); - if (!type.getNullable()) { + if (this.type.getNullable() != null && !this.type.getNullable()) { // Default is nullable. writer.keyword("NOT NULL"); } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java index c589326d456e76..ff8fa082ba6930 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java @@ -111,7 +111,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.sep(",", false); p.left.unparse(writer, 0, 0); p.right.unparse(writer, leftPrec, rightPrec); - if (!p.right.getNullable()) { + if (p.right.getNullable() != null && !p.right.getNullable()) { writer.keyword("NOT NULL"); } if (comments.get(i) != null) { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java index 6b27335777fb10..12877131a7f483 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java @@ -78,13 +78,13 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.sep(","); // configures the writer keyType.unparse(writer, leftPrec, rightPrec); // Default is nullable. - if (!keyType.getNullable()) { + if (keyType.getNullable() != null && !keyType.getNullable()) { writer.keyword("NOT NULL"); } writer.sep(","); valType.unparse(writer, leftPrec, rightPrec); // Default is nullable. - if (!valType.getNullable()) { + if (valType.getNullable() != null && !valType.getNullable()) { writer.keyword("NOT NULL"); } writer.endList(frame); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java index 55b80f4af3b135..fdbfe7efca8adf 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java @@ -452,7 +452,7 @@ public void checkUnparsed(String sql, String expectedUnparsed) { // SqlDataTypeSpec does not take care of the nullable attribute unparse, // So we unparse nullable attribute specifically, this unparsing logic should // keep sync with SqlTableColumn. - if (!dataTypeSpec.getNullable()) { + if (dataTypeSpec.getNullable() != null && !dataTypeSpec.getNullable()) { sqlWriter.keyword("NOT NULL"); } assertEquals(expectedUnparsed, sqlWriter.toSqlString().getSql()); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java index b08688efa50fc5..e69ea64e95edde 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java @@ -43,6 +43,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.validate.SqlValidator; @@ -432,8 +433,9 @@ private void appendDerivedColumns( } } - final RelDataType relType = metadataColumn.getType() - .deriveType(sqlValidator, metadataColumn.getType().getNullable()); + SqlDataTypeSpec type = metadataColumn.getType(); + boolean nullable = type.getNullable() == null ? true : type.getNullable(); + RelDataType relType = type.deriveType(sqlValidator, nullable); column = TableColumn.metadata( name, fromLogicalToDataType(toLogicalType(relType)), @@ -457,8 +459,9 @@ private void collectPhysicalFieldsTypes(List derivedColumns) { "A column named '%s' already exists in the base table.", name)); } - RelDataType relType = regularColumn.getType() - .deriveType(sqlValidator, regularColumn.getType().getNullable()); + SqlDataTypeSpec type = regularColumn.getType(); + boolean nullable = type.getNullable() == null ? true : type.getNullable(); + RelDataType relType = type.deriveType(sqlValidator, nullable); // add field name and field type to physical field list physicalFieldNamesToTypes.put(name, relType); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java index d09ca8e5d976c6..cd21d771aff5b3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java @@ -173,8 +173,9 @@ private static TableColumn toTableColumn(SqlTableColumn tableColumn, SqlValidato SqlRegularColumn regularColumn = (SqlRegularColumn) tableColumn; String name = regularColumn.getName().getSimple(); SqlDataTypeSpec typeSpec = regularColumn.getType(); + boolean nullable = typeSpec.getNullable() == null ? true : typeSpec.getNullable(); LogicalType logicalType = FlinkTypeFactory.toLogicalType( - typeSpec.deriveType(sqlValidator, typeSpec.getNullable())); + typeSpec.deriveType(sqlValidator, nullable)); DataType dataType = TypeConversions.fromLogicalToDataType(logicalType); return TableColumn.physical(name, dataType); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index 1e9056f1119684..53b2cae61f8a44 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -97,6 +97,7 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlExplain; import org.apache.calcite.sql.SqlExplainFormat; @@ -621,10 +622,12 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) { .map(SqlRegularColumn.class::cast) .collect(Collectors.toList()); for (SqlRegularColumn regularColumn : physicalColumns) { + SqlDataTypeSpec type = regularColumn.getType(); + boolean nullable = type.getNullable() == null ? true : type.getNullable(); final RelDataType relType = regularColumn.getType() .deriveType( flinkPlanner.getOrCreateSqlValidator(), - regularColumn.getType().getNullable()); + nullable); builder.field(regularColumn.getName().getSimple(), TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType))); }