[FLINK-14581][python] Let python UDF execution no longer rely on the flink directory structure to support running python UDFs on yarn. #10061
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.

Automated Checks: last check on commit 99b5327 (Wed Dec 04 15:24:20 UTC 2019). Warnings:

Mention the bot in a comment to re-run the automated checks.

Review Progress: Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:
@WeiZhong94 Thanks a lot for the PR. Good job! I left some comments below. They mainly include:
- There are some code format problems, for example the extractBasicDependenciesFromResource method in RunnerEnvUtil. It's better to declare each parameter on a new line. I only left the comment in one place; please check the other places.
- Maybe it's better to place the methods from RunnerEnvUtil into AbstractPythonFunctionRunner directly. The methods do not seem to be util methods that can be used by other places.
- It's better to make the python log path the same as the taskmanager log path.

Best, Hequn
<resource>
    <directory>lib</directory>
    <includes>
        <include>*.zip</include>
If we include these zip files in the jar, can we remove the current python folder in the opt folder (i.e., build-target/opt/python)?
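For context, the excerpt above is a Maven resources declaration. A complete block of that shape (a hypothetical sketch; the actual pom.xml of this PR may differ in its surrounding elements) would close the open tags like so:

```xml
<build>
  <resources>
    <resource>
      <!-- bundle the zip archives under lib/ into the produced jar -->
      <directory>lib</directory>
      <includes>
        <include>*.zip</include>
      </includes>
    </resource>
  </resources>
</build>
```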
 */
public class RunnerEnvUtil {

    public static final String[] PYTHON_BASIC_DEPENDENCIES = new String[] {
- public => private
- remove new String[]
 * Python libraries and shell script extracted from resource of flink-python jar.
 * They are used to support running python udf worker in process mode.
 */
private transient List<File> pythonFiles;
The name is easily confused with the python files uploaded by users once we support dependency management, so maybe we can rename it to be clearer. How about renaming it to internalPythonFiles?
        break;
    }
}
Map<String, String> env = RunnerEnvUtil.appendEnvironmentVariable(System.getenv(),
Put System.getenv() on a new line.
if (systemEnv.get("FLINK_LOG_DIR") == null) {
    if (systemEnv.get("LOG_DIRS") != null) {
        // log directory of yarn mode
        result.put("FLINK_LOG_DIR", systemEnv.get("LOG_DIRS").split(File.pathSeparator)[0]);
    } else if (systemEnv.get(ConfigConstants.ENV_FLINK_LIB_DIR) != null) {
        // log directory of standalone mode
        File flinkHomeDir = new File(systemEnv.get(ConfigConstants.ENV_FLINK_LIB_DIR)).getParentFile();
        result.put("FLINK_LOG_DIR", new File(flinkHomeDir, "log").getAbsolutePath());
    } else if (systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR) != null) {
        // log directory of pyflink shell mode or user specified flink
        result.put("FLINK_LOG_DIR",
            new File(systemEnv.get(ConfigConstants.ENV_FLINK_HOME_DIR), "log").getAbsolutePath());
    }
}
Can the log path be the same as the taskmanager log path?
This now uses System.getProperty("log.file") (the default log path of the taskmanager) here. After moving the environment creation into EnvironmentManager in #10017, we can easily pass the true log path to this method.
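The idea described above, deriving the python log directory from the taskmanager's "log.file" system property, can be sketched roughly as follows. This is a minimal illustration of the approach, not the PR's actual code; the class and method names are hypothetical.

```java
import java.io.File;

public class LogDirSketch {

    // Derive a log directory from the "log.file" system property, which by
    // default points at the taskmanager log file; fall back to the JVM temp
    // directory when the property is unset (e.g. in local tests).
    static String resolveLogDir() {
        String logFile = System.getProperty("log.file");
        if (logFile != null) {
            // Use the directory that contains the taskmanager log file.
            return new File(logFile).getAbsoluteFile().getParent();
        }
        return System.getProperty("java.io.tmpdir");
    }

    public static void main(String[] args) {
        System.setProperty("log.file", "/var/log/flink/taskmanager.log");
        System.out.println(resolveLogDir());
    }
}
```

This way the python udf worker logs land next to the taskmanager logs, which is the behavior the reviewer asked for.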
 * The basic environment does not include python part of Apache Beam.
 * Users need to prepare it themselves.
 */
public class RunnerEnvUtil {
How about putting these two methods into AbstractPythonFunctionRunner for now and moving them into EnvironmentManager in #10017? The two methods do not seem to be util methods that can be used by other places.
Since the extract method is reused by the PythonResourceExtractor, I only moved appendEnvironmentVariable into AbstractPythonFunctionRunner.
    "pyflink-udf-runner.sh"
};

public static List<File> extractBasicDependenciesFromResource(
Be careful with the format. You can take a look at the methods in KeyedStream.
public static List<File> extractBasicDependenciesFromResource(
String tmpdir,
ClassLoader classLoader,
String prefix) throws IOException {
Also check other places.
@hequn8128 Thanks for your review! I have addressed your comments in the latest commit, please take a look again :).
@WeiZhong94 Thanks a lot for the update. The code looks better. I left some further comments below.
public static void main(String[] args) throws IOException {
    String tmpdir = System.getProperty("java.io.tmpdir");

    List<File> files = extractBasicDependenciesFromResource(tmpdir,
Pay attention to the format.
List<File> files = extractBasicDependenciesFromResource(tmpdir,
    PythonResourceExtractor.class.getClassLoader(), UUID.randomUUID().toString());

files.stream().filter(file -> file.getName().endsWith(".sh")).forEach(File::delete);
Could we avoid this? For example, we could pass a list to the extractBasicDependenciesFromResource method that contains no .sh files, so they are never extracted in the first place.
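The reviewer's suggestion, selecting the resources to extract up front rather than extracting everything and deleting the shell script afterwards, can be sketched like this. The helper and the dependency list below mirror the PYTHON_BASIC_DEPENDENCIES array shown earlier in the diff, but the method name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExtractionFilterSketch {

    // Mirrors the PYTHON_BASIC_DEPENDENCIES array from the PR's diff.
    static final String[] PYTHON_BASIC_DEPENDENCIES = {
        "pyflink.zip",
        "py4j-0.10.8.1-src.zip",
        "cloudpickle-1.2.2-src.zip",
        "pyflink-udf-runner.sh"
    };

    // Keep only the resources the caller actually wants extracted,
    // e.g. everything except shell scripts.
    static List<String> withoutShellScripts(String[] all) {
        return Arrays.stream(all)
            .filter(name -> !name.endsWith(".sh"))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // An extractor receiving this list would never create
        // pyflink-udf-runner.sh, so no delete-after step is needed.
        System.out.println(withoutShellScripts(PYTHON_BASIC_DEPENDENCIES));
    }
}
```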
files.stream().filter(file -> file.getName().endsWith(".sh")).forEach(File::delete);

System.out.print(files.stream().filter(file -> !file.getName().endsWith(".sh"))
We can also remove the filter(file -> !file.getName().endsWith(".sh")) here.
import static org.apache.flink.python.util.ResourceUtil.extractBasicDependenciesFromResource;

/**
 * The program that extracts the internal python libraries and
Format the comment to about 100 characters per line; that is common in Java.
flink-python/bin/pyflink-shell.sh

EXTRACTOR="org.apache.flink.client.python.PythonResourceExtractor"
PYFLINK_INTERNAL_LIB=`${JAVA_RUN} ${JVM_ARGS} -cp ${PYTHON_JAR_PATH} ${EXTRACTOR}`
export PYTHONPATH="$PYFLINK_INTERNAL_LIB:$PYTHONPATH"
export PYFLINK_INTERNAL_LIB
Also clean up the temp files of PYFLINK_INTERNAL_LIB after executing pyflink-shell. If the python shell exits abnormally, the files would not be cleaned.
String tmpdir = tempDirs[rnd.nextInt(tempDirs.length)];
String prefix = UUID.randomUUID().toString() + "_";
try {
    pythonInternalLibs = ResourceUtil.extractBasicDependenciesFromResource(
Pay attention to the format. Change to:
pythonInternalLibs = ResourceUtil.extractBasicDependenciesFromResource(
tmpdir,
this.getClass().getClassLoader(),
prefix);
        LOG.warn("Can not get the log directory from property log.file.", e);
    }
}
return appendEnvironmentVariable(
Pay attention to the code format.
    pythonDependencies, logDir);
}

private static Map<String, String> appendEnvironmentVariable(
Merge this method with the previous appendEnvironmentVariable?
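A merged appendEnvironmentVariable, combining the PYTHONPATH handling and the log-directory handling in one method as the reviewer suggests, might look roughly like this. It is a sketch only; the parameter names and the exact merging semantics are assumptions, not the PR's final code.

```java
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnvMergeSketch {

    // Copy the base environment, prepend the extracted dependency paths to
    // PYTHONPATH, and set FLINK_LOG_DIR when a log directory is known.
    static Map<String, String> appendEnvironmentVariable(
            Map<String, String> baseEnv,
            List<String> dependencyPaths,
            String logDir) {
        Map<String, String> result = new HashMap<>(baseEnv);
        String pythonPath = String.join(File.pathSeparator, dependencyPaths);
        if (result.containsKey("PYTHONPATH")) {
            // Keep any PYTHONPATH entries already present in the environment.
            pythonPath = pythonPath + File.pathSeparator + result.get("PYTHONPATH");
        }
        result.put("PYTHONPATH", pythonPath);
        if (logDir != null) {
            result.put("FLINK_LOG_DIR", logDir);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> env = appendEnvironmentVariable(
            Map.of("PYTHONPATH", "/existing"),
            List.of("/tmp/pyflink.zip"),
            "/var/log/flink");
        System.out.println(env.get("PYTHONPATH"));
    }
}
```

Collapsing the two overloads this way keeps a single place responsible for building the python worker environment.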
/**
 * The util class help to prepare Python env and run the python process.
 */
public final class PythonEnvUtils {
How about renaming this class to PythonDriverEnvUtils? At first glance I was confused to see extractBasicDependenciesFromResource also called here.
try {
    ClassLoader classLoader = ResourceUtil.class.getClassLoader();
    String prefix = "tmp_";
    List<File> files = ResourceUtil.extractBasicDependenciesFromResource(
Pay attention to the code format.
@hequn8128 Thanks for your review again! I have updated this PR. Please take a look. :)
@WeiZhong94 We should also update the tests in PythonDriverEnvUtilsTest. See the test error in misc.
@hequn8128 Thanks for the reminder, I have fixed the test failure in the latest commit.
@WeiZhong94 Thanks a lot for the update. Merging...
[FLINK-14581][python] Let python UDF execution no longer rely on the flink directory structure to support running python UDFs on yarn. This closes apache#10061.
What is the purpose of the change
This pull request makes python UDF execution no longer rely on the flink directory structure to support running python UDFs on yarn.
Brief change log
- Add pyflink-udf-runner.sh, pyflink.zip, py4j-0.10.8.1-src.zip and cloudpickle-1.2.2-src.zip as resources to flink-python-{VERSION}.jar.
- Add RunnerEnvUtil to extract the above files before running the python udf worker.
- Adjust pyflink-udf-runner.sh and relevant tests to adapt to the above changes.

Verifying this change
This change is already covered by existing tests, such as RunnerEnvUtilTest, test_process_mode_boot.py etc.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)

Documentation