diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java index cc3436916c628..91cf08f2e1396 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java @@ -105,6 +105,9 @@ private ExecNodeContext(@Nullable Integer id, String name, Integer version) { public ExecNodeContext(String value) { this.id = null; String[] split = value.split("_"); + if ("null".equals(split[0]) || "null".equals(split[1])) { + throw new TableException(String.format("Unsupported exec node type: '%s'.", value)); + } this.name = split[0]; this.version = Integer.valueOf(split[1]); } @@ -167,12 +170,19 @@ public ExecNodeContext withId(int id) { */ @JsonValue public String getTypeAsString() { + if (name == null || version == null) { + throw new TableException( + String.format( + "Can not serialize ExecNode with id: %d. Missing type, this is a bug," + + " please file a ticket.", + getId())); + } return name + "_" + version; } @Override public String toString() { - return getId() + "_" + getTypeAsString(); + return getId() + "_" + getName() + "_" + getVersion(); } public static > ExecNodeContext newContext(Class execNodeClass) { @@ -181,7 +191,8 @@ public static > ExecNodeContext newContext(Class execNo if (!ExecNodeMetadataUtil.isUnsupported(execNodeClass)) { throw new IllegalStateException( String.format( - "ExecNode: %s is not listed in the unsupported classes since it is not annotated with: %s.", + "ExecNode: %s is not listed in the unsupported classes and" + + " it is not annotated with: %s.", execNodeClass.getCanonicalName(), ExecNodeMetadata.class.getSimpleName())); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/UnsupportedNodesInPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/UnsupportedNodesInPlanTest.java new file mode 100644 index 0000000000000..6f1a0adce9de5 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/UnsupportedNodesInPlanTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for deserialising invalid {@link org.apache.flink.table.api.CompiledPlan}. */ +public class UnsupportedNodesInPlanTest extends TableTestBase { + + @Test + public void testInvalidType() { + final TableEnvironment tEnv = + TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + assertThatThrownBy( + () -> + tEnv.loadPlan( + PlanReference.fromResource( + "/jsonplan/testInvalidTypeJsonPlan.json"))) + .hasRootCauseMessage("Unsupported exec node type: 'null_null'."); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializerTest.java new file mode 100644 index 0000000000000..d7dc963a26439 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ExecNodeGraphJsonSerializer}. */ +class ExecNodeGraphJsonSerializerTest { + + @Test + void testSerializingUnsupportedNode() { + final ObjectWriter objectWriter = + JsonSerdeUtil.createObjectWriter(JsonSerdeTestUtil.configuredSerdeContext()); + assertThatThrownBy( + () -> + objectWriter.writeValueAsString( + new ExecNodeGraph( + FlinkVersion.v1_18, + Collections.singletonList(new NoAnnotationNode())))) + .hasMessageContaining( + "Can not serialize ExecNode with id: 10. Missing type, this is a bug, please file a ticket"); + } + + private static class NoAnnotationNode extends ExecNodeBase { + + NoAnnotationNode() { + super( + 10, + ExecNodeContext.newContext(NoAnnotationNode.class), + new Configuration(), + Collections.emptyList(), + DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT())).getLogicalType(), + ""); + setInputEdges(Collections.emptyList()); + } + + @Override + protected Transformation translateToPlanInternal( + PlannerBase planner, ExecNodeConfig config) { + return null; + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java index 9301f79e72516..b35fd5c2c5b57 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtilTest.java @@ -221,7 +221,7 @@ public void testNewContext() { .hasMessage( "ExecNode: org.apache.flink.table.planner.plan.utils." + "ExecNodeMetadataUtilTest.DummyNodeNoAnnotation is not " - + "listed in the unsupported classes since it is not annotated " + + "listed in the unsupported classes and it is not annotated " + "with: ExecNodeMetadata."); assertThatThrownBy(() -> ExecNodeContext.newContext(DummyNode.class)) diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json new file mode 100644 index 0000000000000..957899f9eb12c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json @@ -0,0 +1,216 @@ +{ + "flinkVersion": "1.18", + "nodes": [ + { + "id": 1, + "type": "stream-exec-table-source-scan_1", + "scanTableSource": { + "table": { + "identifier": "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable": { + "schema": { + "columns": [ + { + "name": "a", + "dataType": "BIGINT" + }, + { + "name": "b", + "dataType": "INT NOT NULL" + }, + { + "name": "c", + "dataType": "VARCHAR(2147483647)" + }, + { + "name": "d", + "dataType": "TIMESTAMP(3)" + } + ], + "watermarkSpecs": [] + }, + "partitionKeys": [], + "options": { + "connector": "values", + "bounded": "false" + } + } + }, + "abilities": [ + { + "type": "ProjectPushDown", + "projectedFields": [ + [ + 0 + ], + [ + 1 + ] + ], + "producedType": "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + }, + { + "type": "ReadingMetadata", + "metadataKeys": [], + "producedType": "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + } + ] + }, + "outputType": "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description": "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties": [] + }, + { + "id": 2, + "type": "stream-exec-exchange_1", + "inputProperties": [ + { + "requiredDistribution": { + "type": "SINGLETON" + }, + "damBehavior": "PIPELINED", + "priority": 0 + } + ], + "outputType": "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description": "Exchange(distribution=[single])" + }, + { + "id": 3, + "type": "null_null", + "orderBy": { + "fields": [ + { + "index": 1, + "isAscending": true, + "nullIsLast": false + } + ] + }, + "inputProperties": [ + { + "requiredDistribution": { + "type": "UNKNOWN" + }, + "damBehavior": "PIPELINED", + "priority": 0 + } + ], + "outputType": "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description": "Sort(orderBy=[b ASC])" + }, + { + "id": 4, + "type": "stream-exec-calc_1", + "projection": [ + { + "kind": "INPUT_REF", + "inputIndex": 0, + "type": "BIGINT" + }, + { + "kind": "INPUT_REF", + "inputIndex": 0, + "type": "BIGINT" + } + ], + "condition": null, + "inputProperties": [ + { + "requiredDistribution": { + "type": "UNKNOWN" + }, + "damBehavior": "PIPELINED", + "priority": 0 + } + ], + "outputType": "ROW<`a` BIGINT, `a1` BIGINT>", + "description": "Calc(select=[a, a AS a1])" + }, + { + "id": 5, + "type": "stream-exec-sink_1", + "configuration": { + "table.exec.sink.keyed-shuffle": "AUTO", + "table.exec.sink.not-null-enforcer": "ERROR", + "table.exec.sink.rowtime-inserter": "ENABLED", + "table.exec.sink.type-length-enforcer": "IGNORE", + "table.exec.sink.upsert-materialize": "AUTO" + }, + "dynamicTableSink": { + "table": { + "identifier": "`default_catalog`.`default_database`.`MySink`", + "resolvedTable": { + "schema": { + "columns": [ + { + "name": "a", + "dataType": "BIGINT" + }, + { + "name": "b", + "dataType": "BIGINT" + } + ], + "watermarkSpecs": [] + }, + "partitionKeys": [], + "options": { + "sink-insert-only": "false", + "table-sink-class": "DEFAULT", + "connector": "values" + } + } + } + }, + "inputChangelogMode": [ + "INSERT" + ], + "inputProperties": [ + { + "requiredDistribution": { + "type": "UNKNOWN" + }, + "damBehavior": "PIPELINED", + "priority": 0 + } + ], + "outputType": "ROW<`a` BIGINT, `a1` BIGINT>", + "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, a1])" + } + ], + "edges": [ + { + "source": 1, + "target": 2, + "shuffle": { + "type": "FORWARD" + }, + "shuffleMode": "PIPELINED" + }, + { + "source": 2, + "target": 3, + "shuffle": { + "type": "FORWARD" + }, + "shuffleMode": "PIPELINED" + }, + { + "source": 3, + "target": 4, + "shuffle": { + "type": "FORWARD" + }, + "shuffleMode": "PIPELINED" + }, + { + "source": 4, + "target": 5, + "shuffle": { + "type": "FORWARD" + }, + "shuffleMode": "PIPELINED" + } + ] +} diff --git a/pom.xml b/pom.xml index 2a03f2ee0dce0..f2bc4af25a578 100644 --- a/pom.xml +++ b/pom.xml @@ -1558,6 +1558,7 @@ under the License. flink-table/flink-sql-client/src/test/resources/**/*.out flink-table/flink-table-planner/src/test/resources/**/*.out flink-table/flink-table-planner/src/test/resources/json/*.json + flink-table/flink-table-planner/src/test/resources/jsonplan/*.json flink-yarn/src/test/resources/krb5.keytab flink-end-to-end-tests/test-scripts/test-data/** flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/hadoop/config/keystore.jks