+ private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+ (operands) -> Optional.of(new String[0]);
+
+ private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+ }
+
+}
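These two members drive the regex half of the new parser: each regex-backed SqlCommand compiles its pattern with DEFAULT_PATTERN_FLAGS, and NO_OPERANDS is the operand converter for commands that take no arguments. A standalone sketch of that contract, under the assumption that the enum is wired like the deleted utils/SqlCommandParser.java further down (pattern and statement are illustrative):

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NoOperandsSketch {
    public static void main(String[] args) {
        // same shape as NO_OPERANDS above: ignore the groups, return an empty operand array
        Function<String[], Optional<String[]>> noOperands = groups -> Optional.of(new String[0]);
        // SHOW_MODULES-style pattern compiled with CASE_INSENSITIVE | DOTALL
        Pattern pattern = Pattern.compile("SHOW\\s+MODULES", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
        Matcher matcher = pattern.matcher("show   Modules");
        if (matcher.matches()) {
            String[] groups = new String[matcher.groupCount()]; // zero capture groups here
            System.out.println("operands: " + noOperands.apply(groups).get().length); // prints 0
        }
    }
}
```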
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java
new file mode 100644
index 0000000000..e292c5cd82
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandCall.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class SqlCommandCall {
+ public final SqlCommand command;
+ public final String[] operands;
+
+ public SqlCommandCall(SqlCommand command, String[] operands) {
+ this.command = command;
+ this.operands = operands;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SqlCommandCall that = (SqlCommandCall) o;
+ return command == that.command && Arrays.equals(operands, that.operands);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(command);
+ result = 31 * result + Arrays.hashCode(operands);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return command + "(" + Arrays.toString(operands) + ")";
+ }
+}
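A brief usage sketch of the value semantics defined above: equals/hashCode delegate to Arrays, so two calls built from distinct but equal operand arrays compare equal (the statement literal is illustrative):

```java
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommand;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;

public class SqlCommandCallDemo {
    public static void main(String[] args) {
        SqlCommandCall a = new SqlCommandCall(SqlCommand.SELECT, new String[] { "SELECT 1" });
        SqlCommandCall b = new SqlCommandCall(SqlCommand.SELECT, new String[] { "SELECT 1" });
        System.out.println(a.equals(b));                  // true, via Arrays.equals
        System.out.println(a.hashCode() == b.hashCode()); // true, via Arrays.hashCode
        System.out.println(a);                            // SELECT([SELECT 1])
    }
}
```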
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java
new file mode 100644
index 0000000000..8fcba36b17
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser;
+
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;
+
+import java.util.Optional;
+
+public interface SqlCommandParser {
+
+ /**
+ * Parse the given statement and return corresponding SqlCommandCall.
+ *
+ * only `set`, `show modules`, `show current catalog` and `show current database`
+ * are parsed through regex matching, other commands are parsed through sql parser.
+ *
+ * <p>throw {@link SqlParseException} if the statement contains multiple sub-statements separated by semicolon
+ * or there is a parse error.
+ *
+ * <p>NOTE: sql parser only parses the statement to get the corresponding SqlCommand,
+ * do not check whether the statement is valid here.
+ */
+ Optional<SqlCommandCall> parse(String stmt, boolean isBlinkPlanner) throws SqlParseException;
+
+ static SqlCommandParser getSqlCommandParser() {
+ return SqlCommandParserImpl.getInstance();
+ }
+
+}
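A minimal sketch of the intended call pattern (statement is illustrative; `true` selects the Blink-planner parser config, and an empty Optional means the statement was not recognized):

```java
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;

import java.util.Optional;

public class ParserUsageSketch {
    public static void main(String[] args) throws SqlParseException {
        SqlCommandParser parser = SqlCommandParser.getSqlCommandParser();
        Optional<SqlCommandCall> call = parser.parse("SHOW TABLES", true);
        call.ifPresent(c -> System.out.println(c.command + " -> " + c.operands.length + " operand(s)"));
    }
}
```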
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
new file mode 100644
index 0000000000..aa9aad62b1
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser;
+
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.flink.sql.parser.ddl.*;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.*;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import java.lang.reflect.Field;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class SqlCommandParserImpl implements SqlCommandParser {
+
+ @Override
+ public Optional<SqlCommandCall> parse(String stmt, boolean isBlinkPlanner) throws SqlParseException {
+ // normalize
+ String stmtForRegexMatch = stmt.trim();
+ // remove ';' at the end
+ if (stmtForRegexMatch.endsWith(";")) {
+ stmtForRegexMatch = stmtForRegexMatch.substring(0, stmtForRegexMatch.length() - 1).trim();
+ }
+
+ // only parse gateway specific statements
+ for (SqlCommand cmd : SqlCommand.values()) {
+ if (cmd.hasPattern()) {
+ final Matcher matcher = cmd.getPattern().matcher(stmtForRegexMatch);
+ if (matcher.matches()) {
+ final String[] groups = new String[matcher.groupCount()];
+ for (int i = 0; i < groups.length; i++) {
+ groups[i] = matcher.group(i + 1);
+ }
+ return cmd.getOperandConverter().apply(groups)
+ .map((operands) -> new SqlCommandCall(cmd, operands));
+ }
+ }
+ }
+
+ return parseStmt(stmt, isBlinkPlanner);
+ }
+
+ /**
+ * Flink Parser only supports partial Operations, so we directly use Calcite Parser here.
+ * Once Flink Parser supports all Operations, we should use Flink Parser instead of Calcite Parser.
+ */
+ private Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner) throws SqlParseException {
+ SqlParser.Config config = createSqlParserConfig(isBlinkPlanner);
+ SqlParser sqlParser = SqlParser.create(stmt, config);
+ SqlNodeList sqlNodes;
+ try {
+ sqlNodes = sqlParser.parseStmtList();
+ // no need to check whether the statement is valid here
+ } catch (org.apache.calcite.sql.parser.SqlParseException e) {
+ throw new SqlParseException("Failed to parse statement.", e);
+ }
+ if (sqlNodes.size() != 1) {
+ throw new SqlParseException("Only single statement is supported now");
+ }
+
+ final String[] operands;
+ final SqlCommand cmd;
+ SqlNode node = sqlNodes.get(0);
+ if (node.getKind().belongsTo(SqlKind.QUERY)) {
+ cmd = SqlCommand.SELECT;
+ operands = new String[] { stmt };
+ } else if (node instanceof RichSqlInsert) {
+ RichSqlInsert insertNode = (RichSqlInsert) node;
+ cmd = insertNode.isOverwrite() ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
+ operands = new String[] { stmt, insertNode.getTargetTable().toString() };
+ } else if (node instanceof SqlShowTables) {
+ cmd = SqlCommand.SHOW_TABLES;
+ operands = new String[0];
+ } else if (node instanceof SqlCreateTable) {
+ cmd = SqlCommand.CREATE_TABLE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlDropTable) {
+ cmd = SqlCommand.DROP_TABLE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlAlterTable) {
+ cmd = SqlCommand.ALTER_TABLE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlCreateView) {
+ // TableEnvironment currently does not support creating view
+ // so we have to perform the modification here
+ SqlCreateView createViewNode = (SqlCreateView) node;
+ cmd = SqlCommand.CREATE_VIEW;
+ operands = new String[] {
+ createViewNode.getViewName().toString(),
+ createViewNode.getQuery().toString()
+ };
+ } else if (node instanceof SqlDropView) {
+ // TableEnvironment currently does not support dropping view
+ // so we have to perform the modification here
+ SqlDropView dropViewNode = (SqlDropView) node;
+
+ Field ifExistsField;
+ try {
+ ifExistsField = SqlDrop.class.getDeclaredField("ifExists");
+ } catch (NoSuchFieldException e) {
+ throw new SqlParseException("Failed to parse drop view statement.", e);
+ }
+ ifExistsField.setAccessible(true);
+ boolean ifExists;
+ try {
+ ifExists = ifExistsField.getBoolean(dropViewNode);
+ } catch (IllegalAccessException e) {
+ throw new SqlParseException("Failed to parse drop view statement.", e);
+ }
+
+ cmd = SqlCommand.DROP_VIEW;
+ operands = new String[] { dropViewNode.getViewName().toString(), String.valueOf(ifExists) };
+ } else if (node instanceof SqlShowDatabases) {
+ cmd = SqlCommand.SHOW_DATABASES;
+ operands = new String[0];
+ } else if (node instanceof SqlCreateDatabase) {
+ cmd = SqlCommand.CREATE_DATABASE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlDropDatabase) {
+ cmd = SqlCommand.DROP_DATABASE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlAlterDatabase) {
+ cmd = SqlCommand.ALTER_DATABASE;
+ operands = new String[] { stmt };
+ } else if (node instanceof SqlShowCatalogs) {
+ cmd = SqlCommand.SHOW_CATALOGS;
+ operands = new String[0];
+ } else if (node instanceof SqlShowFunctions) {
+ cmd = SqlCommand.SHOW_FUNCTIONS;
+ operands = new String[0];
+ } else if (node instanceof SqlUseCatalog) {
+ cmd = SqlCommand.USE_CATALOG;
+ operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() };
+ } else if (node instanceof SqlUseDatabase) {
+ cmd = SqlCommand.USE;
+ operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() };
+ } else if (node instanceof SqlRichDescribeTable) {
+ cmd = SqlCommand.DESCRIBE_TABLE;
+ // TODO support describe extended
+ String[] fullTableName = ((SqlRichDescribeTable) node).fullTableName();
+ String escapedName =
+ Stream.of(fullTableName).map(s -> "`" + s + "`").collect(Collectors.joining("."));
+ operands = new String[] { escapedName };
+ } else if (node instanceof SqlExplain) {
+ cmd = SqlCommand.EXPLAIN;
+ // TODO support explain details
+ operands = new String[] { ((SqlExplain) node).getExplicandum().toString() };
+ } else if (node instanceof SqlSetOption) {
+ SqlSetOption setNode = (SqlSetOption) node;
+ // refer to SqlSetOption#unparseAlterOperation
+ if (setNode.getValue() != null) {
+ cmd = SqlCommand.SET;
+ operands = new String[] { setNode.getName().toString(), setNode.getValue().toString() };
+ } else {
+ cmd = SqlCommand.RESET;
+ if ("ALL".equals(setNode.getName().toString().toUpperCase())) {
+ operands = new String[0];
+ } else {
+ operands = new String[] { setNode.getName().toString() };
+ }
+ }
+ } else {
+ cmd = null;
+ operands = new String[0];
+ }
+
+ if (cmd == null) {
+ return Optional.empty();
+ } else {
+ // use the origin given statement to make sure
+ // users can find the correct line number when parsing failed
+ return Optional.of(new SqlCommandCall(cmd, operands));
+ }
+ }
+
+ /**
+ * A temporary solution. We can't get the default SqlParser config through table environment now.
+ */
+ private SqlParser.Config createSqlParserConfig(boolean isBlinkPlanner) {
+ SqlParser.Config config = SqlParser
+ .config()
+ .withParserFactory(FlinkSqlParserImpl.FACTORY)
+ .withConformance(FlinkSqlConformance.DEFAULT)
+ .withLex(Lex.JAVA);
+ if (isBlinkPlanner) {
+ return config.withIdentifierMaxLength(256);
+ } else {
+ return config;
+ }
+ }
+
+ private static volatile SqlCommandParser sqlCommandParser;
+
+ public static SqlCommandParser getInstance() {
+ if(sqlCommandParser == null) {
+ synchronized (SqlCommandParserImpl.class) {
+ if(sqlCommandParser == null) {
+ sqlCommandParser = ClassUtil.getInstance(SqlCommandParser.class, new SqlCommandParserImpl());
+ }
+ }
+ }
+ return sqlCommandParser;
+ }
+
+}
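To make the two-stage dispatch concrete — regex-backed commands short-circuit before Calcite runs, everything else goes through parseStmt — a sketch, assuming the new SqlCommand enum keeps the regex-backed entries (SHOW_MODULES and friends) shown in the deleted file below; statements are illustrative:

```java
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;

import java.util.Optional;

public class DispatchSketch {
    public static void main(String[] args) throws SqlParseException {
        SqlCommandParser parser = SqlCommandParser.getSqlCommandParser();
        // trailing ';' is stripped, the SHOW_MODULES pattern matches, Calcite is never invoked
        Optional<SqlCommandCall> modules = parser.parse("show modules;", true);
        // no regex matches a query, so FlinkSqlParserImpl parses it and
        // SqlKind.QUERY maps it to SqlCommand.SELECT with the raw text as operand
        Optional<SqlCommandCall> select = parser.parse("SELECT * FROM t", true);
        System.out.println(modules.orElse(null) + " / " + select.orElse(null));
    }
}
```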
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java
deleted file mode 100644
index 9ca6b5459a..0000000000
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/client/utils/SqlCommandParser.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils;
-
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlDrop;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSetOption;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
-import org.apache.flink.sql.parser.ddl.SqlAlterTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateView;
-import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
-import org.apache.flink.sql.parser.ddl.SqlDropTable;
-import org.apache.flink.sql.parser.ddl.SqlDropView;
-import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
-import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
-import org.apache.flink.sql.parser.dml.RichSqlInsert;
-import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
-import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
-import org.apache.flink.sql.parser.dql.SqlShowDatabases;
-import org.apache.flink.sql.parser.dql.SqlShowFunctions;
-import org.apache.flink.sql.parser.dql.SqlShowTables;
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
-
-/**
- * Simple parser for determining the type of command and its parameters.
- */
-public final class SqlCommandParser {
-
- private SqlCommandParser() {
- // private
- }
-
- /**
- * Parse the given statement and return corresponding SqlCommandCall.
- *
- * only `set`, `show modules`, `show current catalog` and `show current database`
- * are parsed through regex matching, other commands are parsed through sql parser.
- *
- * <p>throw {@link SqlParseException} if the statement contains multiple sub-statements separated by semicolon
- * or there is a parse error.
- *
- * <p>NOTE: sql parser only parses the statement to get the corresponding SqlCommand,
- * do not check whether the statement is valid here.
- */
- public static Optional<SqlCommandCall> parse(String stmt, boolean isBlinkPlanner) throws SqlParseException {
- Optional<SqlCommandCall> call = Arrays.stream(GrammarFactory.getGrammars()).filter(grammar -> grammar.canParse(stmt)).findFirst()
- .map(grammar -> new SqlCommandCall(SqlCommand.LINKIS_GRAMMAR, new String[] {stmt}));
- if(call.isPresent()) {
- return call;
- }
- // normalize
- String stmtForRegexMatch = stmt.trim();
- // remove ';' at the end
- if (stmtForRegexMatch.endsWith(";")) {
- stmtForRegexMatch = stmtForRegexMatch.substring(0, stmtForRegexMatch.length() - 1).trim();
- }
-
- // only parse gateway specific statements
- for (SqlCommand cmd : SqlCommand.values()) {
- if (cmd.hasPattern()) {
- final Matcher matcher = cmd.pattern.matcher(stmtForRegexMatch);
- if (matcher.matches()) {
- final String[] groups = new String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
- }
- return cmd.operandConverter.apply(groups)
- .map((operands) -> new SqlCommandCall(cmd, operands));
- }
- }
- }
-
- return parseStmt(stmt, isBlinkPlanner);
- }
-
- /**
- * Flink Parser only supports partial Operations, so we directly use Calcite Parser here.
- * Once Flink Parser supports all Operations, we should use Flink Parser instead of Calcite Parser.
- */
- private static Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner) throws SqlParseException {
- SqlParser.Config config = createSqlParserConfig(isBlinkPlanner);
- SqlParser sqlParser = SqlParser.create(stmt, config);
- SqlNodeList sqlNodes;
- try {
- sqlNodes = sqlParser.parseStmtList();
- // no need check the statement is valid here
- } catch (org.apache.calcite.sql.parser.SqlParseException e) {
- throw new SqlParseException("Failed to parse statement.", e);
- }
- if (sqlNodes.size() != 1) {
- throw new SqlParseException("Only single statement is supported now");
- }
-
- final String[] operands;
- final SqlCommand cmd;
- SqlNode node = sqlNodes.get(0);
- if (node.getKind().belongsTo(SqlKind.QUERY)) {
- cmd = SqlCommand.SELECT;
- operands = new String[] { stmt };
- } else if (node instanceof RichSqlInsert) {
- RichSqlInsert insertNode = (RichSqlInsert) node;
- cmd = insertNode.isOverwrite() ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
- operands = new String[] { stmt, insertNode.getTargetTable().toString() };
- } else if (node instanceof SqlShowTables) {
- cmd = SqlCommand.SHOW_TABLES;
- operands = new String[0];
- } else if (node instanceof SqlCreateTable) {
- cmd = SqlCommand.CREATE_TABLE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlDropTable) {
- cmd = SqlCommand.DROP_TABLE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlAlterTable) {
- cmd = SqlCommand.ALTER_TABLE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlCreateView) {
- // TableEnvironment currently does not support creating view
- // so we have to perform the modification here
- SqlCreateView createViewNode = (SqlCreateView) node;
- cmd = SqlCommand.CREATE_VIEW;
- operands = new String[] {
- createViewNode.getViewName().toString(),
- createViewNode.getQuery().toString()
- };
- } else if (node instanceof SqlDropView) {
- // TableEnvironment currently does not support dropping view
- // so we have to perform the modification here
- SqlDropView dropViewNode = (SqlDropView) node;
-
- // TODO: we can't get this field from SqlDropView normally until FLIP-71 is implemented
- Field ifExistsField;
- try {
- ifExistsField = SqlDrop.class.getDeclaredField("ifExists");
- } catch (NoSuchFieldException e) {
- throw new SqlParseException("Failed to parse drop view statement.", e);
- }
- ifExistsField.setAccessible(true);
- boolean ifExists;
- try {
- ifExists = ifExistsField.getBoolean(dropViewNode);
- } catch (IllegalAccessException e) {
- throw new SqlParseException("Failed to parse drop view statement.", e);
- }
-
- cmd = SqlCommand.DROP_VIEW;
- operands = new String[] { dropViewNode.getViewName().toString(), String.valueOf(ifExists) };
- } else if (node instanceof SqlShowDatabases) {
- cmd = SqlCommand.SHOW_DATABASES;
- operands = new String[0];
- } else if (node instanceof SqlCreateDatabase) {
- cmd = SqlCommand.CREATE_DATABASE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlDropDatabase) {
- cmd = SqlCommand.DROP_DATABASE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlAlterDatabase) {
- cmd = SqlCommand.ALTER_DATABASE;
- operands = new String[] { stmt };
- } else if (node instanceof SqlShowCatalogs) {
- cmd = SqlCommand.SHOW_CATALOGS;
- operands = new String[0];
- } else if (node instanceof SqlShowFunctions) {
- cmd = SqlCommand.SHOW_FUNCTIONS;
- operands = new String[0];
- } else if (node instanceof SqlUseCatalog) {
- cmd = SqlCommand.USE_CATALOG;
- operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() };
- } else if (node instanceof SqlUseDatabase) {
- cmd = SqlCommand.USE;
- operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() };
- } else if (node instanceof SqlRichDescribeTable) {
- cmd = SqlCommand.DESCRIBE_TABLE;
- // TODO support describe extended
- String[] fullTableName = ((SqlRichDescribeTable) node).fullTableName();
- String escapedName =
- Stream.of(fullTableName).map(s -> "`" + s + "`").collect(Collectors.joining("."));
- operands = new String[] { escapedName };
- } else if (node instanceof SqlExplain) {
- cmd = SqlCommand.EXPLAIN;
- // TODO support explain details
- operands = new String[] { ((SqlExplain) node).getExplicandum().toString() };
- } else if (node instanceof SqlSetOption) {
- SqlSetOption setNode = (SqlSetOption) node;
- // refer to SqlSetOption#unparseAlterOperation
- if (setNode.getValue() != null) {
- cmd = SqlCommand.SET;
- operands = new String[] { setNode.getName().toString(), setNode.getValue().toString() };
- } else {
- cmd = SqlCommand.RESET;
- if (setNode.getName().toString().toUpperCase().equals("ALL")) {
- operands = new String[0];
- } else {
- operands = new String[] { setNode.getName().toString() };
- }
- }
- } else {
- cmd = null;
- operands = new String[0];
- }
-
- if (cmd == null) {
- return Optional.empty();
- } else {
- // use the origin given statement to make sure
- // users can find the correct line number when parsing failed
- return Optional.of(new SqlCommandCall(cmd, operands));
- }
- }
-
- /**
- * A temporary solution. We can't get the default SqlParser config through table environment now.
- */
- private static SqlParser.Config createSqlParserConfig(boolean isBlinkPlanner) {
- if (isBlinkPlanner) {
- return SqlParser
- .configBuilder()
- .setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.DEFAULT)
- .setLex(Lex.JAVA)
- .setIdentifierMaxLength(256)
- .build();
- } else {
- return SqlParser
- .configBuilder()
- .setParserFactory(FlinkSqlParserImpl.FACTORY)
- .setConformance(FlinkSqlConformance.DEFAULT)
- .setLex(Lex.JAVA)
- .build();
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static final Function<String[], Optional<String[]>> NO_OPERANDS =
- (operands) -> Optional.of(new String[0]);
-
- private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
-
- /**
- * Supported SQL commands.
- */
- public enum SqlCommand {
- LINKIS_GRAMMAR,
-
- SELECT,
-
- INSERT_INTO,
-
- INSERT_OVERWRITE,
-
- CREATE_TABLE,
-
- ALTER_TABLE,
-
- DROP_TABLE,
-
- CREATE_VIEW,
-
- DROP_VIEW,
-
- CREATE_DATABASE,
-
- ALTER_DATABASE,
-
- DROP_DATABASE,
-
- USE_CATALOG,
-
- USE,
-
- SHOW_CATALOGS,
-
- SHOW_DATABASES,
-
- SHOW_TABLES,
-
- SHOW_FUNCTIONS,
-
- EXPLAIN,
-
- DESCRIBE_TABLE,
-
- RESET,
-
- // the following commands are not supported by SQL parser but are needed by users
-
- SET(
- "SET",
- // `SET` with operands can be parsed by SQL parser
- // we keep `SET` with no operands here to print all properties
- NO_OPERANDS),
-
- // the following commands will be supported by SQL parser in the future
- // remove them once they're supported
-
- // FLINK-17396
- SHOW_MODULES(
- "SHOW\\s+MODULES",
- NO_OPERANDS),
-
- // FLINK-17111
- SHOW_VIEWS(
- "SHOW\\s+VIEWS",
- NO_OPERANDS),
-
- // the following commands are not supported by SQL parser but are needed by JDBC driver
- // these should not be exposed to the user and should be used internally
-
- SHOW_CURRENT_CATALOG(
- "SHOW\\s+CURRENT\\s+CATALOG",
- NO_OPERANDS),
-
- SHOW_CURRENT_DATABASE(
- "SHOW\\s+CURRENT\\s+DATABASE",
- NO_OPERANDS);
-
- public final Pattern pattern;
- public final Function<String[], Optional<String[]>> operandConverter;
-
- SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
- this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
- this.operandConverter = operandConverter;
- }
-
- SqlCommand() {
- this.pattern = null;
- this.operandConverter = null;
- }
-
- @Override
- public String toString() {
- return super.toString().replace('_', ' ');
- }
-
- boolean hasPattern() {
- return pattern != null;
- }
- }
-
- /**
- * Call of SQL command with operands and command type.
- */
- public static class SqlCommandCall {
- public final SqlCommand command;
- public final String[] operands;
-
- public SqlCommandCall(SqlCommand command, String[] operands) {
- this.command = command;
- this.operands = operands;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SqlCommandCall that = (SqlCommandCall) o;
- return command == that.command && Arrays.equals(operands, that.operands);
- }
-
- @Override
- public int hashCode() {
- int result = Objects.hash(command);
- result = 31 * result + Arrays.hashCode(operands);
- return result;
- }
-
- @Override
- public String toString() {
- return command + "(" + Arrays.toString(operands) + ")";
- }
- }
-}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java
deleted file mode 100644
index fbaf364708..0000000000
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/RetryUtil.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.webank.wedatasphere.linkis.engineconnplugin.flink.util;
-
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Predicate;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A retry Util-class based on guava-retry(基于guava-retry的重试工具类)
- * Retry Tool class based on guava-retry
- */
-public class RetryUtil {
-
- private static final Logger logger = LoggerFactory.getLogger(RetryUtil.class);
-
- /**
- * @param task task pending for retry(重试执行得任务)
- * @param predicate retry predication(符合预期结果需要重试)
- * @param fixedWaitTime time interval between retries(本次重试与上次重试之间的固定间隔时长)
- * @param maxEachExecuTime maxium time for a retry(一次重试的最大执行的时间)
- * @param attemptNumber number of retry attempts(重试次数)
- */
-
- public static <T> T retry(Callable<T> task, Predicate<T> predicate, long fixedWaitTime, long maxEachExecuTime,
- TimeUnit timeUnit, int attemptNumber) {
- Retryer<T> retryer = RetryerBuilder
- .<T>newBuilder()
- // Will retry: runtime exception; checked exception. May not retry: error (抛出runtime异常、checked异常时都会重试,但是抛出error不会重试。)
- .retryIfException()
- // if predication is met, then retry(对执行结果的预期。符合预期就重试)
- .retryIfResult(predicate)
- // fixed waiting time for retry(每次重试固定等待fixedWaitTime时间)
- .withWaitStrategy(WaitStrategies.fixedWait(fixedWaitTime, timeUnit))
- // number of retry attempts(尝试次数)
- .withStopStrategy(StopStrategies.stopAfterAttempt(attemptNumber))
- .build();
- T t = null;
- try {
- t = retryer.call(task);
- } catch (ExecutionException e) {
- logger.error("", e);
- } catch (RetryException e) {
- logger.error("", e);
- }
- return t;
- }
-
-}
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala
index 71aa76f03e..9765c28054 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/FlinkEngineConnPlugin.scala
@@ -36,12 +36,7 @@ class FlinkEngineConnPlugin extends EngineConnPlugin {
private val EP_CONTEXT_CONSTRUCTOR_LOCK = new Object()
- override def init(params: java.util.Map[String, Any]): Unit = {
- //do noting
-// engineResourceFactory = new FlinkEngineConnResourceFactory
-// engineConnLaunchBuilder = new FlinkEngineConnLaunchBuilder
-// engineConnFactory = new FlinkEngineConnFactory
- }
+ override def init(params: java.util.Map[String, Any]): Unit = {}
override def getEngineResourceFactory: EngineResourceFactory = {
EP_CONTEXT_CONSTRUCTOR_LOCK.synchronized{
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index edd32fda17..772b6c74c5 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -52,4 +52,8 @@ object FlinkEnvConfiguration {
val FLINK_CLIENT_REQUEST_TIMEOUT = CommonVars("flink.client.request.timeout", new TimeType("30s"))
val FLINK_ONCE_APP_STATUS_FETCH_INTERVAL = CommonVars("flink.app.fetch.status.interval", new TimeType("5s"))
+ val FLINK_REPORTER_ENABLE = CommonVars("linkis.flink.reporter.enable", false)
+ val FLINK_REPORTER_CLASS = CommonVars("linkis.flink.reporter.class", "")
+ val FLINK_REPORTER_INTERVAL = CommonVars("linkis.flink.reporter.interval", new TimeType("60s"))
+
}
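The three new variables read standard Linkis properties. A sketch of how they might be configured (the reporter class name is a placeholder; it must be a Flink MetricReporter available on the engine classpath):

```properties
# enable wiring a metric reporter into the Flink configuration
linkis.flink.reporter.enable=true
# hypothetical reporter implementation, not part of this patch
linkis.flink.reporter.class=com.example.metrics.MyMetricReporter
# reporting period, parsed as a Linkis TimeType
linkis.flink.reporter.interval=60s
```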
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
index 14b303f158..04a630de02 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
@@ -26,7 +26,6 @@ import com.webank.wedatasphere.linkis.engineconn.once.executor.OnceExecutorExecu
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.deployment.YarnPerJobClusterDescriptorAdapter
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory
import com.webank.wedatasphere.linkis.governance.common.paser.{CodeParserFactory, CodeType}
import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
import com.webank.wedatasphere.linkis.scheduler.executer.ErrorExecuteResponse
@@ -63,7 +62,7 @@ class FlinkCodeOnceExecutor(override val id: Long,
future = Utils.defaultScheduler.submit(new Runnable {
override def run(): Unit = {
info("Try to execute codes.")
- Utils.tryCatch(runCode()){ t =>
+ Utils.tryCatch(CodeParserFactory.getCodeParser(CodeType.SQL).parse(codes).filter(StringUtils.isNotBlank).foreach(runCode)){ t =>
error("Run code failed!", t)
setResponse(ErrorExecuteResponse("Run code failed!", t))
tryFailed()
@@ -81,30 +80,28 @@ class FlinkCodeOnceExecutor(override val id: Long,
/**
* Only support to execute sql in order, so it is problematic if more than one insert sql is submitted.
*/
- private def runCode(): Unit = CodeParserFactory.getCodeParser(CodeType.SQL).parse(codes).filter(StringUtils.isNotBlank).foreach { code =>
+ protected def runCode(code: String): Unit = {
if(isClosed) return
val trimmedCode = StringUtils.trim(code)
info(s"$getId >> " + trimmedCode)
val startTime = System.currentTimeMillis
- GrammarFactory.getGrammar(trimmedCode, flinkEngineConnContext).map(_.execute).getOrElse {
- val tableResult = flinkEngineConnContext.getExecutionContext.wrapClassLoader(new Supplier[TableResult]{
- override def get(): TableResult = flinkEngineConnContext.getExecutionContext.getTableEnvironment.executeSql(trimmedCode)
- })
- if(tableResult.getJobClient.isPresent) {
- val jobClient = tableResult.getJobClient.get
- jobClient match {
- case adaptor: ClusterClientJobClientAdapter[ApplicationId] =>
- info(s"jobId is ${jobClient.getJobID.toHexString}")
- clusterDescriptor.deployCluster(jobClient.getJobID, FlinkCodeOnceExecutor.getClusterClient(adaptor))
- }
- this synchronized notify()
- tableResult.await()
- }
- tableResult.getResultKind match {
- case ResultKind.SUCCESS_WITH_CONTENT =>
- tableResult.print()
- case _ =>
+ val tableResult = flinkEngineConnContext.getExecutionContext.wrapClassLoader(new Supplier[TableResult]{
+ override def get(): TableResult = flinkEngineConnContext.getExecutionContext.getTableEnvironment.executeSql(trimmedCode)
+ })
+ if(tableResult.getJobClient.isPresent) {
+ val jobClient = tableResult.getJobClient.get
+ jobClient match {
+ case adaptor: ClusterClientJobClientAdapter[ApplicationId] =>
+ info(s"jobId is ${jobClient.getJobID.toHexString}")
+ clusterDescriptor.deployCluster(jobClient.getJobID, FlinkCodeOnceExecutor.getClusterClient(adaptor))
}
+ this synchronized notify()
+ tableResult.await()
+ }
+ tableResult.getResultKind match {
+ case ResultKind.SUCCESS_WITH_CONTENT =>
+ tableResult.print()
+ case _ =>
}
info(s"Costs ${ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)} to complete.")
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 1d3f6dcd4f..3ccea0f169 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -26,13 +26,12 @@ import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.{C
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.deployment.{ClusterDescriptorAdapterFactory, YarnSessionClusterDescriptorAdapter}
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.{AbstractJobOperation, JobOperation, OperationFactory}
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.{SqlCommand, SqlCommandParser}
import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.{ExecutorInitException, SqlParseException}
import com.webank.wedatasphere.linkis.engineconnplugin.flink.listener.RowsType.RowsType
import com.webank.wedatasphere.linkis.engineconnplugin.flink.listener.{FlinkStreamingResultSetListener, InteractiveFlinkStatusListener}
-import com.webank.wedatasphere.linkis.manager.label.entity.cluster.EnvLabel
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
@@ -68,17 +67,17 @@ class FlinkSQLComputationExecutor(id: Long,
}
override def executeLine(engineExecutionContext: EngineExecutionContext, code: String): ExecuteResponse = {
- val callOpt = SqlCommandParser.parse(code.trim, true)
+ val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
val callSQL = if (!callOpt.isPresent) throw new SqlParseException("Unknown statement: " + code)
else callOpt.get
RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
- val operation = OperationFactory.createOperation(callSQL, flinkEngineConnContext)
+ val operation = OperationFactory.getOperationFactory.createOperation(callSQL, flinkEngineConnContext)
operation match {
case jobOperation: JobOperation =>
jobOperation.setClusterDescriptorAdapter(clusterDescriptor)
this.operation = jobOperation
jobOperation.addFlinkListener(new FlinkSQLStatusListener(jobOperation, engineExecutionContext))
- if(callSQL.command == SqlCommandParser.SqlCommand.SELECT) {
+ if(callSQL.command == SqlCommand.SELECT) {
jobOperation.addFlinkListener(new FlinkSQLStreamingResultSetListener(jobOperation, engineExecutionContext))
val properties: util.Map[String, String] = engineExecutionContext.getProperties.map {
case (k, v: String) => (k, v)
@@ -132,10 +131,6 @@ class FlinkSQLComputationExecutor(id: Long,
override def getProgressInfo: Array[JobProgressInfo] = Array.empty
-
-
-
-
override def getId: String = "FlinkComputationSQL_"+ id
override def close(): Unit = {
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala
index 2be1cccc8d..996a4425d0 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/executormanager/FlinkExecutorManager.scala
@@ -27,7 +27,7 @@ import com.webank.wedatasphere.linkis.manager.label.entity.Label
class FlinkExecutorManager extends LabelExecutorManagerImpl{
override def getReportExecutor: Executor = if (getExecutors.isEmpty) {
val labels = defaultFactory match {
- case onceExecutorFactory: OnceExecutorFactory =>
+ case _: OnceExecutorFactory =>
if (null == engineConn.getEngineCreationContext.getLabels()) Array.empty[Label[_]]
else engineConn.getEngineCreationContext.getLabels().toArray[Label[_]](Array.empty[Label[_]])
case labelExecutorFactory: CodeLanguageLabelExecutorFactory =>
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index b52c2d70b7..459bb947ef 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -32,6 +32,7 @@ import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConf
import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil
import com.webank.wedatasphere.linkis.manager.engineplugin.common.conf.EnvConfiguration
import com.webank.wedatasphere.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import com.webank.wedatasphere.linkis.manager.label.entity.Label
@@ -97,8 +98,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory))
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory))
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
- flinkConfig.set(MetricOptions.REPORTER_CLASS, "com.webank.ims.reporter.IMSReporter");
- flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofSeconds(60))
+ if(FLINK_REPORTER_ENABLE.getValue) {
+ flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue)
+ flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue.toLong))
+ }
//set savePoint(设置 savePoint)
val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options)
if (StringUtils.isNotBlank(savePointPath)) {
@@ -180,7 +183,9 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
override protected def getEngineConnType: EngineType = EngineType.FLINK
- private val executorFactoryArray = Array[ExecutorFactory](new FlinkSQLExecutorFactory, new FlinkApplicationExecutorFactory, new FlinkCodeExecutorFactory)
+ private val executorFactoryArray = Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory),
+ ClassUtil.getInstance(classOf[FlinkApplicationExecutorFactory], new FlinkApplicationExecutorFactory),
+ ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory))
override def getExecutorFactories: Array[ExecutorFactory] = executorFactoryArray
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala
index f51b5a75a9..2d53e66a62 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/factory/FlinkSQLExecutorFactory.scala
@@ -22,17 +22,13 @@ import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationC
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.ComputationExecutor
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
import com.webank.wedatasphere.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
import com.webank.wedatasphere.linkis.manager.label.entity.Label
-
import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType
import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.flink.yarn.configuration.YarnConfigOptions
-import scala.collection.JavaConversions._
-
class FlinkSQLExecutorFactory extends ComputationExecutorFactory {
@@ -43,14 +39,6 @@ class FlinkSQLExecutorFactory extends ComputationExecutorFactory {
case context: FlinkEngineConnContext =>
context.getEnvironmentContext.getFlinkConfig.set(YarnConfigOptions.PROPERTIES_FILE_LOCATION, EngineConnConf.getWorkHome)
val executor = new FlinkSQLComputationExecutor(id, context)
-// val containsEnvLabel = if(labels != null) labels.exists(_.isInstanceOf[EnvLabel]) else false
-// if(!containsEnvLabel) {
-// executor.getExecutorLabels().add(getEnvLabel(engineCreationContext))
-// }
-// if(executor.getEnvLabel.getEnvType == EnvLabel.DEV) {
-// context.getEnvironmentContext.getDefaultEnv
-// .setExecution(Map("max-table-reFlinkSQLComputationExecutorsult-rows" -> FlinkEnvConfiguration.FLINK_SQL_DEV_SELECT_MAX_LINES.getValue.asInstanceOf[Object]))
-// }
executor
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala
deleted file mode 100644
index d3b968ef0a..0000000000
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/Grammar.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql
-
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.Operation
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
-
-
-trait Grammar extends Operation {
-
- def canParse(sql: String): Boolean
-
- def copy(context: FlinkEngineConnContext, sql: String): Grammar
-
-}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala
deleted file mode 100644
index 4057ab1b2a..0000000000
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/impl/CreateTableAsSelectGrammar.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.impl
-
-import com.webank.wedatasphere.linkis.common.utils.Logging
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationUtil
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultSet
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.Grammar
-import org.apache.flink.table.api.internal.TableEnvironmentInternal
-
-import scala.util.matching.Regex
-
-
-class CreateTableAsSelectGrammar(context: FlinkEngineConnContext, sql: String) extends Grammar with Logging {
-
- def this() = this(null, null)
-
- override def canParse(sql: String): Boolean =
- CreateTableAsSelectGrammar.CREATE_TABLE_AS_SELECT_GRAMMAR.unapplySeq(sql).isDefined
-
- /**
- * Execute the command and return the result.
- */
- override def execute(): ResultSet = sql match {
- case CreateTableAsSelectGrammar.CREATE_TABLE_AS_SELECT_GRAMMAR(_, _, tableName, _, select, sql) =>
- val realSql = select + " " + sql
- info(s"Ready to create a table $tableName, the sql is: $realSql.")
- val function = new java.util.function.Function[TableEnvironmentInternal, Unit] {
- override def apply(t: TableEnvironmentInternal): Unit = {
- val table = t.sqlQuery(realSql)
- t.createTemporaryView(tableName, table)
- }
- }
- context.getExecutionContext.wrapClassLoader[Unit](function)
- OperationUtil.OK
- }
-
- override def copy(context: FlinkEngineConnContext, sql: String): CreateTableAsSelectGrammar = new CreateTableAsSelectGrammar(context, sql)
-}
-
-object CreateTableAsSelectGrammar {
-
- val CREATE_TABLE_AS_SELECT_GRAMMAR: Regex = "(create|CREATE)\\s+(table|TABLE)\\s+([a-zA-Z_][a-zA-Z_0-9]*)\\s+(as|AS)?\\s*(select|SELECT)(.+)".r
-
-}
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala
similarity index 50%
rename from linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala
rename to linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala
index baedc41886..9e62c4dddf 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/ql/GrammarFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/com/webank/wedatasphere/linkis/engineconnplugin/flink/util/ClassUtil.scala
@@ -15,24 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.webank.wedatasphere.linkis.engineconnplugin.flink.ql
-import com.webank.wedatasphere.linkis.common.utils.ClassUtils
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlExecutionException
+package com.webank.wedatasphere.linkis.engineconnplugin.flink.util
-import scala.collection.convert.WrapAsScala._
+import com.webank.wedatasphere.linkis.common.utils.{ClassUtils, Utils}
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.JobExecutionException
+import scala.collection.convert.wrapAsScala._
-object GrammarFactory {
+object ClassUtil {
- private val grammars = ClassUtils.reflections.getSubTypesOf(classOf[Grammar]).filterNot(ClassUtils.isInterfaceOrAbstract).map(_.newInstance).toArray
-
- def getGrammars: Array[Grammar] = grammars
-
- def apply(sql: String, context: FlinkEngineConnContext): Grammar = getGrammar(sql, context)
- .getOrElse(throw new SqlExecutionException("Not support grammar " + sql))
-
- def getGrammar(sql: String, context: FlinkEngineConnContext): Option[Grammar] = grammars.find(_.canParse(sql)).map(_.copy(context, sql))
+ def getInstance[T](clazz: Class[T], defaultValue: T): T = {
+ val classes = ClassUtils.reflections.getSubTypesOf(clazz).filterNot(ClassUtils.isInterfaceOrAbstract).toArray
+ if(classes.length <= 1) defaultValue
+ else if(classes.length == 2) {
+ val realClass = if(classes(0) == defaultValue.getClass) classes(1) else classes(0)
+ Utils.tryThrow(realClass.newInstance) { t =>
+ new JobExecutionException(s"Failed to create an instance of ${clazz.getSimpleName}!", t)
+ }
+ } else {
+ throw new JobExecutionException(s"Too many subclasses of ${clazz.getSimpleName}, list: ${classes.mkString(", ")}.")
+ }
+ }
}
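The net effect of getInstance is classpath-based overriding: if exactly one extra subtype of the requested class is found, it replaces the default. A hypothetical override of the SQL parser, for illustration only (the class name and behavior are assumptions, not part of this patch):

```java
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParserImpl;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;

import java.util.Optional;

// Because ClassUtil scans for subtypes, shipping this class in a jar is enough to make
// SqlCommandParser.getSqlCommandParser() return it instead of SqlCommandParserImpl.
public class TrimmingSqlCommandParser extends SqlCommandParserImpl {
    @Override
    public Optional<SqlCommandCall> parse(String stmt, boolean isBlinkPlanner) throws SqlParseException {
        // illustrative pre-processing before delegating to the default implementation
        return super.parse(stmt.trim(), isBlinkPlanner);
    }
}
```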