Search before asking
- I searched in the issues and found nothing similar.
Description
Summary
The official Flink Agents example, workflow_single_agent_example.py, fails in YARN Application Cluster mode with a ModuleNotFoundError: No module named 'encodings', indicating an issue with the embedded Python interpreter's ability to find its standard library.
Crucially:
- The job runs successfully in Standalone Flink Cluster mode using the exact same Python virtual environment and code.
- A standard PyFlink DataStream example (word_count.py) also runs successfully in YARN mode.
- The YARN NodeManager machines DO NOT have a system-wide Python installation. The entire Python interpreter, necessary libraries, and dependencies are all contained within the archived virtual environment (venv.tar.gz).
This confirms that the environment setup is entirely dependent on the archived file, and the failure occurs when the Pemja-embedded interpreter attempts to load this self-contained environment.
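As a sanity check (not part of the original report), the archive's self-containment can be verified before shipping: the standard library, including the encodings package, must be present under lib/python3.10/ inside venv.tar.gz. A minimal sketch, assuming the usual conda-pack style layout; the helper name and layout assumption are mine:

```python
import tarfile

def archive_has_stdlib(archive_path: str, pyver: str = "3.10") -> bool:
    """Return True if the archived venv ships its own stdlib 'encodings' package.

    Assumes a conda-pack style layout, i.e. lib/python<ver>/encodings/__init__.py
    at (or near) the archive root.
    """
    marker = f"lib/python{pyver}/encodings/__init__.py"
    with tarfile.open(archive_path, "r:*") as tar:
        # Member names may or may not carry a leading "./" prefix.
        return any(m.name.lstrip("./").endswith(marker) for m in tar.getmembers())
```

If this returns False for the shipped archive, the failure would be a packaging problem rather than a Pemja one; in this report the same archive works in Standalone mode, so the stdlib is evidently present.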
Error Log Snippet (taskmanager.log)
2025-11-02 07:46:21,055 INFO org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn Data client connected.
2025-11-02 07:46:21,053 WARN org.apache.flink.runtime.taskmanager.Task [] - action-execute-operator -> Map, Map -> Sink: Print to Std. Out (1/1)#0 (0b196afe82a3bfbc08fa3a8d12240d81_90bea66de1c231edf33913ecd54406c1_0_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to find libpython
at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:161) ~[flink-python-1.20.3.jar:1.20.3]
at pemja.utils.CommonUtils.loadPython(CommonUtils.java:44) ~[flink-python-1.20.3.jar:1.20.3]
at pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:365) ~[flink-python-1.20.3.jar:1.20.3]
at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:144) ~[flink-python-1.20.3.jar:1.20.3]
at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45) ~[flink-python-1.20.3.jar:1.20.3]
at org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment.getInterpreter(EmbeddedPythonEnvironment.java:45) ~[flink-agents-dist-0.1.0.jar:0.1.0]
at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:80) ~[flink-agents-dist-0.1.0.jar:0.1.0]
at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.initPythonActionExecutor(ActionExecutionOperator.java:504) ~[flink-agents-dist-0.1.0.jar:0.1.0]
at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.open(ActionExecutionOperator.java:247) ~[flink-agents-dist-0.1.0.jar:0.1.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.3.jar:1.20.3]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: Failed to execute the command: /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python -c from find_libpython import find_libpython;print(find_libpython())
output: Python path configuration:
PYTHONHOME = 'venv.tar.gz'
PYTHONPATH = (not set)
program name = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
isolated = 0
environment = 1
user site = 1
import site = 1
sys._base_executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
sys.base_prefix = 'venv.tar.gz'
sys.base_exec_prefix = 'venv.tar.gz'
sys.platlibdir = 'lib'
sys.executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
sys.prefix = 'venv.tar.gz'
sys.exec_prefix = 'venv.tar.gz'
sys.path = [
'venv.tar.gz/lib/python310.zip',
'venv.tar.gz/lib/python3.10',
'venv.tar.gz/lib/python3.10/lib-dynload',
]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'
Current thread 0x00007d1077b7f740 (most recent call first):
<no Python frame>
at pemja.utils.CommonUtils.execute(CommonUtils.java:180) ~[flink-python-1.20.3.jar:1.20.3]
at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:157) ~[flink-python-1.20.3.jar:1.20.3]
... 19 more
Behavior When Not Setting PYTHONHOME
If the parameter -Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz is removed, the TaskManager fails with the exact same Python core error (printed to taskmanager.err), only without the detailed Java stack trace.
This confirms that the core issue lies in the initialization of the embedded Python interpreter from the archived environment, regardless of whether PYTHONHOME is manually specified.
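The fatal error itself is generic CPython startup behavior: pointing PYTHONHOME at a path that does not resolve to a real installation prefix (here, the relative name venv.tar.gz, interpreted against the process working directory) leaves the interpreter unable to import encodings. A minimal local reproduction, independent of Flink and Pemja:

```python
import os
import subprocess
import sys

# Spawn a fresh interpreter with PYTHONHOME pointing at a relative,
# non-existent prefix, mimicking what the TaskManager log shows.
env = dict(os.environ, PYTHONHOME="venv.tar.gz")
env.pop("PYTHONPATH", None)  # ensure nothing else rescues the stdlib lookup

proc = subprocess.run(
    [sys.executable, "-c", "print('ok')"],
    env=env,
    capture_output=True,
    text=True,
)
print(proc.returncode)  # non-zero: the interpreter dies during startup
print(proc.stderr)      # includes: ModuleNotFoundError: No module named 'encodings'
```

This suggests the root cause is that the relative value of PYTHONHOME / sys.prefix (venv.tar.gz) is never resolved against the YARN container's localized archive directory before the embedded interpreter initializes.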
Conclusion
The Pemja-embedded Python interpreter (used by Flink Agents) is failing to correctly locate the standard library (encodings module) of the self-contained, Conda-created virtual environment when deployed via YARN, even though the standard PyFlink Python Worker process runs successfully. This suggests a path resolution failure specific to how Pemja initializes the embedded environment on YARN.
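For context, an illustrative sketch (not Pemja's actual code) of what the failing probe ultimately depends on: find_libpython locates the shared library from the interpreter's own build configuration, roughly as below. If sys.prefix degenerates to a relative venv.tar.gz, prefix-relative candidate paths become unresolvable, so both the probe and the subsequent stdlib lookup fail:

```python
import os
import sysconfig

def guess_libpython():
    """Roughly how libpython can be located from build config variables.

    Illustrative only: the real find_libpython package checks many more
    candidate locations and platform-specific naming schemes.
    """
    ldlibrary = sysconfig.get_config_var("LDLIBRARY")  # e.g. 'libpython3.10.so'
    if not ldlibrary:
        return None
    for confvar in ("LIBDIR", "LIBPL"):
        libdir = sysconfig.get_config_var(confvar)
        if libdir:
            candidate = os.path.join(libdir, ldlibrary)
            if os.path.exists(candidate):
                return candidate
    return None
```

On a healthy installation this resolves to an absolute path under the environment's lib directory; with a broken relative prefix it returns nothing, matching the "Failed to find libpython" in the stack trace above.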
How to reproduce
./flink-1.20.3/bin/flink run-application -t yarn-application \
-Dcontainerized.master.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
-Dcontainerized.taskmanager.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
-Djobmanager.memory.process.size=1024m \
-Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name=flink-agents-workflow \
-Dyarn.ship-files=./shipfiles \
-pyarch shipfiles/venv.tar.gz \
-pyclientexec venv.tar.gz/bin/python \
-pyexec venv.tar.gz/bin/python \
-pyfs shipfiles \
-pym workflow_single_agent_example

Version and environment
- Flink Version: 1.20.3
- Flink Agents Version: 0.1.0
- Deployment Mode: YARN Application Cluster (-t yarn-application)
- Python Version (in venv): Python 3.10
- Python Virtual Environment: Created and archived using Conda (venv.tar.gz)
- YARN Setup: NodeManagers lack a system Python; environment is self-contained in the archive.
Are you willing to submit a PR?
- I'm willing to submit a PR!