From 659c9d77f951dbc5b9006a737e3250da7b7ad5ca Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 14 May 2026 18:49:35 +0530 Subject: [PATCH] [FLINK-38262][table] add CreateConnectionOperation and converter --- .../ddl/CreateConnectionOperation.java | 93 ++++++++++++++ .../SqlCreateConnectionConverter.java | 55 ++++++++ .../converters/SqlNodeConverters.java | 5 + .../SqlConnectionOperationConverterTest.java | 121 ++++++++++++++++++ 4 files changed, 274 insertions(+) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateConnectionOperation.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateConnectionConverter.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlConnectionOperationConverterTest.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateConnectionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateConnectionOperation.java new file mode 100644 index 0000000000000..725c597ba26f6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateConnectionOperation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** Operation to describe a CREATE CONNECTION statement. */ +@Internal +public class CreateConnectionOperation implements CreateOperation { + + private final ObjectIdentifier connectionIdentifier; + private final CatalogConnection catalogConnection; + private final boolean ignoreIfExists; + private final boolean isTemporary; + + public CreateConnectionOperation( + ObjectIdentifier connectionIdentifier, + CatalogConnection catalogConnection, + boolean ignoreIfExists, + boolean isTemporary) { + this.connectionIdentifier = connectionIdentifier; + this.catalogConnection = catalogConnection; + this.ignoreIfExists = ignoreIfExists; + this.isTemporary = isTemporary; + } + + public ObjectIdentifier getConnectionIdentifier() { + return connectionIdentifier; + } + + public CatalogConnection getCatalogConnection() { + return catalogConnection; + } + + public boolean isIgnoreIfExists() { + return ignoreIfExists; + } + + public boolean isTemporary() { + return isTemporary; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("catalogConnection", catalogConnection.getOptions()); + params.put("identifier", connectionIdentifier); + params.put("ignoreIfExists", ignoreIfExists); + params.put("isTemporary", isTemporary); + + return OperationUtils.formatWithChildren( + "CREATE CONNECTION", params, Collections.emptyList(), Operation::asSummaryString); + } + + @Override + public TableResultInternal execute(Context ctx) { + if (isTemporary) { + ctx.getCatalogManager() + .createTemporaryConnection( + catalogConnection, connectionIdentifier, ignoreIfExists); + } else { + ctx.getCatalogManager() + .createConnection(catalogConnection, connectionIdentifier, ignoreIfExists); + } + return TableResultImpl.TABLE_RESULT_OK; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateConnectionConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateConnectionConverter.java new file mode 100644 index 0000000000000..5dc255072113e --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateConnectionConverter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.connection.SqlCreateConnection; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateConnectionOperation; + +import org.apache.calcite.sql.SqlCharStringLiteral; + +import java.util.Map; + +/** A converter for {@link SqlCreateConnection}. */ +public class SqlCreateConnectionConverter implements SqlNodeConverter { + + @Override + public Operation convertSqlNode( + SqlCreateConnection sqlCreateConnection, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateConnection.getFullName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + + Map options = sqlCreateConnection.getProperties(); + SqlCharStringLiteral commentLiteral = sqlCreateConnection.getComment(); + String comment = commentLiteral == null ? null : commentLiteral.getValueAs(String.class); + + CatalogConnection catalogConnection = CatalogConnection.of(options, comment); + + return new CreateConnectionOperation( + identifier, + catalogConnection, + sqlCreateConnection.isIfNotExists(), + sqlCreateConnection.isTemporary()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 256dd03ccc1f7..33bb3dcd7c44e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -88,6 +88,7 @@ public class SqlNodeConverters { register(new SqlShowProcedureConverter()); registerCatalogConverters(); + registerConnectionConverters(); registerMaterializedTableConverters(); registerModelConverters(); registerTableConverters(); @@ -136,6 +137,10 @@ private static void registerCatalogConverters() { register(new SqlShowCreateCatalogConverter()); } + private static void registerConnectionConverters() { + register(new SqlCreateConnectionConverter()); + } + private static void registerMaterializedTableConverters() { register(new SqlAlterMaterializedTableAddDistributionConverter()); register(new SqlAlterMaterializedTableAddSchemaConverter()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlConnectionOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlConnectionOperationConverterTest.java new file mode 100644 index 0000000000000..e9ef092cc17b0 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlConnectionOperationConverterTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.sql.parser.error.SqlValidateException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateConnectionOperation; + +import org.apache.calcite.sql.parser.SqlParseException; +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for converting connection statements to operations. */ +class SqlConnectionOperationConverterTest extends SqlNodeToOperationConversionTestBase { + + @Test + void testCreateConnection() { + Operation operation = parse("CREATE CONNECTION my_conn WITH ('k' = 'v')"); + assertThat(operation).isInstanceOf(CreateConnectionOperation.class); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + + assertThat(op.getConnectionIdentifier()) + .isEqualTo(ObjectIdentifier.of("builtin", "default", "my_conn")); + assertThat(op.getCatalogConnection().getOptions()).isEqualTo(Map.of("k", "v")); + assertThat(op.getCatalogConnection().getComment()).isNull(); + assertThat(op.isIgnoreIfExists()).isFalse(); + assertThat(op.isTemporary()).isFalse(); + } + + @Test + void testCreateConnectionIfNotExists() { + Operation operation = parse("CREATE CONNECTION IF NOT EXISTS my_conn WITH ('k' = 'v')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + assertThat(op.isIgnoreIfExists()).isTrue(); + assertThat(op.isTemporary()).isFalse(); + } + + @Test + void testCreateTemporaryConnection() { + Operation operation = parse("CREATE TEMPORARY CONNECTION my_conn WITH ('k' = 'v')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + assertThat(op.isTemporary()).isTrue(); + assertThat(op.isIgnoreIfExists()).isFalse(); + } + + @Test + void testCreateTemporarySystemConnection() { + Operation operation = + parse("CREATE TEMPORARY SYSTEM CONNECTION my_conn WITH ('k' = 'v')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + assertThat(op.isTemporary()).isTrue(); + } + + @Test + void testCreateConnectionWithComment() { + Operation operation = + parse("CREATE CONNECTION my_conn COMMENT 'hi there' WITH ('k' = 'v')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + CatalogConnection conn = op.getCatalogConnection(); + assertThat(conn.getComment()).isEqualTo("hi there"); + } + + @Test + void testCreateConnectionWithFullyQualifiedName() { + Operation operation = parse("CREATE CONNECTION cat1.db1.my_conn WITH ('k' = 'v')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + assertThat(op.getConnectionIdentifier()) + .isEqualTo(ObjectIdentifier.of("cat1", "db1", "my_conn")); + } + + @Test + void testCreateConnectionPreservesOptionOrder() { + Operation operation = + parse("CREATE CONNECTION my_conn WITH ('k1' = 'v1', 'k2' = 'v2', 'k3' = 'v3')"); + CreateConnectionOperation op = (CreateConnectionOperation) operation; + Map expected = new LinkedHashMap<>(); + expected.put("k1", "v1"); + expected.put("k2", "v2"); + expected.put("k3", "v3"); + assertThat(op.getCatalogConnection().getOptions()) + .containsExactlyEntriesOf(expected); + } + + @Test + void testCreateSystemConnectionWithoutTemporaryRejected() { + assertThatThrownBy(() -> parse("CREATE SYSTEM CONNECTION my_conn WITH ('k' = 'v')")) + .hasRootCauseInstanceOf(SqlValidateException.class) + .hasMessageContaining("CREATE SYSTEM CONNECTION"); + } + + @Test + void testCreateConnectionWithEmptyOptionsRejected() { + assertThatThrownBy(() -> parse("CREATE CONNECTION my_conn WITH ()")) + .satisfiesAnyOf( + t -> assertThat(t).isInstanceOf(SqlParseException.class), + t -> assertThat(t.getCause()).isInstanceOf(SqlValidateException.class)); + } +}