From 3daab43a55ee29116c04d533ba94252811748f16 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Mon, 16 Aug 2021 15:46:04 +0800 Subject: [PATCH] Optimize the code compatibility of flink EngineConn, and optimize the pom parent of flink module. --- .../engineconn-plugins/flink/pom.xml | 3 +- .../sql/operation/OperationFactory.java | 112 +---- .../sql/operation/OperationFactoryImpl.java | 128 ++++++ .../sql/operation/impl/DDLOperation.java | 2 +- .../flink/client/sql/parser/SqlCommand.java | 126 ++++++ .../client/sql/parser/SqlCommandCall.java | 56 +++ .../client/sql/parser/SqlCommandParser.java | 45 ++ .../sql/parser/SqlCommandParserImpl.java | 230 ++++++++++ .../flink/client/utils/SqlCommandParser.java | 411 ------------------ .../flink/util/RetryUtil.java | 72 --- .../flink/FlinkEngineConnPlugin.scala | 7 +- .../flink/config/FlinkEnvConfiguration.scala | 4 + .../executor/FlinkCodeOnceExecutor.scala | 39 +- .../FlinkSQLComputationExecutor.scala | 13 +- .../FlinkExecutorManager.scala | 2 +- .../factory/FlinkEngineConnFactory.scala | 11 +- .../factory/FlinkSQLExecutorFactory.scala | 12 - .../engineconnplugin/flink/ql/Grammar.scala | 30 -- .../ql/impl/CreateTableAsSelectGrammar.scala | 61 --- .../ClassUtil.scala} | 31 +- 20 files changed, 646 insertions(+), 749 deletions(-) create mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java create mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java create mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java create mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java create mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java delete mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java delete mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java delete mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala delete mode 100644 linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala rename linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/{ql/GrammarFactory.scala => util/ClassUtil.scala} (50%) diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/pom.xml b/linkis-engineconn-plugins/engineconn-plugins/flink/pom.xml index e95e096f97..5268a5af6f 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/pom.xml +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/pom.xml @@ -18,10 +18,9 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - linkis + linkis-engineconn-plugins com.webank.wedatasphere.linkis 1.0.2 - ../../pom.xml 4.0.0 diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactory.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactory.java index d3b834afb9..1d7d49d9d7 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactory.java +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactory.java @@ -19,115 +19,15 @@ package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.CreateViewOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DDLOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DescribeTableOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DropViewOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ExplainOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ResetOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.SelectOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.SetOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCatalogsOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCurrentCatalogOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCurrentDatabaseOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowDatabasesOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowFunctionsOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowModulesOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowTablesOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowViewsOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.UseCatalogOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.UseDatabaseOperation; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser.SqlCommandCall; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall; import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext; import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory; -public class OperationFactory { +public interface OperationFactory { - public static Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException { - - Operation operation; - switch (call.command) { - case SELECT: - operation = new SelectOperation(context, call.operands[0]); - break; - case CREATE_VIEW: - operation = new CreateViewOperation(context, call.operands[0], call.operands[1]); - break; - case DROP_VIEW: - operation = new DropViewOperation(context, call.operands[0], Boolean.parseBoolean(call.operands[1])); - break; - case LINKIS_GRAMMAR: - operation = GrammarFactory.apply(call.operands[0], context); - break; - case CREATE_TABLE: - case DROP_TABLE: - case ALTER_TABLE: - case CREATE_DATABASE: - case DROP_DATABASE: - case ALTER_DATABASE: - operation = new DDLOperation(context, call.operands[0], call.command); - break; - case SET: - // list all properties - if (call.operands.length == 0) { - operation = new SetOperation(context); - } else { - // set a property - operation = new SetOperation(context, call.operands[0], call.operands[1]); - } - break; - case RESET: - if (call.operands.length > 0) { - throw new SqlParseException("Only RESET ALL is supported now"); - } - operation = new ResetOperation(context); - break; - case USE_CATALOG: - operation = new UseCatalogOperation(context, call.operands[0]); - break; - case USE: - operation = new UseDatabaseOperation(context, call.operands[0]); - break; - case INSERT_INTO: - case INSERT_OVERWRITE: - operation = new InsertOperation(context, call.operands[0], call.operands[1]); - break; - case SHOW_MODULES: - operation = new ShowModulesOperation(context); - break; - case SHOW_CATALOGS: - operation = new ShowCatalogsOperation(context); - break; - case SHOW_CURRENT_CATALOG: - operation = new ShowCurrentCatalogOperation(context); - break; - case SHOW_DATABASES: - operation = new ShowDatabasesOperation(context); - break; - case SHOW_CURRENT_DATABASE: - operation = new ShowCurrentDatabaseOperation(context); - break; - case SHOW_TABLES: - operation = new ShowTablesOperation(context); - break; - case SHOW_VIEWS: - operation = new ShowViewsOperation(context); - break; - case SHOW_FUNCTIONS: - operation = new ShowFunctionsOperation(context); - break; - case DESCRIBE_TABLE: - operation = new DescribeTableOperation(context, call.operands[0]); - break; - case EXPLAIN: - operation = new ExplainOperation(context, call.operands[0]); - break; - default: - throw new SqlParseException("Unsupported command call " + call + ". This is a bug."); - } - - return operation; + static OperationFactory getOperationFactory() { + return OperationFactoryImpl.getInstance(); } + + Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException; } diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java new file mode 100644 index 0000000000..713ab4a69c --- /dev/null +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation; + +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.*; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil; + + +public class OperationFactoryImpl implements OperationFactory { + + private OperationFactoryImpl() { + } + + @Override + public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException { + Operation operation; + switch (call.command) { + case SELECT: + operation = new SelectOperation(context, call.operands[0]); + break; + case CREATE_VIEW: + operation = new CreateViewOperation(context, call.operands[0], call.operands[1]); + break; + case DROP_VIEW: + operation = new DropViewOperation(context, call.operands[0], Boolean.parseBoolean(call.operands[1])); + break; + case CREATE_TABLE: + case DROP_TABLE: + case ALTER_TABLE: + case CREATE_DATABASE: + case DROP_DATABASE: + case ALTER_DATABASE: + operation = new DDLOperation(context, call.operands[0], call.command); + break; + case SET: + // list all properties + if (call.operands.length == 0) { + operation = new SetOperation(context); + } else { + // set a property + operation = new SetOperation(context, call.operands[0], call.operands[1]); + } + break; + case RESET: + if (call.operands.length > 0) { + throw new SqlParseException("Only RESET ALL is supported now"); + } + operation = new ResetOperation(context); + break; + case USE_CATALOG: + operation = new UseCatalogOperation(context, call.operands[0]); + break; + case USE: + operation = new UseDatabaseOperation(context, call.operands[0]); + break; + case INSERT_INTO: + case INSERT_OVERWRITE: + operation = new InsertOperation(context, call.operands[0], call.operands[1]); + break; + case SHOW_MODULES: + operation = new ShowModulesOperation(context); + break; + case SHOW_CATALOGS: + operation = new ShowCatalogsOperation(context); + break; + case SHOW_CURRENT_CATALOG: + operation = new ShowCurrentCatalogOperation(context); + break; + case SHOW_DATABASES: + operation = new ShowDatabasesOperation(context); + break; + case SHOW_CURRENT_DATABASE: + operation = new ShowCurrentDatabaseOperation(context); + break; + case SHOW_TABLES: + operation = new ShowTablesOperation(context); + break; + case SHOW_VIEWS: + operation = new ShowViewsOperation(context); + break; + case SHOW_FUNCTIONS: + operation = new ShowFunctionsOperation(context); + break; + case DESCRIBE_TABLE: + operation = new DescribeTableOperation(context, call.operands[0]); + break; + case EXPLAIN: + operation = new ExplainOperation(context, call.operands[0]); + break; + default: + throw new SqlParseException("Unsupported command call " + call + ". This is a bug."); + } + return operation; + } + + private static OperationFactory operationFactory; + + public static OperationFactory getInstance() { + if(operationFactory == null) { + synchronized (OperationFactory.class) { + if(operationFactory == null) { + operationFactory = ClassUtil.getInstance(OperationFactory.class, new OperationFactoryImpl()); + } + } + } + return operationFactory; + } + +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java index 52b04e3e9d..4baf53d041 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java @@ -22,7 +22,7 @@ import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.NonJobOperation; import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationUtil; import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultSet; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser.SqlCommand; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommand; import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext; import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlExecutionException; import org.apache.flink.table.api.TableEnvironment; diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java new file mode 100644 index 0000000000..fdcd21a110 --- /dev/null +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser; + +import java.util.Optional; +import java.util.function.Function; +import java.util.regex.Pattern; + +public enum SqlCommand { + + LINKIS_GRAMMAR, + + SELECT, + + INSERT_INTO, + + INSERT_OVERWRITE, + + CREATE_TABLE, + + ALTER_TABLE, + + DROP_TABLE, + + CREATE_VIEW, + + DROP_VIEW, + + CREATE_DATABASE, + + ALTER_DATABASE, + + DROP_DATABASE, + + USE_CATALOG, + + USE, + + SHOW_CATALOGS, + + SHOW_DATABASES, + + SHOW_TABLES, + + SHOW_FUNCTIONS, + + EXPLAIN, + + DESCRIBE_TABLE, + + RESET, + + SET( + "SET", + Inner_Config.NO_OPERANDS), + + SHOW_MODULES( + "SHOW\\s+MODULES", + Inner_Config.NO_OPERANDS), + + SHOW_VIEWS( + "SHOW\\s+VIEWS", + Inner_Config.NO_OPERANDS), + + SHOW_CURRENT_CATALOG( + "SHOW\\s+CURRENT\\s+CATALOG", + Inner_Config.NO_OPERANDS), + + SHOW_CURRENT_DATABASE( + "SHOW\\s+CURRENT\\s+DATABASE", + Inner_Config.NO_OPERANDS); + + private final Pattern pattern; + private final Function> operandConverter; + + SqlCommand(String matchingRegex, Function> operandConverter) { + this.pattern = Pattern.compile(matchingRegex, Inner_Config.DEFAULT_PATTERN_FLAGS); + this.operandConverter = operandConverter; + } + + SqlCommand() { + this.pattern = null; + this.operandConverter = null; + } + + @Override + public String toString() { + return super.toString().replace('_', ' '); + } + + boolean hasPattern() { + return pattern != null && operandConverter != null; + } + + Pattern getPattern() { + return pattern; + } + + Function> getOperandConverter() { + return operandConverter; + } + + static class Inner_Config { + private static final Function> NO_OPERANDS = + (operands) -> Optional.of(new String[0]); + + private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL; + } + +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java new file mode 100644 index 0000000000..e292c5cd82 --- /dev/null +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser; + +import java.util.Arrays; +import java.util.Objects; + +public class SqlCommandCall { + public final SqlCommand command; + public final String[] operands; + + public SqlCommandCall(SqlCommand command, String[] operands) { + this.command = command; + this.operands = operands; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlCommandCall that = (SqlCommandCall) o; + return command == that.command && Arrays.equals(operands, that.operands); + } + + @Override + public int hashCode() { + int result = Objects.hash(command); + result = 31 * result + Arrays.hashCode(operands); + return result; + } + + @Override + public String toString() { + return command + "(" + Arrays.toString(operands) + ")"; + } +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java new file mode 100644 index 0000000000..8fcba36b17 --- /dev/null +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser; + +import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException; + +import java.util.Optional; + +public interface SqlCommandParser { + + /** + * Parse the given statement and return corresponding SqlCommandCall. + * + *

only `set`, `show modules`, `show current catalog` and `show current database` + * are parsed through regex matching, other commands are parsed through sql parser. + * + *

throw {@link SqlParseException} if the statement contains multiple sub-statements separated by semicolon + * or there is a parse error. + * + *

NOTE: sql parser only parses the statement to get the corresponding SqlCommand, + * do not check whether the statement is valid here. + */ + Optional parse(String stmt, boolean isBlinkPlanner) throws SqlParseException; + + static SqlCommandParser getSqlCommandParser() { + return SqlCommandParserImpl.getInstance(); + } + +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java new file mode 100644 index 0000000000..aa9aad62b1 --- /dev/null +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser; + +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationFactory; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException; +import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil; +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.flink.sql.parser.ddl.*; +import org.apache.flink.sql.parser.dml.RichSqlInsert; +import org.apache.flink.sql.parser.dql.*; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; +import org.apache.flink.sql.parser.validate.FlinkSqlConformance; + +import java.lang.reflect.Field; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class SqlCommandParserImpl implements SqlCommandParser { + + @Override + public Optional parse(String stmt, boolean isBlinkPlanner) throws SqlParseException { + // normalize + String stmtForRegexMatch = stmt.trim(); + // remove ';' at the end + if (stmtForRegexMatch.endsWith(";")) { + stmtForRegexMatch = stmtForRegexMatch.substring(0, stmtForRegexMatch.length() - 1).trim(); + } + + // only parse gateway specific statements + for (SqlCommand cmd : SqlCommand.values()) { + if (cmd.hasPattern()) { + final Matcher matcher = cmd.getPattern().matcher(stmtForRegexMatch); + if (matcher.matches()) { + final String[] groups = new String[matcher.groupCount()]; + for (int i = 0; i < groups.length; i++) { + groups[i] = matcher.group(i + 1); + } + return cmd.getOperandConverter().apply(groups) + .map((operands) -> new SqlCommandCall(cmd, operands)); + } + } + } + + return parseStmt(stmt, isBlinkPlanner); + } + + /** + * Flink Parser only supports partial Operations, so we directly use Calcite Parser here. + * Once Flink Parser supports all Operations, we should use Flink Parser instead of Calcite Parser. + */ + private Optional parseStmt(String stmt, boolean isBlinkPlanner) throws SqlParseException { + SqlParser.Config config = createSqlParserConfig(isBlinkPlanner); + SqlParser sqlParser = SqlParser.create(stmt, config); + SqlNodeList sqlNodes; + try { + sqlNodes = sqlParser.parseStmtList(); + // no need check the statement is valid here + } catch (org.apache.calcite.sql.parser.SqlParseException e) { + throw new SqlParseException("Failed to parse statement.", e); + } + if (sqlNodes.size() != 1) { + throw new SqlParseException("Only single statement is supported now"); + } + + final String[] operands; + final SqlCommand cmd; + SqlNode node = sqlNodes.get(0); + if (node.getKind().belongsTo(SqlKind.QUERY)) { + cmd = SqlCommand.SELECT; + operands = new String[] { stmt }; + } else if (node instanceof RichSqlInsert) { + RichSqlInsert insertNode = (RichSqlInsert) node; + cmd = insertNode.isOverwrite() ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO; + operands = new String[] { stmt, insertNode.getTargetTable().toString() }; + } else if (node instanceof SqlShowTables) { + cmd = SqlCommand.SHOW_TABLES; + operands = new String[0]; + } else if (node instanceof SqlCreateTable) { + cmd = SqlCommand.CREATE_TABLE; + operands = new String[] { stmt }; + } else if (node instanceof SqlDropTable) { + cmd = SqlCommand.DROP_TABLE; + operands = new String[] { stmt }; + } else if (node instanceof SqlAlterTable) { + cmd = SqlCommand.ALTER_TABLE; + operands = new String[] { stmt }; + } else if (node instanceof SqlCreateView) { + // TableEnvironment currently does not support creating view + // so we have to perform the modification here + SqlCreateView createViewNode = (SqlCreateView) node; + cmd = SqlCommand.CREATE_VIEW; + operands = new String[] { + createViewNode.getViewName().toString(), + createViewNode.getQuery().toString() + }; + } else if (node instanceof SqlDropView) { + // TableEnvironment currently does not support dropping view + // so we have to perform the modification here + SqlDropView dropViewNode = (SqlDropView) node; + + Field ifExistsField; + try { + ifExistsField = SqlDrop.class.getDeclaredField("ifExists"); + } catch (NoSuchFieldException e) { + throw new SqlParseException("Failed to parse drop view statement.", e); + } + ifExistsField.setAccessible(true); + boolean ifExists; + try { + ifExists = ifExistsField.getBoolean(dropViewNode); + } catch (IllegalAccessException e) { + throw new SqlParseException("Failed to parse drop view statement.", e); + } + + cmd = SqlCommand.DROP_VIEW; + operands = new String[] { dropViewNode.getViewName().toString(), String.valueOf(ifExists) }; + } else if (node instanceof SqlShowDatabases) { + cmd = SqlCommand.SHOW_DATABASES; + operands = new String[0]; + } else if (node instanceof SqlCreateDatabase) { + cmd = SqlCommand.CREATE_DATABASE; + operands = new String[] { stmt }; + } else if (node instanceof SqlDropDatabase) { + cmd = SqlCommand.DROP_DATABASE; + operands = new String[] { stmt }; + } else if (node instanceof SqlAlterDatabase) { + cmd = SqlCommand.ALTER_DATABASE; + operands = new String[] { stmt }; + } else if (node instanceof SqlShowCatalogs) { + cmd = SqlCommand.SHOW_CATALOGS; + operands = new String[0]; + } else if (node instanceof SqlShowFunctions) { + cmd = SqlCommand.SHOW_FUNCTIONS; + operands = new String[0]; + } else if (node instanceof SqlUseCatalog) { + cmd = SqlCommand.USE_CATALOG; + operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() }; + } else if (node instanceof SqlUseDatabase) { + cmd = SqlCommand.USE; + operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() }; + } else if (node instanceof SqlRichDescribeTable) { + cmd = SqlCommand.DESCRIBE_TABLE; + // TODO support describe extended + String[] fullTableName = ((SqlRichDescribeTable) node).fullTableName(); + String escapedName = + Stream.of(fullTableName).map(s -> "`" + s + "`").collect(Collectors.joining(".")); + operands = new String[] { escapedName }; + } else if (node instanceof SqlExplain) { + cmd = SqlCommand.EXPLAIN; + // TODO support explain details + operands = new String[] { ((SqlExplain) node).getExplicandum().toString() }; + } else if (node instanceof SqlSetOption) { + SqlSetOption setNode = (SqlSetOption) node; + // refer to SqlSetOption#unparseAlterOperation + if (setNode.getValue() != null) { + cmd = SqlCommand.SET; + operands = new String[] { setNode.getName().toString(), setNode.getValue().toString() }; + } else { + cmd = SqlCommand.RESET; + if ("ALL".equals(setNode.getName().toString().toUpperCase())) { + operands = new String[0]; + } else { + operands = new String[] { setNode.getName().toString() }; + } + } + } else { + cmd = null; + operands = new String[0]; + } + + if (cmd == null) { + return Optional.empty(); + } else { + // use the origin given statement to make sure + // users can find the correct line number when parsing failed + return Optional.of(new SqlCommandCall(cmd, operands)); + } + } + + /** + * A temporary solution. We can't get the default SqlParser config through table environment now. + */ + private SqlParser.Config createSqlParserConfig(boolean isBlinkPlanner) { + SqlParser.Config config = SqlParser + .config() + .withParserFactory(FlinkSqlParserImpl.FACTORY) + .withConformance(FlinkSqlConformance.DEFAULT) + .withLex(Lex.JAVA); + if (isBlinkPlanner) { + return config.withIdentifierMaxLength(256); + } else { + return config; + } + } + + private static SqlCommandParser sqlCommandParser; + + public static SqlCommandParser getInstance() { + if(sqlCommandParser == null) { + synchronized (OperationFactory.class) { + if(sqlCommandParser == null) { + sqlCommandParser = ClassUtil.getInstance(SqlCommandParser.class, new SqlCommandParserImpl()); + } + } + } + return sqlCommandParser; + } + +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java deleted file mode 100644 index 9ca6b5459a..0000000000 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java +++ /dev/null @@ -1,411 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils; - -import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException; -import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.calcite.config.Lex; -import org.apache.calcite.sql.SqlDrop; -import org.apache.calcite.sql.SqlExplain; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlSetOption; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; -import org.apache.flink.sql.parser.ddl.SqlAlterTable; -import org.apache.flink.sql.parser.ddl.SqlCreateDatabase; -import org.apache.flink.sql.parser.ddl.SqlCreateTable; -import org.apache.flink.sql.parser.ddl.SqlCreateView; -import org.apache.flink.sql.parser.ddl.SqlDropDatabase; -import org.apache.flink.sql.parser.ddl.SqlDropTable; -import org.apache.flink.sql.parser.ddl.SqlDropView; -import org.apache.flink.sql.parser.ddl.SqlUseCatalog; -import org.apache.flink.sql.parser.ddl.SqlUseDatabase; -import org.apache.flink.sql.parser.dml.RichSqlInsert; -import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; -import org.apache.flink.sql.parser.dql.SqlShowCatalogs; -import org.apache.flink.sql.parser.dql.SqlShowDatabases; -import org.apache.flink.sql.parser.dql.SqlShowFunctions; -import org.apache.flink.sql.parser.dql.SqlShowTables; -import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; -import org.apache.flink.sql.parser.validate.FlinkSqlConformance; - -/** - * Simple parser for determining the type of command and its parameters. - */ -public final class SqlCommandParser { - - private SqlCommandParser() { - // private - } - - /** - * Parse the given statement and return corresponding SqlCommandCall. - * - *

only `set`, `show modules`, `show current catalog` and `show current database` - * are parsed through regex matching, other commands are parsed through sql parser. - * - *

throw {@link SqlParseException} if the statement contains multiple sub-statements separated by semicolon - * or there is a parse error. - * - *

NOTE: sql parser only parses the statement to get the corresponding SqlCommand, - * do not check whether the statement is valid here. - */ - public static Optional parse(String stmt, boolean isBlinkPlanner) throws SqlParseException { - Optional call = Arrays.stream(GrammarFactory.getGrammars()).filter(grammar -> grammar.canParse(stmt)).findFirst() - .map(grammar -> new SqlCommandCall(SqlCommand.LINKIS_GRAMMAR, new String[] {stmt})); - if(call.isPresent()) { - return call; - } - // normalize - String stmtForRegexMatch = stmt.trim(); - // remove ';' at the end - if (stmtForRegexMatch.endsWith(";")) { - stmtForRegexMatch = stmtForRegexMatch.substring(0, stmtForRegexMatch.length() - 1).trim(); - } - - // only parse gateway specific statements - for (SqlCommand cmd : SqlCommand.values()) { - if (cmd.hasPattern()) { - final Matcher matcher = cmd.pattern.matcher(stmtForRegexMatch); - if (matcher.matches()) { - final String[] groups = new String[matcher.groupCount()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = matcher.group(i + 1); - } - return cmd.operandConverter.apply(groups) - .map((operands) -> new SqlCommandCall(cmd, operands)); - } - } - } - - return parseStmt(stmt, isBlinkPlanner); - } - - /** - * Flink Parser only supports partial Operations, so we directly use Calcite Parser here. - * Once Flink Parser supports all Operations, we should use Flink Parser instead of Calcite Parser. - */ - private static Optional parseStmt(String stmt, boolean isBlinkPlanner) throws SqlParseException { - SqlParser.Config config = createSqlParserConfig(isBlinkPlanner); - SqlParser sqlParser = SqlParser.create(stmt, config); - SqlNodeList sqlNodes; - try { - sqlNodes = sqlParser.parseStmtList(); - // no need check the statement is valid here - } catch (org.apache.calcite.sql.parser.SqlParseException e) { - throw new SqlParseException("Failed to parse statement.", e); - } - if (sqlNodes.size() != 1) { - throw new SqlParseException("Only single statement is supported now"); - } - - final String[] operands; - final SqlCommand cmd; - SqlNode node = sqlNodes.get(0); - if (node.getKind().belongsTo(SqlKind.QUERY)) { - cmd = SqlCommand.SELECT; - operands = new String[] { stmt }; - } else if (node instanceof RichSqlInsert) { - RichSqlInsert insertNode = (RichSqlInsert) node; - cmd = insertNode.isOverwrite() ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO; - operands = new String[] { stmt, insertNode.getTargetTable().toString() }; - } else if (node instanceof SqlShowTables) { - cmd = SqlCommand.SHOW_TABLES; - operands = new String[0]; - } else if (node instanceof SqlCreateTable) { - cmd = SqlCommand.CREATE_TABLE; - operands = new String[] { stmt }; - } else if (node instanceof SqlDropTable) { - cmd = SqlCommand.DROP_TABLE; - operands = new String[] { stmt }; - } else if (node instanceof SqlAlterTable) { - cmd = SqlCommand.ALTER_TABLE; - operands = new String[] { stmt }; - } else if (node instanceof SqlCreateView) { - // TableEnvironment currently does not support creating view - // so we have to perform the modification here - SqlCreateView createViewNode = (SqlCreateView) node; - cmd = SqlCommand.CREATE_VIEW; - operands = new String[] { - createViewNode.getViewName().toString(), - createViewNode.getQuery().toString() - }; - } else if (node instanceof SqlDropView) { - // TableEnvironment currently does not support dropping view - // so we have to perform the modification here - SqlDropView dropViewNode = (SqlDropView) node; - - // TODO: we can't get this field from SqlDropView normally until FLIP-71 is implemented - Field ifExistsField; - try { - ifExistsField = SqlDrop.class.getDeclaredField("ifExists"); - } catch (NoSuchFieldException e) { - throw new SqlParseException("Failed to parse drop view statement.", e); - } - ifExistsField.setAccessible(true); - boolean ifExists; - try { - ifExists = ifExistsField.getBoolean(dropViewNode); - } catch (IllegalAccessException e) { - throw new SqlParseException("Failed to parse drop view statement.", e); - } - - cmd = SqlCommand.DROP_VIEW; - operands = new String[] { dropViewNode.getViewName().toString(), String.valueOf(ifExists) }; - } else if (node instanceof SqlShowDatabases) { - cmd = SqlCommand.SHOW_DATABASES; - operands = new String[0]; - } else if (node instanceof SqlCreateDatabase) { - cmd = SqlCommand.CREATE_DATABASE; - operands = new String[] { stmt }; - } else if (node instanceof SqlDropDatabase) { - cmd = SqlCommand.DROP_DATABASE; - operands = new String[] { stmt }; - } else if (node instanceof SqlAlterDatabase) { - cmd = SqlCommand.ALTER_DATABASE; - operands = new String[] { stmt }; - } else if (node instanceof SqlShowCatalogs) { - cmd = SqlCommand.SHOW_CATALOGS; - operands = new String[0]; - } else if (node instanceof SqlShowFunctions) { - cmd = SqlCommand.SHOW_FUNCTIONS; - operands = new String[0]; - } else if (node instanceof SqlUseCatalog) { - cmd = SqlCommand.USE_CATALOG; - operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() }; - } else if (node instanceof SqlUseDatabase) { - cmd = SqlCommand.USE; - operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() }; - } else if (node instanceof SqlRichDescribeTable) { - cmd = SqlCommand.DESCRIBE_TABLE; - // TODO support describe extended - String[] fullTableName = ((SqlRichDescribeTable) node).fullTableName(); - String escapedName = - Stream.of(fullTableName).map(s -> "`" + s + "`").collect(Collectors.joining(".")); - operands = new String[] { escapedName }; - } else if (node instanceof SqlExplain) { - cmd = SqlCommand.EXPLAIN; - // TODO support explain details - operands = new String[] { ((SqlExplain) node).getExplicandum().toString() }; - } else if (node instanceof SqlSetOption) { - SqlSetOption setNode = (SqlSetOption) node; - // refer to SqlSetOption#unparseAlterOperation - if (setNode.getValue() != null) { - cmd = SqlCommand.SET; - operands = new String[] { setNode.getName().toString(), setNode.getValue().toString() }; - } else { - cmd = SqlCommand.RESET; - if (setNode.getName().toString().toUpperCase().equals("ALL")) { - operands = new String[0]; - } else { - operands = new String[] { setNode.getName().toString() }; - } - } - } else { - cmd = null; - operands = new String[0]; - } - - if (cmd == null) { - return Optional.empty(); - } else { - // use the origin given statement to make sure - // users can find the correct line number when parsing failed - return Optional.of(new SqlCommandCall(cmd, operands)); - } - } - - /** - * A temporary solution. We can't get the default SqlParser config through table environment now. - */ - private static SqlParser.Config createSqlParserConfig(boolean isBlinkPlanner) { - if (isBlinkPlanner) { - return SqlParser - .configBuilder() - .setParserFactory(FlinkSqlParserImpl.FACTORY) - .setConformance(FlinkSqlConformance.DEFAULT) - .setLex(Lex.JAVA) - .setIdentifierMaxLength(256) - .build(); - } else { - return SqlParser - .configBuilder() - .setParserFactory(FlinkSqlParserImpl.FACTORY) - .setConformance(FlinkSqlConformance.DEFAULT) - .setLex(Lex.JAVA) - .build(); - } - } - - // -------------------------------------------------------------------------------------------- - - private static final Function> NO_OPERANDS = - (operands) -> Optional.of(new String[0]); - - private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL; - - /** - * Supported SQL commands. - */ - public enum SqlCommand { - LINKIS_GRAMMAR, - - SELECT, - - INSERT_INTO, - - INSERT_OVERWRITE, - - CREATE_TABLE, - - ALTER_TABLE, - - DROP_TABLE, - - CREATE_VIEW, - - DROP_VIEW, - - CREATE_DATABASE, - - ALTER_DATABASE, - - DROP_DATABASE, - - USE_CATALOG, - - USE, - - SHOW_CATALOGS, - - SHOW_DATABASES, - - SHOW_TABLES, - - SHOW_FUNCTIONS, - - EXPLAIN, - - DESCRIBE_TABLE, - - RESET, - - // the following commands are not supported by SQL parser but are needed by users - - SET( - "SET", - // `SET` with operands can be parsed by SQL parser - // we keep `SET` with no operands here to print all properties - NO_OPERANDS), - - // the following commands will be supported by SQL parser in the future - // remove them once they're supported - - // FLINK-17396 - SHOW_MODULES( - "SHOW\\s+MODULES", - NO_OPERANDS), - - // FLINK-17111 - SHOW_VIEWS( - "SHOW\\s+VIEWS", - NO_OPERANDS), - - // the following commands are not supported by SQL parser but are needed by JDBC driver - // these should not be exposed to the user and should be used internally - - SHOW_CURRENT_CATALOG( - "SHOW\\s+CURRENT\\s+CATALOG", - NO_OPERANDS), - - SHOW_CURRENT_DATABASE( - "SHOW\\s+CURRENT\\s+DATABASE", - NO_OPERANDS); - - public final Pattern pattern; - public final Function> operandConverter; - - SqlCommand(String matchingRegex, Function> operandConverter) { - this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS); - this.operandConverter = operandConverter; - } - - SqlCommand() { - this.pattern = null; - this.operandConverter = null; - } - - @Override - public String toString() { - return super.toString().replace('_', ' '); - } - - boolean hasPattern() { - return pattern != null; - } - } - - /** - * Call of SQL command with operands and command type. - */ - public static class SqlCommandCall { - public final SqlCommand command; - public final String[] operands; - - public SqlCommandCall(SqlCommand command, String[] operands) { - this.command = command; - this.operands = operands; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SqlCommandCall that = (SqlCommandCall) o; - return command == that.command && Arrays.equals(operands, that.operands); - } - - @Override - public int hashCode() { - int result = Objects.hash(command); - result = 31 * result + Arrays.hashCode(operands); - return result; - } - - @Override - public String toString() { - return command + "(" + Arrays.toString(operands) + ")"; - } - } -} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java deleted file mode 100644 index fbaf364708..0000000000 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.webank.wedatasphere.linkis.engineconnplugin.flink.util; - -import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.Retryer; -import com.github.rholder.retry.RetryerBuilder; -import com.github.rholder.retry.StopStrategies; -import com.github.rholder.retry.WaitStrategies; -import com.google.common.base.Predicate; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A retry Util-class based on guava-retry(基于guava-retry的重试工具类) - * Retry Tool class based on guava-retry - */ -public class RetryUtil { - - private static final Logger logger = LoggerFactory.getLogger(RetryUtil.class); - - /** - * @param task task pending for retry(重试执行得任务) - * @param predicate retry predication(符合预期结果需要重试) - * @param fixedWaitTime time interval between retries(本次重试与上次重试之间的固定间隔时长) - * @param maxEachExecuTime maxium time for a retry(一次重试的最大执行的时间) - * @param attemptNumber number of retry attempts(重试次数) - */ - - public static T retry(Callable task, Predicate predicate, long fixedWaitTime, long maxEachExecuTime, - TimeUnit timeUnit, int attemptNumber) { - Retryer retryer = RetryerBuilder - .newBuilder() - // Will retry: runtime exception; checked exception. May not retry: error (抛出runtime异常、checked异常时都会重试,但是抛出error不会重试。) - .retryIfException() - // if predication is met, then retry(对执行结果的预期。符合预期就重试) - .retryIfResult(predicate) - // fixed waiting time for retry(每次重试固定等待fixedWaitTime时间) - .withWaitStrategy(WaitStrategies.fixedWait(fixedWaitTime, timeUnit)) - // number of retry attempts(尝试次数) - .withStopStrategy(StopStrategies.stopAfterAttempt(attemptNumber)) - .build(); - T t = null; - try { - t = retryer.call(task); - } catch (ExecutionException e) { - logger.error("", e); - } catch (RetryException e) { - logger.error("", e); - } - return t; - } - -} \ No newline at end of file diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala index 71aa76f03e..9765c28054 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala @@ -36,12 +36,7 @@ class FlinkEngineConnPlugin extends EngineConnPlugin { private val EP_CONTEXT_CONSTRUCTOR_LOCK = new Object() - override def init(params: java.util.Map[String, Any]): Unit = { - //do noting -// engineResourceFactory = new FlinkEngineConnResourceFactory -// engineConnLaunchBuilder = new FlinkEngineConnLaunchBuilder -// engineConnFactory = new FlinkEngineConnFactory - } + override def init(params: java.util.Map[String, Any]): Unit = {} override def getEngineResourceFactory: EngineResourceFactory = { EP_CONTEXT_CONSTRUCTOR_LOCK.synchronized{ diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index edd32fda17..772b6c74c5 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -52,4 +52,8 @@ object FlinkEnvConfiguration { val FLINK_CLIENT_REQUEST_TIMEOUT = CommonVars("flink.client.request.timeout", new TimeType("30s")) val FLINK_ONCE_APP_STATUS_FETCH_INTERVAL = CommonVars("flink.app.fetch.status.interval", new TimeType("5s")) + val FLINK_REPORTER_ENABLE = CommonVars("linkis.flink.reporter.enable", false) + val FLINK_REPORTER_CLASS = CommonVars("linkis.flink.reporter.class", "") + val FLINK_REPORTER_INTERVAL = CommonVars("linkis.flink.reporter.interval", new TimeType("60s")) + } diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala index 14b303f158..04a630de02 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala @@ -26,7 +26,6 @@ import com.webank.wedatasphere.linkis.engineconn.once.executor.OnceExecutorExecu import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.deployment.YarnPerJobClusterDescriptorAdapter import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.FlinkInitFailedException -import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory import com.webank.wedatasphere.linkis.governance.common.paser.{CodeParserFactory, CodeType} import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant import com.webank.wedatasphere.linkis.scheduler.executer.ErrorExecuteResponse @@ -63,7 +62,7 @@ class FlinkCodeOnceExecutor(override val id: Long, future = Utils.defaultScheduler.submit(new Runnable { override def run(): Unit = { info("Try to execute codes.") - Utils.tryCatch(runCode()){ t => + Utils.tryCatch(CodeParserFactory.getCodeParser(CodeType.SQL).parse(codes).filter(StringUtils.isNotBlank).foreach(runCode)){ t => error("Run code failed!", t) setResponse(ErrorExecuteResponse("Run code failed!", t)) tryFailed() @@ -81,30 +80,28 @@ class FlinkCodeOnceExecutor(override val id: Long, /** * Only support to execute sql in order, so it is problematic if more than one insert sql is submitted. */ - private def runCode(): Unit = CodeParserFactory.getCodeParser(CodeType.SQL).parse(codes).filter(StringUtils.isNotBlank).foreach { code => + protected def runCode(code: String): Unit = { if(isClosed) return val trimmedCode = StringUtils.trim(code) info(s"$getId >> " + trimmedCode) val startTime = System.currentTimeMillis - GrammarFactory.getGrammar(trimmedCode, flinkEngineConnContext).map(_.execute).getOrElse { - val tableResult = flinkEngineConnContext.getExecutionContext.wrapClassLoader(new Supplier[TableResult]{ - override def get(): TableResult = flinkEngineConnContext.getExecutionContext.getTableEnvironment.executeSql(trimmedCode) - }) - if(tableResult.getJobClient.isPresent) { - val jobClient = tableResult.getJobClient.get - jobClient match { - case adaptor: ClusterClientJobClientAdapter[ApplicationId] => - info(s"jobId is ${jobClient.getJobID.toHexString}") - clusterDescriptor.deployCluster(jobClient.getJobID, FlinkCodeOnceExecutor.getClusterClient(adaptor)) - } - this synchronized notify() - tableResult.await() - } - tableResult.getResultKind match { - case ResultKind.SUCCESS_WITH_CONTENT => - tableResult.print() - case _ => + val tableResult = flinkEngineConnContext.getExecutionContext.wrapClassLoader(new Supplier[TableResult]{ + override def get(): TableResult = flinkEngineConnContext.getExecutionContext.getTableEnvironment.executeSql(trimmedCode) + }) + if(tableResult.getJobClient.isPresent) { + val jobClient = tableResult.getJobClient.get + jobClient match { + case adaptor: ClusterClientJobClientAdapter[ApplicationId] => + info(s"jobId is ${jobClient.getJobID.toHexString}") + clusterDescriptor.deployCluster(jobClient.getJobID, FlinkCodeOnceExecutor.getClusterClient(adaptor)) } + this synchronized notify() + tableResult.await() + } + tableResult.getResultKind match { + case ResultKind.SUCCESS_WITH_CONTENT => + tableResult.print() + case _ => } info(s"Costs ${ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)} to complete.") } diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 1d3f6dcd4f..3ccea0f169 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -26,13 +26,12 @@ import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.{C import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.deployment.{ClusterDescriptorAdapterFactory, YarnSessionClusterDescriptorAdapter} import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.{AbstractJobOperation, JobOperation, OperationFactory} -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser +import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.{SqlCommand, SqlCommandParser} import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.{ExecutorInitException, SqlParseException} import com.webank.wedatasphere.linkis.engineconnplugin.flink.listener.RowsType.RowsType import com.webank.wedatasphere.linkis.engineconnplugin.flink.listener.{FlinkStreamingResultSetListener, InteractiveFlinkStatusListener} -import com.webank.wedatasphere.linkis.manager.label.entity.cluster.EnvLabel import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse} import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory @@ -68,17 +67,17 @@ class FlinkSQLComputationExecutor(id: Long, } override def executeLine(engineExecutionContext: EngineExecutionContext, code: String): ExecuteResponse = { - val callOpt = SqlCommandParser.parse(code.trim, true) + val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true) val callSQL = if (!callOpt.isPresent) throw new SqlParseException("Unknown statement: " + code) else callOpt.get RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE)) - val operation = OperationFactory.createOperation(callSQL, flinkEngineConnContext) + val operation = OperationFactory.getOperationFactory.createOperation(callSQL, flinkEngineConnContext) operation match { case jobOperation: JobOperation => jobOperation.setClusterDescriptorAdapter(clusterDescriptor) this.operation = jobOperation jobOperation.addFlinkListener(new FlinkSQLStatusListener(jobOperation, engineExecutionContext)) - if(callSQL.command == SqlCommandParser.SqlCommand.SELECT) { + if(callSQL.command == SqlCommand.SELECT) { jobOperation.addFlinkListener(new FlinkSQLStreamingResultSetListener(jobOperation, engineExecutionContext)) val properties: util.Map[String, String] = engineExecutionContext.getProperties.map { case (k, v: String) => (k, v) @@ -132,10 +131,6 @@ class FlinkSQLComputationExecutor(id: Long, override def getProgressInfo: Array[JobProgressInfo] = Array.empty - - - - override def getId: String = "FlinkComputationSQL_"+ id override def close(): Unit = { diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala index 2be1cccc8d..996a4425d0 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala @@ -27,7 +27,7 @@ import com.webank.wedatasphere.linkis.manager.label.entity.Label class FlinkExecutorManager extends LabelExecutorManagerImpl{ override def getReportExecutor: Executor = if (getExecutors.isEmpty) { val labels = defaultFactory match { - case onceExecutorFactory: OnceExecutorFactory => + case _: OnceExecutorFactory => if (null == engineConn.getEngineCreationContext.getLabels()) Array.empty[Label[_]] else engineConn.getEngineCreationContext.getLabels().toArray[Label[_]](Array.empty[Label[_]]) case labelExecutorFactory: CodeLanguageLabelExecutorFactory => diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index b52c2d70b7..459bb947ef 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -32,6 +32,7 @@ import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConf import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._ import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext} import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.FlinkInitFailedException +import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil import com.webank.wedatasphere.linkis.manager.engineplugin.common.conf.EnvConfiguration import com.webank.wedatasphere.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory} import com.webank.wedatasphere.linkis.manager.label.entity.Label @@ -97,8 +98,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory)) flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory)) flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots) - flinkConfig.set(MetricOptions.REPORTER_CLASS, "com.webank.ims.reporter.IMSReporter"); - flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofSeconds(60)) + if(FLINK_REPORTER_ENABLE.getValue) { + flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue) + flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue.toLong)) + } //set savePoint(设置 savePoint) val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options) if (StringUtils.isNotBlank(savePointPath)) { @@ -180,7 +183,9 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging override protected def getEngineConnType: EngineType = EngineType.FLINK - private val executorFactoryArray = Array[ExecutorFactory](new FlinkSQLExecutorFactory, new FlinkApplicationExecutorFactory, new FlinkCodeExecutorFactory) + private val executorFactoryArray = Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory), + ClassUtil.getInstance(classOf[FlinkApplicationExecutorFactory], new FlinkApplicationExecutorFactory), + ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory)) override def getExecutorFactories: Array[ExecutorFactory] = executorFactoryArray } diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala index f51b5a75a9..2d53e66a62 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala @@ -22,17 +22,13 @@ import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationC import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.ComputationExecutor -import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext import com.webank.wedatasphere.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor import com.webank.wedatasphere.linkis.manager.label.entity.Label - import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType import org.apache.flink.yarn.configuration.YarnConfigOptions -import scala.collection.JavaConversions._ - class FlinkSQLExecutorFactory extends ComputationExecutorFactory { @@ -43,14 +39,6 @@ class FlinkSQLExecutorFactory extends ComputationExecutorFactory { case context: FlinkEngineConnContext => context.getEnvironmentContext.getFlinkConfig.set(YarnConfigOptions.PROPERTIES_FILE_LOCATION, EngineConnConf.getWorkHome) val executor = new FlinkSQLComputationExecutor(id, context) -// val containsEnvLabel = if(labels != null) labels.exists(_.isInstanceOf[EnvLabel]) else false -// if(!containsEnvLabel) { -// executor.getExecutorLabels().add(getEnvLabel(engineCreationContext)) -// } -// if(executor.getEnvLabel.getEnvType == EnvLabel.DEV) { -// context.getEnvironmentContext.getDefaultEnv -// .setExecution(Map("max-table-reFlinkSQLComputationExecutorsult-rows" -> FlinkEnvConfiguration.FLINK_SQL_DEV_SELECT_MAX_LINES.getValue.asInstanceOf[Object])) -// } executor } diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala deleted file mode 100644 index d3b968ef0a..0000000000 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql - -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.Operation -import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext - - -trait Grammar extends Operation { - - def canParse(sql: String): Boolean - - def copy(context: FlinkEngineConnContext, sql: String): Grammar - -} diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala deleted file mode 100644 index 4057ab1b2a..0000000000 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.impl - -import com.webank.wedatasphere.linkis.common.utils.Logging -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationUtil -import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultSet -import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext -import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.Grammar -import org.apache.flink.table.api.internal.TableEnvironmentInternal - -import scala.util.matching.Regex - - -class CreateTableAsSelectGrammar(context: FlinkEngineConnContext, sql: String) extends Grammar with Logging { - - def this() = this(null, null) - - override def canParse(sql: String): Boolean = - CreateTableAsSelectGrammar.CREATE_TABLE_AS_SELECT_GRAMMAR.unapplySeq(sql).isDefined - - /** - * Execute the command and return the result. - */ - override def execute(): ResultSet = sql match { - case CreateTableAsSelectGrammar.CREATE_TABLE_AS_SELECT_GRAMMAR(_, _, tableName, _, select, sql) => - val realSql = select + " " + sql - info(s"Ready to create a table $tableName, the sql is: $realSql.") - val function = new java.util.function.Function[TableEnvironmentInternal, Unit] { - override def apply(t: TableEnvironmentInternal): Unit = { - val table = t.sqlQuery(realSql) - t.createTemporaryView(tableName, table) - } - } - context.getExecutionContext.wrapClassLoader[Unit](function) - OperationUtil.OK - } - - override def copy(context: FlinkEngineConnContext, sql: String): CreateTableAsSelectGrammar = new CreateTableAsSelectGrammar(context, sql) -} - -object CreateTableAsSelectGrammar { - - val CREATE_TABLE_AS_SELECT_GRAMMAR: Regex = "(create|CREATE)\\s+(table|TABLE)\\s+([a-zA-Z_][a-zA-Z_0-9]*)\\s+(as|AS)?\\s*(select|SELECT)(.+)".r - -} \ No newline at end of file diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala similarity index 50% rename from linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala rename to linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala index baedc41886..9e62c4dddf 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala @@ -15,24 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql -import com.webank.wedatasphere.linkis.common.utils.ClassUtils -import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext -import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlExecutionException +package com.webank.wedatasphere.linkis.engineconnplugin.flink.util -import scala.collection.convert.WrapAsScala._ +import com.webank.wedatasphere.linkis.common.utils.{ClassUtils, Utils} +import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.JobExecutionException +import scala.collection.convert.wrapAsScala._ -object GrammarFactory { +object ClassUtil { - private val grammars = ClassUtils.reflections.getSubTypesOf(classOf[Grammar]).filterNot(ClassUtils.isInterfaceOrAbstract).map(_.newInstance).toArray - - def getGrammars: Array[Grammar] = grammars - - def apply(sql: String, context: FlinkEngineConnContext): Grammar = getGrammar(sql, context) - .getOrElse(throw new SqlExecutionException("Not support grammar " + sql)) - - def getGrammar(sql: String, context: FlinkEngineConnContext): Option[Grammar] = grammars.find(_.canParse(sql)).map(_.copy(context, sql)) + def getInstance[T](clazz: Class[T] , defaultValue: T): T = { + val classes = ClassUtils.reflections.getSubTypesOf(clazz).filterNot(ClassUtils.isInterfaceOrAbstract).toArray + if(classes.length <= 1) defaultValue + else if(classes.length == 2) { + val realClass = if(classes(0) == defaultValue.getClass) classes(1) else classes(0); + Utils.tryThrow(realClass.newInstance) { t => + new JobExecutionException(s"New a instance of ${clazz.getSimpleName} failed!", t); + } + } else { + throw new JobExecutionException(s"Too many subClasses of ${clazz.getSimpleName}, list: $classes."); + } + } }