Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22340][PYTHON][FOLLOW-UP] Add a better message and improve documentation for pinned thread mode #26588

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 30 additions & 58 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
from pyspark.util import _warn_pin_thread

if sys.version > '3':
xrange = range
Expand Down Expand Up @@ -1008,60 +1009,41 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
ensure that the tasks are actually stopped in a timely manner, but is off by default due
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

.. note:: Currently, setting a group ID (set to local properties) with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
.. note:: Currently, setting a group ID (set to local properties) with multiple threads
does not properly work. Internally threads on PVM and JVM are not synced, and JVM
thread can be reused for multiple threads on PVM, which fails to isolate local
properties for each thread on PVM.

To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.

To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a group ID (set to local properties) with a thread does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
_warn_pin_thread("setJobGroup")
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

def setLocalProperty(self, key, value):
"""
Set a local property that affects jobs submitted from this thread, such as the
Spark fair scheduler pool.

.. note:: Currently, setting a local property with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
.. note:: Currently, setting a local property with multiple threads does not properly work.
Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
for each thread on PVM.

To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.

To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a local property with a thread does not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
_warn_pin_thread("setLocalProperty")
self._jsc.setLocalProperty(key, value)

def getLocalProperty(self, key):
Expand All @@ -1075,30 +1057,20 @@ def setJobDescription(self, value):
"""
Set a human readable description of the current job.

.. note:: Currently, setting a job description (set to local properties) with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
.. note:: Currently, setting a job description (set to local properties) with multiple
threads does not properly work. Internally threads on PVM and JVM are not synced,
and JVM thread can be reused for multiple threads on PVM, which fails to isolate
local properties for each thread on PVM.

To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.

To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a job description (set to local properties) with a thread does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
_warn_pin_thread("setJobDescription")
self._jsc.setJobDescription(value)

def sparkUser(self):
Expand Down
29 changes: 29 additions & 0 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import re
import sys
import traceback
import os
import warnings
import inspect
from py4j.protocol import Py4JJavaError

Expand Down Expand Up @@ -112,6 +114,33 @@ def wrapper(*args, **kwargs):
return wrapper


def _warn_pin_thread(name):
if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
msg = (
"PYSPARK_PIN_THREAD feature is enabled. "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.")
else:
msg = (
"Currently, '%s' (set to local properties) with multiple threads does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread." % name)
warnings.warn(msg, UserWarning)


def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
print("""
________________________________________________________________________________________________
Expand Down