Skip to content

Commit

Permalink
[BugFix] [UDF] Fix the issue where UDF cannot find classes in Flink S…
Browse files Browse the repository at this point in the history
…QL tasks (#3525)

Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh committed May 23, 2024
1 parent 4cfd123 commit 443f501
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.dinky.function.constant.PathConstant;

import org.apache.flink.table.catalog.FunctionLanguage;

import lombok.extern.slf4j.Slf4j;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
Expand All @@ -40,12 +42,12 @@ public BoxedUnit apply(String msg) {
}
}

public static IMain getInterpreter(Integer missionId) {
public static IMain getInterpreter() {

GenericRunnerSettings settings = new GenericRunnerSettings(new ErrorHandler());

settings.usejavacp().tryToSetFromPropertyValue("true");
settings.Yreploutdir().tryToSetFromPropertyValue(PathConstant.getUdfCompilerJavaPath(missionId));
settings.Yreploutdir().tryToSetFromPropertyValue(PathConstant.getUdfCompilerPath(FunctionLanguage.JAVA));
return new IMain(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;

/** @since 0.6.8 */
public class JVMPackage implements FunctionPackage {

@Override
Expand All @@ -57,7 +56,7 @@ public String[] pack(List<UDF> udfList, Integer missionId) {
for (int i = 0; i < classNameList.size(); i++) {
String className = classNameList.get(i);
String classFile = StrUtil.replace(className, ".", "/") + ".class";
String absoluteFilePath = PathConstant.getUdfCompilerJavaPath(missionId, classFile);
String absoluteFilePath = PathConstant.getUdfCompilerPath(FunctionLanguage.JAVA, classFile);

clazzs[i] = classFile;
fileInputStreams[i] = FileUtil.getInputStream(absoluteFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.dinky.function.data.model.UDF;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.FunctionLanguage;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -46,7 +47,7 @@ public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missi
// TODO 改为ProcessStep注释
log.info("Compiling java code, class: {}", udf.getClassName());
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode());
boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId));
boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerPath(FunctionLanguage.JAVA));
String className = compiler.getFullClassName();
if (res) {
log.info("class compiled successfully:{}", className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {
log.info("正在编译 python 代码 , class: " + udf.getClassName());
File pyFile = FileUtil.writeUtf8String(
udf.getCode(),
PathConstant.getUdfCompilerPythonPath(missionId, UDFUtil.getPyFileName(udf.getClassName()) + ".py"));
PathConstant.getUdfCompilerPath(
FunctionLanguage.PYTHON, UDFUtil.getPyFileName(udf.getClassName()) + ".py"));
File zipFile = ZipUtil.zip(pyFile);
FileUtil.del(pyFile);
try {
Expand All @@ -83,12 +84,12 @@ public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {
SystemConfiguration.getInstances().getPythonHome());

PythonFunctionFactory.getPythonFunction(udf.getClassName(), configuration, null);
log.info("Python udf编译成功 ; className:" + udf.getClassName());
log.info("Python udf compiled successfully; className:{}", udf.getClassName());
} catch (Exception e) {
log.error("Python udf编译失败 ; className:"
+ udf.getClassName()
+ " 。 原因: "
+ ExceptionUtil.getRootCauseMessage(e));
log.error(
"Python udf compilation failed; className:{}\n.reason: {}",
udf.getClassName(),
ExceptionUtil.getRootCauseMessage(e));
return false;
}
FileUtil.del(zipFile);
Expand All @@ -112,8 +113,8 @@ public String[] pack(List<UDF> udfList, Integer missionId) {
.map(udf -> {
File file = FileUtil.writeUtf8String(
udf.getCode(),
PathConstant.getUdfCompilerPythonPath(
missionId, UDFUtil.getPyFileName(udf.getClassName()) + ".py"));
PathConstant.getUdfCompilerPath(
FunctionLanguage.PYTHON, UDFUtil.getPyFileName(udf.getClassName()) + ".py"));
return FileUtil.getInputStream(file);
})
.toArray(InputStream[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {

String className = udf.getClassName();
log.info("正在编译 scala 代码 , class: " + className);
if (CustomStringScalaCompiler.getInterpreter(missionId).compileString(udf.getCode())) {
if (CustomStringScalaCompiler.getInterpreter().compileString(udf.getCode())) {
log.info("scala class编译成功:" + className);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.function.constant;

import org.apache.flink.table.catalog.FunctionLanguage;

import java.io.File;

import cn.hutool.core.util.StrUtil;
Expand Down Expand Up @@ -58,12 +60,12 @@ public static String getPath(Object... path) {
return StrUtil.join(File.separator, path) + File.separator;
}

public static String getUdfCompilerJavaPath(Integer missionId, Object... path) {
return getPath(UDF_PATH, missionId, COMPILER, "java", path);
public static String getUdfCompilerPath(FunctionLanguage language, String fileName) {
return getPath(UDF_PATH, COMPILER, language.name(), fileName);
}

public static String getUdfCompilerPythonPath(Integer missionId, Object... path) {
return getPath(UDF_PATH, missionId, COMPILER, "python", path);
public static String getUdfCompilerPath(FunctionLanguage language) {
return getPath(UDF_PATH, COMPILER, language.name());
}

public static String getUdfPackagePath(Integer missionId, Object... path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public static Map<String, List<String>> buildJar(List<UDF> codeList) {
}
} else if (udf.getFunctionLanguage() == FunctionLanguage.SCALA) {
String className = udf.getClassName();
if (CustomStringScalaCompiler.getInterpreter(null).compileString(udf.getCode())) {
if (CustomStringScalaCompiler.getInterpreter().compileString(udf.getCode())) {
log.info("scala class compile successful:{}", className);
ClassPool.push(ClassEntity.build(className, udf.getCode()));
successList.add(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,43 +37,43 @@ export const buildRunModelOptions = () => {
{
label: 'Local',
value: RUN_MODE.LOCAL,
key: RUN_MODE.LOCAL,
key: RUN_MODE.LOCAL
},
{
label: 'Standalone',
value: RUN_MODE.STANDALONE,
key: RUN_MODE.STANDALONE,
key: RUN_MODE.STANDALONE
},
{
label: 'Yarn Session',
value: RUN_MODE.YARN_SESSION,
key: RUN_MODE.YARN_SESSION,
key: RUN_MODE.YARN_SESSION
},
{
// flink弃用了 yarn per-job 模式 在这写个标签 带横线的 | flink deprecated yarn per-job mode, write a label here with a horizontal line
label: <del>Yarn Per-Job (Deprecated)</del>,
value: RUN_MODE.YARN_PER_JOB,
key: RUN_MODE.YARN_PER_JOB,
key: RUN_MODE.YARN_PER_JOB
},
{
label: 'Yarn Application',
value: RUN_MODE.YARN_APPLICATION,
key: RUN_MODE.YARN_APPLICATION,
key: RUN_MODE.YARN_APPLICATION
},
{
label: 'Kubernetes Session',
value: RUN_MODE.KUBERNETES_SESSION,
key: RUN_MODE.KUBERNETES_SESSION,
key: RUN_MODE.KUBERNETES_SESSION
},
{
label: 'Kubernetes Application',
value: RUN_MODE.KUBERNETES_APPLICATION,
key: RUN_MODE.KUBERNETES_APPLICATION,
key: RUN_MODE.KUBERNETES_APPLICATION
},
{
label: 'Kubernetes Operator Application',
value: RUN_MODE.KUBERNETES_APPLICATION_OPERATOR,
key: RUN_MODE.KUBERNETES_APPLICATION_OPERATOR,
key: RUN_MODE.KUBERNETES_APPLICATION_OPERATOR
}
);

Expand Down Expand Up @@ -165,7 +165,7 @@ export const buildEnvOptions = (env: TaskInfo[] = []) => {
{l('button.disable')}
</Space>
),
title: l('button.disable'),
title: l('button.disable'),
value: -1,
key: -1
}
Expand Down

0 comments on commit 443f501

Please sign in to comment.