Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable #21770

Closed
wants to merge 1 commit into from

Conversation

Samrat002
Copy link
Contributor

@Samrat002 Samrat002 commented Jan 28, 2023

What is the purpose of the change

Currently, below are the ways Python Worker gets the Python Flink Dependencies.

  1. Worker Node's System Python Path (/usr/local/lib64/python3.7/site-packages)
  2. Client passes the python Dependencies through -pyfs and --pyarch which is localised into PYTHONPATH of Python Worker.
  3. Client passes the requirements through -requirement.txt which gets installed on Worker Node and added into PYTHONPATH of Python Worker.

This change allow PYTHONPATH of Python Worker configurable where Admin/Service provider can install the required python Flink dependencies on a custom path (/usr/lib/pyflink/lib/python3.7/site-packages) on all Worker Nodes and then set the path in the client machine configuration flink-conf.yaml. This way it works without any configurations from the Application Users and also without affecting any other components dependent on System Python Path.

Brief change log

add an option to add in flink conf to pick configurable python path in worker and client.

Verifying this change

In flink-conf.yaml add the following configs:

  1. python.client.executable: python3
  2. python.executable: python3
  3. python.pythonpath: /tmp/PYTHONPATH/lib64/python3.7/site-packages/:/tmp/PYTHONPATH/lib/python3.7/site-packages/

Install PyFlink libraries on Client and Worker nodes

[hadoop@ip-172-1-2-3 flink]$ cat  /etc/flink/conf/req.txt
apache-flink==1.16.0

Install python in all worker nodes

python3 -m pip install --ignore-installed -r /tmp/req.txt --prefix /tmp/PYTHONPATH/

Run a job

flink run -py /usr/lib/flink/examples/python/table/word_count.py

LOG

jobmanager log

obmanager.log:2023-02-23 07:09:30,931 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: python.pythonpath, /tmp/PYTHONPATH/lib64/python3.7/site-packages/:/tmp/PYTHONPATH/lib/python3.7/site-packages/
jobmanager.log:2023-02-23 07:09:33,070 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: python.pythonpath, /tmp/PYTHONPATH/lib64/python3.7/site-packages/:/tmp/PYTHONPATH/lib/python3.7/site-packages/

taskmanager log

taskmanager.log:2023-02-23 07:14:50,549 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: python.pythonpath, /tmp/PYTHONPATH/lib64/python3.7/site-packages/:/tmp/PYTHONPATH/lib/python3.7/site-packages/
taskmanager.log:2023-02-23 07:14:53,978 INFO  org.apache.flink.python.env.AbstractPythonEnvironmentManager [] - PYTHONPATH of python worker: /tmp/PYTHONPATH/lib64/python3.7/site-packages/:/tmp/PYTHONPATH/lib/python3.7/site-packages/
taskmanager.log:2023-02-23 07:14:56,039 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:84 [] - Logging handler created.
taskmanager.log:2023-02-23 07:14:56,039 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:107 [] - semi_persistent_directory: /tmp
taskmanager.log:2023-02-23 07:14:56,039 WARN  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:281 [] - No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail.
taskmanager.log:2023-02-23 07:14:56,039 WARN  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/options/pipeline_options.py:335 [] - Discarding unparseable args: ['--options_id=0.0', '--app_name=BeamPythonFunctionRunner']
taskmanager.log:2023-02-23 07:14:56,039 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:125 [] - Pipeline_options: {'experiments': ['state_cache_size=1000']}
taskmanager.log:2023-02-23 07:14:56,039 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/statecache.py:172 [] - Creating state cache with size 1000
taskmanager.log:2023-02-23 07:14:56,039 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:164 [] - Creating insecure control channel for localhost:43865.
taskmanager.log:2023-02-23 07:14:56,040 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:172 [] - Control channel established.
taskmanager.log:2023-02-23 07:14:56,040 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:215 [] - Initializing SDKHarness with unbounded number of workers.
taskmanager.log:2023-02-23 07:14:56,040 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:179 [] - Python sdk harness starting.
taskmanager.log:2023-02-23 07:14:56,174 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:840 [] - Creating insecure state channel for localhost:35489.
taskmanager.log:2023-02-23 07:14:56,174 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:847 [] - State channel established.
taskmanager.log:2023-02-23 07:14:56,178 INFO  /tmp/PYTHONPATH/lib64/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:750 [] - Creating client data channel for localhost:43911

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) don't know
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) no

Documentation

  • Does this pull request introduce a new feature? (yes / no) yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) docs

@Samrat002 Samrat002 changed the title [DRAFT][FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable [FLINK-30277][python]Allow PYTHONPATH of Python Worker configurable Jan 28, 2023
@flinkbot
Copy link
Collaborator

flinkbot commented Jan 28, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Samrat002
Copy link
Contributor Author

Samrat002 commented Feb 9, 2023

@HuangXingBo
Please review this small change in free time .

Copy link
Contributor

@HuangXingBo HuangXingBo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Samrat002 Thanks a lot for the feature. I have left some comments.

if (pythonEnv.pythonPath != null) {
String defaultPythonPath = env.get("PYTHONPATH");
String defaultPythonPath =
config.getOptional(PYTHON_PATH).orElse(env.get("PYTHON_PATH"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
config.getOptional(PYTHON_PATH).orElse(env.get("PYTHON_PATH"));
config.getOptional(PYTHON_PATH).orElse(env.get("PYTHONPATH"));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the PYTHON_PATH configuration work on both the client and the cluster side?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I prefer these logic is in preparePythonEnvironment rather than adding another param ReadableConfig to startPythonProcess

Copy link
Contributor Author

@Samrat002 Samrat002 Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the PYTHON_PATH configuration work on both the client and the cluster side?

Yes this is for both client side and cluster side.

@@ -6,7 +6,7 @@ info:
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0.html
version: v1/1.17-SNAPSHOT
version: v1/1.18-SNAPSHOT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifications not related to this feature?

Copy link
Contributor Author

@Samrat002 Samrat002 Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to this feature change only.

This doc update is generated after running the below command

./mvnw package -Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests

@Samrat002
Copy link
Contributor Author

Samrat002 commented Feb 23, 2023

Thank you for initially reviewing the changes. @HuangXingBo
I have made updates in the PR as per review comments, also updated the description section with logs and steps done to verify the changes in a sample cluster.
Please review the changes whenever time .

@Samrat002 Samrat002 force-pushed the FLINK-30277 branch 4 times, most recently from 7684db4 to 29821f3 Compare February 24, 2023 03:59
Copy link
Contributor

@HuangXingBo HuangXingBo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Samrat002 Thanks a lot for the update. I have left some comments.

Copy link
Contributor

@HuangXingBo HuangXingBo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Samrat002 Thanks a lot for the update. Only a comment left which I will update during merging.

Co-authored-by: HuangXingBo <hxbks2ks@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants