Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
Expand Down Expand Up @@ -208,7 +209,9 @@ public List<Operation> parse(String statement) {
return super.parse(statement);
}

Optional<Operation> nonSqlOperation = tryProcessHiveNonSqlStatement(statement);
Optional<Operation> nonSqlOperation =
tryProcessHiveNonSqlStatement(
((HiveCatalog) currentCatalog).getHiveConf(), statement);
if (nonSqlOperation.isPresent()) {
return Collections.singletonList(nonSqlOperation.get());
}
Expand All @@ -231,16 +234,22 @@ public List<Operation> parse(String statement) {
}
}

private Optional<Operation> tryProcessHiveNonSqlStatement(String statement) {
private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
String[] commandTokens = statement.split("\\s+");
HiveCommand hiveCommand = HiveCommand.find(commandTokens);
if (hiveCommand != null) {
String cmdArgs = statement.substring(commandTokens[0].length()).trim();
// the command may end with ";" since it won't be removed by Flink SQL CLI,
// so, we need to remove ";"
if (cmdArgs.endsWith(";")) {
cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
}
if (hiveCommand == HiveCommand.SET) {
return Optional.of(
processSetCmd(
statement, statement.substring(commandTokens[0].length()).trim()));
return Optional.of(processSetCmd(statement, cmdArgs));
} else if (hiveCommand == HiveCommand.RESET) {
return Optional.of(super.parse(statement).get(0));
} else if (hiveCommand == HiveCommand.ADD) {
return Optional.of(processAddCmd(substituteVariables(hiveConf, cmdArgs)));
} else {
throw new UnsupportedOperationException(
String.format("The Hive command %s is not supported.", hiveCommand));
Expand All @@ -250,29 +259,22 @@ private Optional<Operation> tryProcessHiveNonSqlStatement(String statement) {
}

private Operation processSetCmd(String originCmd, String setCmdArgs) {
String nwcmd = setCmdArgs.trim();
// the set command may end with ";" since it won't be removed by Flink SQL CLI,
// so, we need to remove ";"
if (nwcmd.endsWith(";")) {
nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
}

if (nwcmd.equals("")) {
if (setCmdArgs.equals("")) {
return new HiveSetOperation();
}
if (nwcmd.equals("-v")) {
if (setCmdArgs.equals("-v")) {
return new HiveSetOperation(true);
}

String[] part = new String[2];
int eqIndex = nwcmd.indexOf('=');
if (nwcmd.contains("=")) {
if (eqIndex == nwcmd.length() - 1) { // x=
part[0] = nwcmd.substring(0, nwcmd.length() - 1);
int eqIndex = setCmdArgs.indexOf('=');
if (setCmdArgs.contains("=")) {
if (eqIndex == setCmdArgs.length() - 1) { // x=
part[0] = setCmdArgs.substring(0, setCmdArgs.length() - 1);
part[1] = "";
} else { // x=y
part[0] = nwcmd.substring(0, eqIndex).trim();
part[1] = nwcmd.substring(eqIndex + 1).trim();
part[0] = setCmdArgs.substring(0, eqIndex).trim();
part[1] = setCmdArgs.substring(eqIndex + 1).trim();
if (!startWithHiveSpecialVariablePrefix(part[0])) {
// TODO:
// currently, for the command set key=value, we will fall to
Expand All @@ -294,7 +296,7 @@ private Operation processSetCmd(String originCmd, String setCmdArgs) {
}
return new HiveSetOperation(part[0], part[1]);
}
return new HiveSetOperation(nwcmd);
return new HiveSetOperation(setCmdArgs);
}

/**
Expand All @@ -306,6 +308,27 @@ private String substituteVariables(HiveConf conf, String statement) {
return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
}

/**
 * Translates the argument string of a Hive {@code ADD} command into a Flink {@link Operation}.
 *
 * <p>Only {@code ADD JAR <file_path>} with a single path is supported; {@code FILE} and {@code
 * ARCHIVE} resources, as well as multiple jars in one statement, are rejected explicitly.
 *
 * @param addCmdArgs the text following the {@code ADD} keyword, already trimmed and
 *     variable-substituted by the caller
 * @return an {@link AddJarOperation} for the single jar path
 * @throws UnsupportedOperationException for FILE/ARCHIVE resources or multiple jars
 * @throws IllegalArgumentException when the first token is not a known Hive resource type
 */
private Operation processAddCmd(String addCmdArgs) {
    String[] tokens = addCmdArgs.split("\\s+");
    SessionState.ResourceType resourceType = SessionState.find_resource_type(tokens[0]);
    // find_resource_type returns null for unrecognized resource keywords
    if (resourceType == null) {
        throw new IllegalArgumentException(
                String.format("Unknown resource type: %s.", tokens[0]));
    }
    switch (resourceType) {
        case FILE:
            throw new UnsupportedOperationException(
                    "ADD FILE is not supported yet. Usage: ADD JAR <file_path>");
        case ARCHIVE:
            throw new UnsupportedOperationException(
                    "ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
        case JAR:
            // exactly one path token besides the "jar" keyword is accepted
            if (tokens.length != 2) {
                throw new UnsupportedOperationException(
                        "Add multiple jar in one single statement is not supported yet. Usage: ADD JAR <file_path>");
            }
            return new AddJarOperation(tokens[1]);
        default:
            throw new IllegalArgumentException(
                    String.format("Unknown resource type: %s.", tokens[0]));
    }
}

private List<Operation> processCmd(
String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.table.functions.hive.util.TestSplitUDTFInitializeWithStructObjectInspector;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ClearOperation;
import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
Expand Down Expand Up @@ -1144,6 +1145,33 @@ public void testSetCommand() throws Exception {
assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
}

@Test
public void testAddCommand() {
    Parser hiveParser = ((TableEnvironmentInternal) tableEnv).getParser();

    // a plain "add jar" statement parses into an AddJarOperation carrying the jar path
    Operation parsed = hiveParser.parse("add jar test.jar").get(0);
    assertThat(parsed).isInstanceOf(AddJarOperation.class);
    assertThat(((AddJarOperation) parsed).getPath()).isEqualTo("test.jar");

    // hiveconf variables in the path are substituted before the operation is built
    parsed = hiveParser.parse("add jar ${hiveconf:common-key}.jar").get(0);
    assertThat(parsed).isInstanceOf(AddJarOperation.class);
    assertThat(((AddJarOperation) parsed).getPath()).isEqualTo("common-val.jar");

    // anything other than "ADD JAR <single path>" is rejected with a clear message
    assertThatThrownBy(() -> tableEnv.executeSql("add jar t1.jar t2.jar"))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage(
                    "Add multiple jar in one single statement is not supported yet. Usage: ADD JAR <file_path>");
    assertThatThrownBy(() -> tableEnv.executeSql("add file t1.txt"))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage("ADD FILE is not supported yet. Usage: ADD JAR <file_path>");
    assertThatThrownBy(() -> tableEnv.executeSql("add archive t1.tgz"))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessage("ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
}

@Test
public void testUnsupportedOperation() {
List<String> statements =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@
@RunWith(Parameterized.class)
public class CliClientITCase extends AbstractTestBase {

    // Class name and source of a minimal Hive UDF (int -> int, adds one) that is compiled
    // into the test UDF jar during setup; presumably exercised by the ADD JAR / CREATE
    // FUNCTION steps of the SQL-client scripts — TODO confirm against the .q files.
    private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
    private static final String HIVE_ADD_ONE_UDF_CODE =
            "public class "
                    + HIVE_ADD_ONE_UDF_CLASS
                    + " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
                    + "    public int evaluate(int content) {\n"
                    + "        return content + 1;\n"
                    + "    }"
                    + "}\n";

private static Path historyPath;
private static Map<String, String> replaceVars;

Expand Down Expand Up @@ -109,6 +119,7 @@ public static void setup() throws IOException {
classNameCodes.put(
GENERATED_UPPER_UDF_CLASS,
String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
classNameCodes.put(HIVE_ADD_ONE_UDF_CLASS, HIVE_ADD_ONE_UDF_CODE);

File udfJar =
UserClassLoaderJarTestUtils.createJarFile(
Expand Down
30 changes: 30 additions & 0 deletions flink-table/flink-sql-client/src/test/resources/sql/set.q
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,36 @@ CREATE TABLE foo as select 1;
1 row in set
!ok

# test add jar
ADD JAR $VAR_UDF_JAR_PATH;
[INFO] The specified jar is added into session classloader.
!info

SHOW JARS;
$VAR_UDF_JAR_PATH
!ok

CREATE FUNCTION hive_add_one as 'HiveAddOneFunc';
[INFO] Execute statement succeed.
!info

SELECT hive_add_one(1);
+----+-------------+
| op | _o__c0 |
+----+-------------+
| +I | 2 |
+----+-------------+
Received a total of 1 row
!ok

REMOVE JAR '$VAR_UDF_JAR_PATH';
[INFO] The specified jar is removed from session classloader.
!info

SHOW JARS;
Empty set
!ok

# list the configured configuration
set;
'execution.attached' = 'true'
Expand Down