Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31398][sql-gateway] OperationExecutor is no longer set context classloader when executing statement. #22294

Merged
merged 1 commit into from Apr 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -90,7 +90,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -182,23 +181,17 @@ public ResultFetcher configureSession(OperationHandle handle, String statement)

public ResultFetcher executeStatement(OperationHandle handle, String statement) {
// Instantiate the TableEnvironment lazily
// TODO: remove the usage of the context classloader until {@link
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Althogh the comment said we need make HiveParserUtils#getFunctionInfo use ResourceManager explicitly.
But after do some investigation, I found it's not necessary.
Currently, there's two callers to call method HiveParserUtils#getFunctionInfo
1:

FunctionInfo fi = HiveParserUtils.getFunctionInfo(funcText);
 if (fi == null) {
 desc = convertSqlOperator(funcText, children, ctx);
}
....

The fi will be null after we remove the classloader wrapper as it can't load the function using Thread.currentThread().getContextClassLoader(). But it's fine since it will then try to find the function from the function catalog. Such case have been convered in flink-sql-client by set.q.

2: getFuncExprNodeDescWithUdfData
There're two places call method getFuncExprNodeDescWithUdfData.
a: convertGenericFunc(ExprNodeGenericFuncDesc func), but it will only be called when it's hive build-in function, so it can be found in Thread.currentThread().getContextClassLoader() as use must put flink-connector-hive jar which include hive built-in functions to FLINK_HOME/lib to use Hive Dialect.
b:

// type conversion for LHS
            if (TypeInfoUtils.isConversionRequiredForComparison(lhsType, commonType)) {
                lhsDesc =
                        HiveASTParseUtils.createConversionCast(
                                lhsDesc, (PrimitiveTypeInfo) commonType);
            }

Also, HiveParserUtils.getFunctionInfo(funcText) will only be called to when it's build-in function which can then be loaded.

AFAIC, it's not easy to make HiveParserUtils#getFunctionInfo use ResourceManager explicitly which will involves much modification. I think it's fine to remove the usage of the context classloader in here.
cc @fsk119

// HiveParserUtils}#getFunctionInfo use ResourceManager explicitly.
try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(
sessionContext.getSessionState().resourceManager.getUserClassLoader())) {
TableEnvironmentInternal tableEnv = getTableEnvironment();
List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
if (parsedOperations.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
}
Operation op = parsedOperations.get(0);
return sessionContext.isStatementSetState()
? executeOperationInStatementSetState(tableEnv, handle, op)
: executeOperation(tableEnv, handle, op);
TableEnvironmentInternal tableEnv = getTableEnvironment();
List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
if (parsedOperations.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
}
Operation op = parsedOperations.get(0);
return sessionContext.isStatementSetState()
? executeOperationInStatementSetState(tableEnv, handle, op)
: executeOperation(tableEnv, handle, op);
}

public String getCurrentCatalog() {
Expand Down