Skip to content

Commit

Permalink
Add a better message and improve documentation for pinned thread mode
Browse files Browse the repository at this point in the history
  • Loading branch information
HyukjinKwon committed Nov 19, 2019
1 parent 882f54b commit cfc1277
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 61 deletions.
124 changes: 63 additions & 61 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
from pyspark.util import _warn_pin_thread

if sys.version > '3':
xrange = range
Expand Down Expand Up @@ -1008,60 +1009,61 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
ensure that the tasks are actually stopped in a timely manner, but is off by default due
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
.. note:: Currently, setting a group ID (set to local properties) with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
.. note:: Currently, setting a group ID (set to local properties) with multiple threads
does not properly work. Internally threads on PVM and JVM are not synced, and JVM
thread can be reused for multiple threads on PVM, which fails to isolate local
properties for each thread on PVM.
To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.
To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a group ID (set to local properties) with a thread does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
Workaround class can be written as below (unofficial way):
>>> class CustomThread(threading.Thread):
>>> def __init__(self, sc, target, *args, **kwargs):
>>> properties = sc._jsc.sc().getLocalProperties()
>>> def copy_local_properties(*a, **k):
>>> sc._jsc.sc().setLocalProperties(properties)
>>> return target(*a, **k)
>>> super(CustomThread, self).__init__(
... target=copy_local_properties, *args, **kwargs)
"""
_warn_pin_thread("setJobGroup")
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

def setLocalProperty(self, key, value):
"""
Set a local property that affects jobs submitted from this thread, such as the
Spark fair scheduler pool.
.. note:: Currently, setting a local property with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
.. note:: Currently, setting a local property with multiple threads does not properly work.
Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
for each thread on PVM.
To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.
To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a local property with a thread does not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
Workaround class can be written as below (unofficial way):
>>> class CustomThread(threading.Thread):
>>> def __init__(self, sc, target, *args, **kwargs):
>>> properties = sc._jsc.sc().getLocalProperties()
>>> def copy_local_properties(*a, **k):
>>> sc._jsc.sc().setLocalProperties(properties)
>>> return target(*a, **k)
>>> super(CustomThread, self).__init__(
... target=copy_local_properties, *args, **kwargs)
"""
_warn_pin_thread("setLocalProperty")
self._jsc.setLocalProperty(key, value)

def getLocalProperty(self, key):
Expand All @@ -1075,30 +1077,30 @@ def setJobDescription(self, value):
"""
Set a human readable description of the current job.
.. note:: Currently, setting a job description (set to local properties) with a thread does
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
can be reused for multiple threads on PVM, which fails to isolate local properties
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
.. note:: Currently, setting a job description (set to local properties) with multiple
threads does not properly work. Internally threads on PVM and JVM are not synced,
and JVM thread can be reused for multiple threads on PVM, which fails to isolate
local properties for each thread on PVM.
To work around this, you can set `PYSPARK_PIN_THREAD` to
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
from the parent thread although it isolates each thread on PVM and JVM with its own
local properties. To work around this, you should manually copy and set the local
local properties.
To work around this, you should manually copy and set the local
properties from the parent thread to the child thread when you create another thread.
"""
warnings.warn(
"Currently, setting a job description (set to local properties) with a thread does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.",
UserWarning)
Workaround class can be written as below (unofficial way):
>>> class CustomThread(threading.Thread):
>>> def __init__(self, sc, target, *args, **kwargs):
>>> properties = sc._jsc.sc().getLocalProperties()
>>> def copy_local_properties(*a, **k):
>>> sc._jsc.sc().setLocalProperties(properties)
>>> return target(*a, **k)
>>> super(CustomThread, self).__init__(
... target=copy_local_properties, *args, **kwargs)
"""
_warn_pin_thread("setJobDescription")
self._jsc.setJobDescription(value)

def sparkUser(self):
Expand Down
29 changes: 29 additions & 0 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import re
import sys
import traceback
import os
import warnings
import inspect
from py4j.protocol import Py4JJavaError

Expand Down Expand Up @@ -112,6 +114,33 @@ def wrapper(*args, **kwargs):
return wrapper


def _warn_pin_thread(name):
if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
msg = (
"PYSPARK_PIN_THREAD feature is enabled. "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread.")
else:
msg = (
"Currently, '%s' (set to local properties) with multiple threads does "
"not properly work. "
"\n"
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
"for multiple threads on PVM, which fails to isolate local properties for each "
"thread on PVM. "
"\n"
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
"However, note that it cannot inherit the local properties from the parent thread "
"although it isolates each thread on PVM and JVM with its own local properties. "
"\n"
"To work around this, you should manually copy and set the local properties from "
"the parent thread to the child thread when you create another thread." % name)
warnings.warn(msg, UserWarning)


def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
print("""
________________________________________________________________________________________________
Expand Down

0 comments on commit cfc1277

Please sign in to comment.