Merge pull request #960 from wushengyeyouya/dev-1.0.2
Optimize the code compatibility of the Flink EngineConn, and optimize the pom parent of the flink module.
peacewong committed Aug 16, 2021
2 parents 5221230 + 3daab43 commit 3867bef
Showing 20 changed files with 646 additions and 749 deletions.
linkis-engineconn-plugins/engineconn-plugins/flink/pom.xml (3 changes: 1 addition & 2 deletions)
@@ -18,10 +18,9 @@
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>linkis</artifactId>
+        <artifactId>linkis-engineconn-plugins</artifactId>
         <groupId>com.webank.wedatasphere.linkis</groupId>
         <version>1.0.2</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>

OperationFactory.java
@@ -19,115 +19,15 @@
 package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation;


-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.CreateViewOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DDLOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DescribeTableOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.DropViewOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ExplainOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ResetOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.SelectOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.SetOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCatalogsOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCurrentCatalogOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowCurrentDatabaseOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowDatabasesOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowFunctionsOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowModulesOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowTablesOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.ShowViewsOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.UseCatalogOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.UseDatabaseOperation;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser.SqlCommandCall;
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.ql.GrammarFactory;

-public class OperationFactory {
+public interface OperationFactory {

-    public static Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException {
-
-        Operation operation;
-        switch (call.command) {
-            case SELECT:
-                operation = new SelectOperation(context, call.operands[0]);
-                break;
-            case CREATE_VIEW:
-                operation = new CreateViewOperation(context, call.operands[0], call.operands[1]);
-                break;
-            case DROP_VIEW:
-                operation = new DropViewOperation(context, call.operands[0], Boolean.parseBoolean(call.operands[1]));
-                break;
-            case LINKIS_GRAMMAR:
-                operation = GrammarFactory.apply(call.operands[0], context);
-                break;
-            case CREATE_TABLE:
-            case DROP_TABLE:
-            case ALTER_TABLE:
-            case CREATE_DATABASE:
-            case DROP_DATABASE:
-            case ALTER_DATABASE:
-                operation = new DDLOperation(context, call.operands[0], call.command);
-                break;
-            case SET:
-                // list all properties
-                if (call.operands.length == 0) {
-                    operation = new SetOperation(context);
-                } else {
-                    // set a property
-                    operation = new SetOperation(context, call.operands[0], call.operands[1]);
-                }
-                break;
-            case RESET:
-                if (call.operands.length > 0) {
-                    throw new SqlParseException("Only RESET ALL is supported now");
-                }
-                operation = new ResetOperation(context);
-                break;
-            case USE_CATALOG:
-                operation = new UseCatalogOperation(context, call.operands[0]);
-                break;
-            case USE:
-                operation = new UseDatabaseOperation(context, call.operands[0]);
-                break;
-            case INSERT_INTO:
-            case INSERT_OVERWRITE:
-                operation = new InsertOperation(context, call.operands[0], call.operands[1]);
-                break;
-            case SHOW_MODULES:
-                operation = new ShowModulesOperation(context);
-                break;
-            case SHOW_CATALOGS:
-                operation = new ShowCatalogsOperation(context);
-                break;
-            case SHOW_CURRENT_CATALOG:
-                operation = new ShowCurrentCatalogOperation(context);
-                break;
-            case SHOW_DATABASES:
-                operation = new ShowDatabasesOperation(context);
-                break;
-            case SHOW_CURRENT_DATABASE:
-                operation = new ShowCurrentDatabaseOperation(context);
-                break;
-            case SHOW_TABLES:
-                operation = new ShowTablesOperation(context);
-                break;
-            case SHOW_VIEWS:
-                operation = new ShowViewsOperation(context);
-                break;
-            case SHOW_FUNCTIONS:
-                operation = new ShowFunctionsOperation(context);
-                break;
-            case DESCRIBE_TABLE:
-                operation = new DescribeTableOperation(context, call.operands[0]);
-                break;
-            case EXPLAIN:
-                operation = new ExplainOperation(context, call.operands[0]);
-                break;
-            default:
-                throw new SqlParseException("Unsupported command call " + call + ". This is a bug.");
-        }
-
-        return operation;
-    }
+    static OperationFactory getOperationFactory() {
+        return OperationFactoryImpl.getInstance();
+    }
+
+    Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException;
 }
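
For context, a hedged sketch of how a caller might obtain an Operation through the refactored factory after this change. The call and context values are assumed to come from the plugin's SQL parser and EngineConn setup, which are outside the hunks shown here, and the wrapper class name is made up for illustration.

import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.Operation;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationFactory;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;

// Hypothetical usage sketch, not part of the commit.
public class OperationFactoryUsageSketch {

    // 'call' and 'context' are assumed to be produced elsewhere in the Flink EngineConn.
    public static Operation buildOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException {
        // Callers no longer depend on a static method of a concrete class; they go through
        // the interface, which delegates to OperationFactoryImpl (or an override found by ClassUtil).
        return OperationFactory.getOperationFactory().createOperation(call, context);
    }
}
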
OperationFactoryImpl.java (new file)
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation;

import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.impl.*;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandCall;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlParseException;
import com.webank.wedatasphere.linkis.engineconnplugin.flink.util.ClassUtil;


public class OperationFactoryImpl implements OperationFactory {

    private OperationFactoryImpl() {
    }

    @Override
    public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext context) throws SqlParseException {
        Operation operation;
        switch (call.command) {
            case SELECT:
                operation = new SelectOperation(context, call.operands[0]);
                break;
            case CREATE_VIEW:
                operation = new CreateViewOperation(context, call.operands[0], call.operands[1]);
                break;
            case DROP_VIEW:
                operation = new DropViewOperation(context, call.operands[0], Boolean.parseBoolean(call.operands[1]));
                break;
            case CREATE_TABLE:
            case DROP_TABLE:
            case ALTER_TABLE:
            case CREATE_DATABASE:
            case DROP_DATABASE:
            case ALTER_DATABASE:
                operation = new DDLOperation(context, call.operands[0], call.command);
                break;
            case SET:
                // list all properties
                if (call.operands.length == 0) {
                    operation = new SetOperation(context);
                } else {
                    // set a property
                    operation = new SetOperation(context, call.operands[0], call.operands[1]);
                }
                break;
            case RESET:
                if (call.operands.length > 0) {
                    throw new SqlParseException("Only RESET ALL is supported now");
                }
                operation = new ResetOperation(context);
                break;
            case USE_CATALOG:
                operation = new UseCatalogOperation(context, call.operands[0]);
                break;
            case USE:
                operation = new UseDatabaseOperation(context, call.operands[0]);
                break;
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                operation = new InsertOperation(context, call.operands[0], call.operands[1]);
                break;
            case SHOW_MODULES:
                operation = new ShowModulesOperation(context);
                break;
            case SHOW_CATALOGS:
                operation = new ShowCatalogsOperation(context);
                break;
            case SHOW_CURRENT_CATALOG:
                operation = new ShowCurrentCatalogOperation(context);
                break;
            case SHOW_DATABASES:
                operation = new ShowDatabasesOperation(context);
                break;
            case SHOW_CURRENT_DATABASE:
                operation = new ShowCurrentDatabaseOperation(context);
                break;
            case SHOW_TABLES:
                operation = new ShowTablesOperation(context);
                break;
            case SHOW_VIEWS:
                operation = new ShowViewsOperation(context);
                break;
            case SHOW_FUNCTIONS:
                operation = new ShowFunctionsOperation(context);
                break;
            case DESCRIBE_TABLE:
                operation = new DescribeTableOperation(context, call.operands[0]);
                break;
            case EXPLAIN:
                operation = new ExplainOperation(context, call.operands[0]);
                break;
            default:
                throw new SqlParseException("Unsupported command call " + call + ". This is a bug.");
        }
        return operation;
    }

    private static OperationFactory operationFactory;

    public static OperationFactory getInstance() {
        if(operationFactory == null) {
            synchronized (OperationFactory.class) {
                if(operationFactory == null) {
                    operationFactory = ClassUtil.getInstance(OperationFactory.class, new OperationFactoryImpl());
                }
            }
        }
        return operationFactory;
    }

}
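
A quick illustration of the lazy singleton introduced above (a sketch only; ClassUtil.getInstance may substitute another OperationFactory implementation found on the classpath, and its lookup rules are defined outside this commit):

import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationFactory;

// Hypothetical check, not part of the commit.
public class OperationFactorySingletonSketch {
    public static void main(String[] args) {
        OperationFactory first = OperationFactory.getOperationFactory();
        OperationFactory second = OperationFactory.getOperationFactory();
        System.out.println(first == second);                    // expected: true, the instance is cached after the first lookup
        System.out.println(first.getClass().getSimpleName());   // OperationFactoryImpl unless ClassUtil resolved an override
    }
}
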
(another changed file; file path not shown)
@@ -22,7 +22,7 @@
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.NonJobOperation;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.OperationUtil;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.operation.result.ResultSet;
-import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.utils.SqlCommandParser.SqlCommand;
+import com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser.SqlCommand;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
 import com.webank.wedatasphere.linkis.engineconnplugin.flink.exception.SqlExecutionException;
 import org.apache.flink.table.api.TableEnvironment;

SqlCommand.java (new file)
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser;

import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

public enum SqlCommand {

    LINKIS_GRAMMAR,

    SELECT,

    INSERT_INTO,

    INSERT_OVERWRITE,

    CREATE_TABLE,

    ALTER_TABLE,

    DROP_TABLE,

    CREATE_VIEW,

    DROP_VIEW,

    CREATE_DATABASE,

    ALTER_DATABASE,

    DROP_DATABASE,

    USE_CATALOG,

    USE,

    SHOW_CATALOGS,

    SHOW_DATABASES,

    SHOW_TABLES,

    SHOW_FUNCTIONS,

    EXPLAIN,

    DESCRIBE_TABLE,

    RESET,

    SET(
            "SET",
            Inner_Config.NO_OPERANDS),

    SHOW_MODULES(
            "SHOW\\s+MODULES",
            Inner_Config.NO_OPERANDS),

    SHOW_VIEWS(
            "SHOW\\s+VIEWS",
            Inner_Config.NO_OPERANDS),

    SHOW_CURRENT_CATALOG(
            "SHOW\\s+CURRENT\\s+CATALOG",
            Inner_Config.NO_OPERANDS),

    SHOW_CURRENT_DATABASE(
            "SHOW\\s+CURRENT\\s+DATABASE",
            Inner_Config.NO_OPERANDS);

    private final Pattern pattern;
    private final Function<String[], Optional<String[]>> operandConverter;

    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
        this.pattern = Pattern.compile(matchingRegex, Inner_Config.DEFAULT_PATTERN_FLAGS);
        this.operandConverter = operandConverter;
    }

    SqlCommand() {
        this.pattern = null;
        this.operandConverter = null;
    }

    @Override
    public String toString() {
        return super.toString().replace('_', ' ');
    }

    boolean hasPattern() {
        return pattern != null && operandConverter != null;
    }

    Pattern getPattern() {
        return pattern;
    }

    Function<String[], Optional<String[]>> getOperandConverter() {
        return operandConverter;
    }

    static class Inner_Config {
        private static final Function<String[], Optional<String[]>> NO_OPERANDS =
                (operands) -> Optional.of(new String[0]);

        private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
    }

}
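
To make the role of pattern and operandConverter concrete, here is a hedged sketch of how a regex-backed command such as SHOW_MODULES could be recognized. The real dispatch lives in the plugin's SQL parser, which is not part of the hunks shown here; the class below is hypothetical and sits in the same package because the accessors above are package-private.

package com.webank.wedatasphere.linkis.engineconnplugin.flink.client.sql.parser;

import java.util.Optional;
import java.util.regex.Matcher;

// Hypothetical matcher sketch, not part of the commit.
public class SqlCommandMatchSketch {

    // Tries each regex-backed command against the statement. Commands without a pattern
    // (SELECT, INSERT_INTO, ...) are assumed to be recognized elsewhere by the parser.
    public static Optional<SqlCommand> matchRegexCommand(String statement) {
        for (SqlCommand command : SqlCommand.values()) {
            if (!command.hasPattern()) {
                continue;
            }
            Matcher matcher = command.getPattern().matcher(statement.trim());
            if (matcher.matches()) {
                // For these commands the converter is NO_OPERANDS, so whatever is passed in,
                // the resulting operands array is empty.
                if (command.getOperandConverter().apply(new String[0]).isPresent()) {
                    return Optional.of(command);
                }
            }
        }
        return Optional.empty();
    }
}

// For example, matchRegexCommand("show   modules") would return Optional.of(SqlCommand.SHOW_MODULES),
// since the patterns are compiled with CASE_INSENSITIVE and DOTALL.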
