
[FLINK-14019][python] add support for managing environment and dependencies of Python UDF in Flink Python API #10017

Closed
wants to merge 7 commits from WeiZhong94:FLINK-14019

Conversation

WeiZhong94 (Contributor) commented Oct 28, 2019

What is the purpose of the change

This pull request adds support for managing environment and dependencies of Python UDF in Flink Python API.

Brief change log

  • Add new APIs in flink-python to support uploading users' Python environments and dependencies to the cluster (a usage sketch follows this list).
  • Refactor the operator layer of flink-python to support different types of running modes (docker mode, process mode, etc.).
  • Add ProcessEnvironmentManager to support managing the environment and dependencies of the Python UDF worker when running in process mode.
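For illustration, a minimal usage sketch of the new file and requirements APIs (method names come from the diff snippets reviewed below; the table_env setup and all paths are placeholders, and archive handling is sketched further down the thread):

# assumes an existing TableEnvironment named table_env

# ship a Python file that the UDF depends on
table_env.add_python_file("/path/to/my_util.py")

# ship a requirements file, optionally with a directory of cached
# packages for offline installation on the workers
table_env.set_python_requirements(
    "/path/to/requirements.txt", "/path/to/cached_packages")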

Verifying this change

This change is already covered by newly added tests, such as ProcessEnvironmentManagerTest, PythonDependencyManagerTest, and so on.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (python docs)
flinkbot commented Oct 28, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit c851f97 (Mon Oct 28 13:09:37 UTC 2019)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • 1. The [description] looks good.
  • 2. There is [consensus] that the contribution should go into Flink.
  • 3. Needs [attention] from.
  • 4. The change fits into the overall [architecture].
  • 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier
flinkbot commented Oct 28, 2019

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build
hequn8128 (Contributor) left a comment

@WeiZhong94 Thanks a lot for the PR. Could you take a look at the test failures?

WeiZhong94 (Contributor, Author) commented Oct 30, 2019

@flinkbot run travis

WeiZhong94 (Contributor, Author) commented Oct 30, 2019

@hequn8128 Thank you for the notification! This failure did not occur in my own Travis, but it happened twice on the flink-ci Travis, so I think it may be due to a cache error or other random factors. I'm trying to push something to refresh the cache and fix this.

hequn8128 (Contributor) left a comment

@WeiZhong94 Thanks a lot for the PR. The code looks good overall. Left some comments for the first commit. Will see other commits later.

@@ -274,6 +275,24 @@ def set_sql_dialect(self, sql_dialect):
"""
self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))

def set_python_executable(self, python_exec):
"""
Set the path of the python interpreter for running the python udf worker.

hequn8128 (Contributor) commented Oct 31, 2019

for running the python udf worker => for running the python udf on workers ?

.. note::
The python udf worker depends on Apache Beam (version must be 2.15.0),
CloudPickle (version >= 1.2.2), Pip (version >= 7.1.0) and

hequn8128 (Contributor) commented Oct 31, 2019

Remove CloudPickle as it is packaged in Flink, so users don't need to add it.
I think the most important thing here is that we should inform users that Python 3.5 or higher is required.

@@ -274,6 +275,24 @@ def set_sql_dialect(self, sql_dialect):
"""
self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))

def set_python_executable(self, python_exec):

hequn8128 (Contributor) commented Oct 31, 2019

Also provide a get method?
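A hypothetical sketch of such a getter (both the method and the Java-side call are assumptions for illustration; only the setter exists in this diff):

def get_python_executable(self):
    # hypothetical counterpart of set_python_executable(); the
    # underlying Java getter name is assumed here
    return self._j_table_config.getPythonExec()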

"""
Set the path of the python interpreter for running the python udf worker.
e.g. "/usr/local/bin/python3".

hequn8128 (Contributor) commented Oct 31, 2019

Add a description of how to specify the interpreter path when using an archive; see the sketch below.
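For example, the docstring could illustrate something like this (a sketch; the relative path depends on how the archive was built):

# ship a zipped virtual environment, extract it as "venv" on the
# workers, and reference the interpreter inside it by relative path
table_env.add_python_archive("/path/to/venv.zip", "venv")
table_env.get_config().set_python_executable("venv/bin/python3")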


PYTHON_FILE_PREFIX = "python_file"
PYTHON_REQUIREMENTS_PREFIX = "python_requirements_list_file"
PYTHON_REQUIREMENTS_DIR_PREFIX = "python_requirements_dir"

hequn8128 (Contributor) commented Oct 31, 2019

Change to python_requirements_cache? To make it consistent with def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):

self._dependency_manager.add_python_file(file_path)
self._dependency_manager.transmit_parameters_to_jvm(self.get_config().get_configuration())

def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):

hequn8128 (Contributor) commented Oct 31, 2019

Rename requirements_list_file to requirements_file_path?

hequn8128 (Contributor) left a comment

@WeiZhong94 Comments for the second commit. I think we need to be more careful with null checks in this commit.

@@ -80,6 +80,10 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>

hequn8128 (Contributor) commented Nov 1, 2019

Change the scope to provided here. The flink-shaded-jackson has already been packaged in flink-dist.

* offline.
* If exists it should contain all the packages listed in requirements.txt file.
*/
public String getRequirementsDirPath() {

hequn8128 (Contributor) commented Nov 1, 2019

Change to getRequirementsCacheDir? We may make all these names consistent.

* @param distributedCache The DistributedCache object of current task.
* @return The PythonDependencyManager object that contains whole information of python dependency.
*/
public static PythonDependencyManager createDependencyManager(

hequn8128 (Contributor) commented Nov 1, 2019

Just rename it create? There is no need to include DependencyManager in the method name, as it returns a PythonDependencyManager.

/**
* This class is used to parse the information of python dependency and environment management
* stored in GlobalJobParameters.
* The parse result will be used to create PythonEnvironmentManager.

hequn8128 (Contributor) commented Nov 1, 2019

Put this sentence directly after GlobalJobParameters? or add

before this sentence?


String requirementsFilePath = null;
String requirementsDirPath = null;
if (dependencyMetaData.get(PYTHON_REQUIREMENTS) != null) {

hequn8128 (Contributor) commented Nov 1, 2019

Change all these null checks to containsKey. Always try to avoid null checks.

private Map<String, String> filesInPythonPath;
private String requirementsFilePath;
private String requirementsDirPath;
private String pythonExec;
private Map<String, String> archives;
Comment on lines 45 to 49

hequn8128 (Contributor) commented Nov 1, 2019

Use the @Nullable annotation for these class members.

* TableEnvironment#add_python_file() or command option "-pyfs".
* Key is local absolute path and value is origin file name.
*/
public Map<String, String> getFilesInPythonPath() {

hequn8128 (Contributor) commented Nov 1, 2019

Always use Optional to return nullable values. See more details in the Code Style and Quality Guide.

String pythonExec = null;
if (dependencyMetaData.get(PYTHON_EXEC) != null) {
pythonExec = dependencyMetaData.get(PYTHON_EXEC);
}
Comment on lines 171 to 174

hequn8128 (Contributor) commented Nov 1, 2019

It is equivalent to String pythonExec = dependencyMetaData.get(PYTHON_EXEC);

throws IOException {
Map<String, String> filesPathToFilesName = new HashMap<>();
if (dependencyMetaData.get(PYTHON_FILE_MAP) != null) {
ObjectMapper mapper = new ObjectMapper();

hequn8128 (Contributor) commented Nov 1, 2019

Put mapper in a method variable and avoid defining it multiple times?

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch 3 times, most recently from 8800956 to 362c69a Nov 1, 2019
WeiZhong94 (Contributor, Author) commented Nov 1, 2019

@hequn8128 Thanks for your review! I have addressed your comments in the latest commits.

hequn8128 (Contributor) left a comment

@WeiZhong94 Thanks a lot for the update. Comments for the first three commits.

String pythonTmpDirectoryRoot = tmpDirectories[tmpDirectoryIndex];
ExecutionConfig.GlobalJobParameters globalJobParameters = getExecutionConfig().getGlobalJobParameters();
Map<String, String> parameters;
if (globalJobParameters != null) {

hequn8128 (Contributor) commented Nov 5, 2019

globalJobParameters cannot be null. Simplify it as

PythonDependencyManager dependencyManager = PythonDependencyManager.createDependencyManager(
  getExecutionConfig().getGlobalJobParameters().toMap(),
  getRuntimeContext().getDistributedCache());
String[] tmpDirectories =
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories();
Random rand = new Random();
int tmpDirectoryIndex = rand.nextInt() % tmpDirectories.length;
String pythonTmpDirectoryRoot = tmpDirectories[tmpDirectoryIndex];
Comment on lines 119 to 123

hequn8128 (Contributor) commented Nov 5, 2019

Maybe extract this logic into a small method? Putting all the logic in the open method makes it a mess.

PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
PYTHON_REQUIREMENTS_CACHE = "PYTHON_REQUIREMENTS_CACHE"
PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"
PYTHON_EXEC = "PYTHON_EXEC"
Comment on lines 36 to 40

hequn8128 (Contributor) commented Nov 5, 2019

As we put these names in the config, how about adding a prefix to all of them? For example, PYTHON_EXEC = "python.environment.python_exec". (A sketch of the full set follows.)
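Applied to the constants above, the suggestion would look roughly like this (a sketch; later diffs in this thread show adopted keys such as "python.environment.archive-map" and "python.environment.exec", so the remaining strings are illustrative):

PYTHON_FILE_MAP = "python.environment.file-map"
PYTHON_REQUIREMENTS_FILE = "python.environment.requirements-file"
PYTHON_REQUIREMENTS_CACHE = "python.environment.requirements-cache"
PYTHON_ARCHIVES_MAP = "python.environment.archive-map"
PYTHON_EXEC = "python.environment.exec"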

DependencyManager.PYTHON_FILE_MAP, json.dumps(self._python_file_map))
self.register_file(key, file_path)

def set_python_requirements(self, requirements_file_path, requirements_cached_dir=None):

hequn8128 (Contributor) commented Nov 5, 2019

Add some inline comments for this method?

if self._parameters.contains_key(DependencyManager.PYTHON_REQUIREMENTS_FILE):
self.remove_file(
self._parameters.get_string(DependencyManager.PYTHON_REQUIREMENTS_FILE, ""))
self._parameters.remove_config(DependencyManager.PYTHON_REQUIREMENTS_FILE)
Comment on lines 65 to 68

hequn8128 (Contributor) commented Nov 5, 2019

Add a meaningful method for this? For example, removeIfDelete(), which could also be used to remove requirements_cached; see the sketch below.
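A minimal sketch of such a helper (the name _remove_if_exists is illustrative; the body mirrors the snippet above and could be called for both the requirements file and the requirements cache keys):

def _remove_if_exists(self, config_key):
    # drop a previously registered file together with its config entry
    if self._parameters.contains_key(config_key):
        self.remove_file(self._parameters.get_string(config_key, ""))
        self._parameters.remove_config(config_key)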

}

@Override
public void cleanup() {

hequn8128 (Contributor) commented Nov 5, 2019

Call this cleanup in the close method of Runner. Remove the environmentManager member in AbstractPythonFunctionOperator.

*/
@Override
public String createRetrievalToken() throws IOException {
if (shutdownHook == null) {

hequn8128 (Contributor) commented Nov 5, 2019

It's safer to register the hook in the open method so that we don't need to register it at every place that operates on directories. We can add an open method to PythonEnvironmentManager and rename cleanup to close.

}
Path src = FileSystems.getDefault().getPath(entry.getKey());
try {
if (testCopy) {

hequn8128 (Contributor) commented Nov 5, 2019

Remove this test logic?

}
Files.createSymbolicLink(target, src);
} catch (IOException e) {
FileUtils.copy(new org.apache.flink.core.fs.Path(src.toUri()),

hequn8128 (Contributor) commented Nov 5, 2019

Add a log message if creating the symbolic link fails, e.g., LOG.warn().

* function runner is configured to run python UDF in process mode.
*/
@Internal
public class ProcessEnvironmentManager implements PythonEnvironmentManager {

hequn8128 (Contributor) commented Nov 5, 2019

public final?

WeiZhong94 (Contributor, Author) commented Nov 6, 2019

@hequn8128 Thanks for your review again! I have addressed most of your comments and replied to some of them, as I think they may need further discussion. Please take a look :).

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch from d59863b to b79460a Nov 12, 2019
WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch from b79460a to 0b1c1af Nov 20, 2019
dianfu (Contributor) left a comment

@WeiZhong94 Thanks a lot for the PR. Good work! Have left a few comments.

@@ -401,7 +407,34 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
pass


class BlinkUserDefinedFunctionTests(object):

dianfu (Contributor) commented Nov 21, 2019

How about adding a dedicated test file for the dependency tests, e.g. test_dependency.py?

else:
env["PYTHONPATH"] = installed_python_path
# since '--prefix' option is only supported for pip 8.0+, so here we fallback to
# use '--install-option' when the pip version is lower than 8.0.0.

dianfu (Contributor) commented Nov 21, 2019

Move this comment to the corresponding place, i.e. line 119 where we check the version?


PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE"
PYTHON_REQUIREMENTS_CACHE = "_PYTHON_REQUIREMENTS_CACHE"
PYTHON_REQUIREMENTS_DIR_ENV = "_PYTHON_REQUIREMENTS_INSTALL_DIR"

dianfu (Contributor) commented Nov 21, 2019

rename to PYTHON_REQUIREMENTS_INSTALL_DIR?

if (PYTHON_REQUIREMENTS_FILE in os.environ
and PYTHON_REQUIREMENTS_DIR_ENV in os.environ):
requirements_file_path = os.environ[PYTHON_REQUIREMENTS_FILE]
requirements_target_path = os.environ[PYTHON_REQUIREMENTS_DIR_ENV]

dianfu (Contributor) commented Nov 21, 2019

rename to requirements_install_path?

and PYTHON_REQUIREMENTS_DIR_ENV in os.environ):
requirements_file_path = os.environ[PYTHON_REQUIREMENTS_FILE]
requirements_target_path = os.environ[PYTHON_REQUIREMENTS_DIR_ENV]
requirements_dir_path = None

dianfu (Contributor) commented Nov 21, 2019

rename to requirements_cache_path?

}

@VisibleForTesting
void prepareEnvironment() {

dianfu (Contributor) commented Nov 21, 2019

Can this method be removed?

* @throws IllegalArgumentException
*/
private void prepareWorkingDir() throws IOException, IllegalArgumentException {

dianfu (Contributor) commented Nov 21, 2019

remove empty line

public static final String PYTHON_ARCHIVES_MAP = "python.environment.archive-map";
public static final String PYTHON_EXEC = "python.environment.exec";

@Nonnull private Map<String, String> filesInPythonPath;

dianfu (Contributor) commented Nov 21, 2019

final?

}
}

private static boolean isMacOrUnix() {

dianfu (Contributor) commented Nov 21, 2019

Is it possible to use the util OperatingSystem?

/**
* Utils used to extract zip files and try to restore the origin permissions of files.
*/
public class UnzipUtil {

dianfu (Contributor) commented Nov 21, 2019

Is there a third-party library that could be reused? This requirement seems very common.

WeiZhong94 (Author, Contributor) commented Nov 21, 2019

There are third-party libraries that could be reused, but they would introduce additional dependencies. The apache-commons-compress library is already introduced in flink-core, so reusing its classes won't introduce any new dependencies.

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch 2 times, most recently from 1ae7d82 to 6074499 Nov 21, 2019
WeiZhong94 (Contributor, Author) commented Nov 21, 2019

@dianfu Thanks for your review! I have addressed your comments. Please take a look again. :)

python="python"
fi

if [[ "$_PYTHON_WORKING_DIR" != "" ]]; then

dianfu (Contributor) commented Nov 24, 2019

change to "if [[ "$_PYTHON_WORKING_DIR" != "" ]] and [[ "$python" == ${_PYTHON_WORKING_DIR}* ]]]"?

WeiZhong94 (Author, Contributor) commented Nov 26, 2019

If we change it as above, sometimes users will not be able to access archives through relative paths because the python working directory is not set as expected.

@@ -94,7 +96,8 @@ def start_test_provision_server():
self.env["FLINK_BOOT_TESTING"] = "1"
self.env["FLINK_LOG_DIR"] = os.path.join(self.env["FLINK_HOME"], "log")

self.tmp_dir = None
self.tmp_dir = tempfile.mkdtemp(str(time.time()))
print("Using %s as the semi_persist_dir." % self.tmp_dir)

dianfu (Contributor) commented Nov 24, 2019

remove this change?

self._dependency_manager.set_python_requirements(requirements_file_path,
requirements_cache_dir)

def add_python_archive(self, archive_path, target_dir):

dianfu (Contributor) commented Nov 24, 2019

Specify a default value for target_dir as it's optional, i.e. target_dir=None?
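A sketch of the suggested signature (the None handling is an assumption; body elided):

def add_python_archive(self, archive_path, target_dir=None):
    # target_dir defaults to None so callers that do not need a fixed
    # extraction directory can simply omit it
    ...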

@@ -883,8 +987,8 @@ def __init__(self, j_tenv, is_blink_planner):
self._j_tenv = j_tenv
super(StreamTableEnvironment, self).__init__(j_tenv, is_blink_planner)

def _get_execution_config(self, filename, schema):

dianfu (Contributor) commented Nov 24, 2019

What's the reason for this change?

WeiZhong94 (Author, Contributor) commented Nov 26, 2019

The dependency manager needs the execution environment to register files, so we need to implement a "_get_j_env" method to get it, which can completely replace this method. The old implementation also had a few problems, and since it's an internal method, I removed it.

break
self._config.remove_config(config_key)

def _register_cached_file(self, config_key, file_path):

dianfu (Contributor) commented Nov 24, 2019

I'm thinking of removing this method; callers could invoke self._j_env.registerCachedFile directly if they need to register a cached file. What do you think?
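In code, the suggestion amounts to something like this (the argument order of the py4j call is assumed for illustration):

# call the Java environment's registerCachedFile directly through the
# py4j gateway instead of going through a wrapper method
self._j_env.registerCachedFile(file_path, config_key)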

* and create ProcessEnvironment object of Beam Fn API. It will be created if the python
* function runner is configured to run python UDF in process mode.
*/
@Internal

dianfu (Contributor) commented Nov 24, 2019

Move PythonEnvironmentManager and ProcessEnvironmentManager to package o.a.f.python.env?

import java.util.stream.Collectors;

/**
* The ProcessEnvironmentManager used to prepare the working dir of python UDF worker

dianfu (Contributor) commented Nov 24, 2019

Rename ProcessEnvironmentManager to ProcessPythonEnvironmentManager?

@@ -198,19 +174,14 @@ public void open() throws Exception {
Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);

jobBundleFactory = createJobBundleFactory(pipelineOptions);
// The creation of stageBundleFactory depends on the initialized environment manager.

dianfu (Contributor) commented Nov 24, 2019

Move environmentManager.open() before the portableOptions statement?

.getTaskManagerInfo()
.getConfiguration()
.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
String logDirectory = taskManagerLogFile == null ? null : new File(taskManagerLogFile).getParent();

dianfu (Contributor) commented Nov 24, 2019

In which case is taskManagerLogFile null?

WeiZhong94 (Author, Contributor) commented Nov 26, 2019

E.g., when running IT cases in the IDE.

}

@Override
public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {

dianfu (Contributor) commented Nov 24, 2019

The IOException could be removed if createBaseDirectory is done in the open method of ProcessEnvironmentManager.

WeiZhong94 (Author, Contributor) commented Nov 26, 2019

PythonDependencyInfo#create still throws IOException, so we cannot remove it currently.

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch from 6074499 to e027f28 Nov 26, 2019
WeiZhong94 (Contributor, Author) commented Nov 26, 2019

@dianfu Thanks for your review again! I have addressed your comments. Please take a look.

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch 2 times, most recently from f56d5d7 to 195f773 Nov 26, 2019
else:
break
os.environ["PYTHONPATH"] = env["PYTHONPATH"]
return 0

dianfu (Contributor) commented Nov 27, 2019

It seems that the return statement can be removed.

# since '--prefix' option is only supported for pip 8.0+, so here we fallback to
# use '--install-option' when the pip version is lower than 8.0.0.
if parse_version(pip_version) >= parse_version('8.0.0'):
py_args.extend(

dianfu (Contributor) commented Nov 27, 2019

Could we rename py_args to a meaningful name such as pip_install_commands?
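Putting the rename together with the version check above, the install logic would read roughly as follows (a sketch; the base command construction and all variable values are placeholders):

from pkg_resources import parse_version

# placeholders for illustration
pip_version = "19.0.0"
requirements_file_path = "/path/to/requirements.txt"
requirements_install_dir = "/path/to/install_dir"

pip_install_commands = ["pip", "install", "-r", requirements_file_path]
# '--prefix' is only supported for pip 8.0+, so fall back to
# '--install-option' for older pip versions
if parse_version(pip_version) >= parse_version('8.0.0'):
    pip_install_commands.extend(['--prefix', requirements_install_dir])
else:
    pip_install_commands.extend(
        ['--install-option', '--prefix=' + requirements_install_dir])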

PYTHON_REQUIREMENTS_INSTALL_DIR = "_PYTHON_REQUIREMENTS_INSTALL_DIR"


def pip_install_requirements_process_mode():

dianfu (Contributor) commented Nov 27, 2019

pip_install_requirements_process_mode -> pip_install_requirements?

@@ -94,7 +99,7 @@ def start_test_provision_server():
self.env["FLINK_BOOT_TESTING"] = "1"
self.env["FLINK_LOG_DIR"] = os.path.join(self.env["FLINK_HOME"], "log")

self.tmp_dir = None
self.tmp_dir = tempfile.mkdtemp(str(time.time()))

dianfu (Contributor) commented Nov 27, 2019

Why not use the tmp_dir defined in the PyFlinkTestCase?

JProcessPythonEnvironmentManager = \
get_gateway().jvm.org.apache.flink.python.env.ProcessPythonEnvironmentManager

mock_pyflink_dir = os.path.join(self.tmp_dir, "pyflink")

dianfu (Contributor) commented Nov 27, 2019

Rename mock_pyflink_dir to pyflink_dir?

environmentManager.open();
String retrivalToken = environmentManager.createRetrievalToken();

File retrivalTokenFile = new File(retrivalToken);

dianfu (Contributor) commented Nov 27, 2019

retrivalTokenFile -> retrievalTokenFile

Map<String, String> expected = getBasicExpectedEnv(environmentManager);
expected.put("FLINK_LOG_DIR", "/tmp/log");
assertEquals(expected, env);
environmentManager.close();

dianfu (Contributor) commented Nov 27, 2019

use try finally?

payload.getCommand());
Map<String, String> expectedEnv = getBasicExpectedEnv(environmentManager);
assertEquals(expectedEnv, payload.getEnvMap());
environmentManager.close();

dianfu (Contributor) commented Nov 27, 2019

use try finally?

Map<String, String> systemEnv = new HashMap<>(this.systemEnv);

// add pyflink, py4j and cloudpickle to PYTHONPATH
String internalLibs = Arrays.stream(ResourceUtil.PYTHON_BASIC_DEPENDENCIES)

dianfu (Contributor) commented Nov 27, 2019

Rename PYTHON_BASIC_DEPENDENCIES to BUILT_IN_PYTHON_DEPENDENCIES?

}
}

private static String computeSHA1Checksum(File file) throws IOException, NoSuchAlgorithmException {

dianfu (Contributor) commented Nov 27, 2019

Use org.apache.commons.io.FileUtils.contentEquals()?

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch from 195f773 to 2c0d810 Nov 28, 2019
WeiZhong94 (Contributor, Author) commented Nov 28, 2019

@dianfu Thanks for your review! I have addressed your comments in the latest commit.

WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch 5 times, most recently from 7d00e00 to 5ef0379 Nov 28, 2019
WeiZhong94 force-pushed the WeiZhong94:FLINK-14019 branch from 5ef0379 to 2128064 Dec 3, 2019
dianfu (Contributor) commented Dec 3, 2019

@WeiZhong94 Thanks for the update. LGTM.

hequn8128 (Contributor) left a comment

LGTM. Thanks a lot for the PR @WeiZhong94 and the nice review @dianfu 👍
Merging...

hequn8128 added a commit to hequn8128/flink that referenced this pull request Dec 4, 2019
…cy management.

This closes apache#10017.
hequn8128 (Contributor) commented Dec 5, 2019

It seems the Travis failure is not related to this PR.

hequn8128 closed this in 36cb5d4 Dec 5, 2019