
[FLINK-19097][python] Support add_jars() for Python StreamExecutionEnvironment. #13292

Closed

Conversation

shuiqiangchen (Contributor)

What is the purpose of the change

Add an add_jars() interface to the Python StreamExecutionEnvironment so that users can specify jar dependencies in their Python DataStream jobs.
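A minimal usage sketch of the resulting interface (the connector jar path is a placeholder):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The jar path below is a placeholder; paths must specify a protocol
# (e.g. file://) and be accessible on all nodes.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")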

Brief change log

  • Added StreamExecutionEnvironment.add_jars() interface.

Verifying this change

This pull request has been tested by test_add_jars() in test_stream_execution_environment.py.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

@flinkbot (Collaborator) commented Sep 1, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 81162c9 (Tue Sep 01 07:18:20 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Sep 1, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@HuangXingBo (Contributor) left a comment:

@shuiqiangchen Thanks a lot for the PR. I have a question: why not add two methods, add_jars for pipeline.jars and add_class_paths for pipeline.classpaths?

jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
Contributor:

Raise an exception if the key is not pipeline.jars or pipeline.classpaths.
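A sketch of the suggested validation, reusing the names from the snippet above (the error message wording is an assumption):

from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key not in [jars_key, classpaths_key]:
    # Reject any key other than the two supported pipeline options.
    raise ValueError(
        "The key must be 'pipeline.jars' or 'pipeline.classpaths', got: %s" % key)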

@@ -553,6 +553,29 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)

def add_jars(self, key: str, jars_path: str):
Contributor:

Why not add two methods, add_jars for pipeline.jars and add_class_paths for pipeline.classpaths?

Contributor Author:

Yes, it would be more intuitive to separate it into two interfaces named add_jars() and add_classpaths().
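A sketch of how that split could look (signatures are refined further down the thread; bodies elided):

class StreamExecutionEnvironment:
    # Sketch of the two proposed methods only.

    def add_jars(self, jars_path: str):
        """Adds jar files by setting the 'pipeline.jars' option."""
        ...

    def add_classpaths(self, classpaths: str):
        """Adds classpath URLs by setting the 'pipeline.classpaths' option."""
        ...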

@shuiqiangchen (Contributor Author):

Hi @HuangXingBo, thanks for your comments. I have updated the PR according to your suggestions; please have a look at it.

@HuangXingBo (Contributor) left a comment:

@shuiqiangchen Thanks a lot for the update. It looks better now. I have only left a few minor comments.

@@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)

def add_jars(self, jars_path: str):
"""
Adds a list of jar files that contain the user-defined function (UDF) classes and all
Contributor:

I think add_jars is not only used in UDF situations; maybe you need to update the docstring accordingly.

def add_classpaths(self, classpaths: str):
"""
Adds a list of URLs that are added to the classpath of each user code classloader of the
program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes
Contributor:

Suggested change
program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes
program. Paths must specify a protocol (e.g. file://) and be accessible by all nodes

deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=type_info).build()

# Will get a ClassNotFoundException if not add the kafka connector into the pipeline
Contributor:

Suggested change
# Will get a ClassNotFoundException if not add the kafka connector into the pipeline
# It will raise a ClassNotFoundException if the kafka connector is not added into the pipeline
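For context, the comment belongs to a test along these lines (a sketch; env and deserialization_schema come from the surrounding test, while the jar path, topic, and broker address are assumptions):

from pyflink.datastream.connectors import FlinkKafkaConsumer

# Without this, building the Kafka source raises a ClassNotFoundException.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
kafka_source = FlinkKafkaConsumer(
    topics='test_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092'})
env.add_source(kafka_source)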

@dianfu (Contributor) left a comment:

@shuiqiangchen Thanks for the PR. I notice that we have introduced two public APIs in this PR. Could you explain why not use configuration.set_string('pipeline.jars', 'xxx'), just as in the Python Table API?
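For reference, the Python Table API route mentioned above looks roughly like this (table_env stands for an existing TableEnvironment; the jar path is a placeholder):

# Set jar dependencies through the configuration, without a dedicated method.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/connector.jar")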

@shuiqiangchen (Contributor Author) commented Sep 4, 2020

Hi @dianfu, thank you for your comments. Currently, users are not able to get/set the configuration of a StreamExecutionEnvironment, since StreamExecutionEnvironment.getConfiguration() is a protected method.
Here we have two options:

  1. Make StreamExecutionEnvironment.getConfiguration() public in the Python DataStream API.
  2. Add these two new interfaces that the Java StreamExecutionEnvironment does not have, which is actually a bit controversial.

@dianfu (Contributor) left a comment:

@shuiqiangchen Thanks for the explanation. It makes sense to me.

@@ -553,6 +553,33 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)

def add_jars(self, jars_path: str):
Contributor:

Do you think it makes sense to change the interface a bit, as follows?

Suggested change
def add_jars(self, jars_path: str):
def add_jars(self, *jars: str):

Besides, we should also consider cases where users may call add_jars multiple times.
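A sketch of merging repeated calls, following the env_config pattern from the diff above (';' stands in for PythonDependencyUtils.FILE_DELIMITER, and jars is the varargs tuple):

# Merge with any previously configured jars instead of overwriting them,
# so that calling add_jars multiple times accumulates entries.
existing = env_config.getString(jars_key, '')
merged = ';'.join([path for path in [existing] + list(jars) if path])
env_config.setString(jars_key, merged)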

.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jars_key, jars_path)

def add_classpaths(self, classpaths: str):
Contributor:

ditto

@@ -553,6 +553,45 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)

def add_jars(self, *jars_path: List[str]):
Contributor:

I think the signature should be:

Suggested change
def add_jars(self, *jars_path: List[str]):
def add_jars(self, *jars_path: str):

What do you think?
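For reference: with a varargs parameter, each positional argument is already a single string, so the per-element annotation is str, not List[str]:

def add_jars(self, *jars_path: str):
    # jars_path arrives as a tuple of individual strings, e.g. after
    # env.add_jars("file:///a.jar", "file:///b.jar") it is
    # ("file:///a.jar", "file:///b.jar").
    ...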

jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
.getEnvironmentConfig(self._j_stream_execution_environment)
set_jar_paths = env_config.getString(jars_key, None)
Contributor:

Why not as follows?

Suggested change
set_jar_paths = env_config.getString(jars_key, None)
set_jar_paths = env_config.getString(jars_key, '')

if set_jar_paths is None:
set_jar_paths = ''
jars_path = jvm.PythonDependencyUtils.FILE_DELIMITER.join(*jars_path)
python_files = jvm.PythonDependencyUtils.FILE_DELIMITER.join([set_jar_paths,
Contributor:

Why is it named python_files?

Contributor Author:

Sorry, we could reuse 'jars_path'.

"""
Adds a list of jar files that will be uploaded to the cluster and referenced by the job.

:param jars_path: Path of jars that delimited by ';'.
Contributor:

I think the comment isn't correct and should also be updated.

@shuiqiangchen (Contributor Author):

@dianfu Thank you for your comments. I have updated the PR according to your suggestions; please have a look.

@dianfu dianfu closed this in 71ffab6 Sep 7, 2020