[SPARK-22340][PYTHON] Save localProperties in thread.local #24705
Conversation
add to whitelist

CC @ueshin

Test build #105775 has finished for PR 24705 at commit
I added a few comments, but I'm not familiar with this part of the codebase. My main question: there are other methods in self.ctx._jvm.PythonRDD which get called from rdd.py. Will those still behave the same way with the changes in context.py, or will they no longer have the expected jobGroup, local properties, etc.? If the behavior has changed, what's the best way to keep it consistent? CC @ueshin?

Also, Takuya, do you want to use SPARK-22340 for this or a new JIRA? Thank you!!
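For context, here are a couple of other call sites of that kind, paraphrased from rdd.py/context.py of this era (exact signatures may differ); both enter the JVM through PythonRDD and raise the same question about local-property propagation:

    # Paraphrased call sites (not exact quotes from this PR's diff):
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    sock_info = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())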
@@ -159,7 +159,12 @@ private[spark] object PythonRDD extends Logging {
    * @return 2-tuple (as a Java array) with the port number of a local socket which serves the
    *         data collected from this job, and the secret for authentication.
    */
-  def collectAndServe[T](rdd: RDD[T]): Array[Any] = {
+  def collectAndServe[T](
I'll let @ueshin judge, but my guess is that, even though this is a "private" API, we'll want to add a new collectAndServe with the two arguments, leaving the old one in place in case third-party libraries use the private API.
run_job(group_A_name, 0)
self.assertFalse(is_job_cancelled[0], "job didn't succeed.")

for i in range(num_threads):
I'd explain what this is testing in a comment.
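For instance, a hypothetical wording for such a comment (mine, not the author's):

    # Each worker thread sets its own job group via setJobGroup(); because
    # local properties now live in threading.local(), cancelling one group
    # must not cancel jobs submitted from the other threads.
    run_job(group_A_name, 0)
    self.assertFalse(is_job_cancelled[0], "job didn't succeed.")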
I think it would be better to just clarify the limitation rather than re-implementing the get/set local property logic on the Python side.
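For reference, a minimal sketch of the limitation under discussion (my illustration, assuming the pre-PR behavior where Python threads can share JVM-side thread state through Py4J):

    import threading
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def run(pool):
        # Each Python thread intends its own scheduler pool, but without
        # per-thread bookkeeping on the Python side the property may land
        # on a shared (or wrong) JVM thread and leak between Python threads.
        sc.setLocalProperty("spark.scheduler.pool", pool)
        sc.parallelize(range(10)).count()

    threads = [threading.Thread(target=run, args=("pool_%d" % i,)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()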
python/pyspark/rdd.py
Outdated
java_map = MapConverter().convert(self.context.getLocalProperties(),
                                  self.context._gateway._gateway_client)
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(
    self._jrdd.rdd(), java_map)
Would that work if we use a UDF + count and use TaskContext's local property access?
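A rough sketch of how I read that suggestion (my interpretation; assumes an active spark session and that TaskContext.get().getLocalProperty is available inside tasks):

    from pyspark import TaskContext
    from pyspark.sql.functions import udf, col

    spark.sparkContext.setLocalProperty("test_key", "test_value")

    @udf("string")
    def read_prop(_):
        # Runs inside a task on the executor and reads the job's local property.
        return TaskContext.get().getLocalProperty("test_key")

    # Filtering on the UDF output forces it to actually run; count() is the action.
    assert spark.range(10).filter(read_prop(col("id")) == "test_value").count() == 10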
python/pyspark/rdd.py
Outdated
java_map = MapConverter().convert(self.context.getLocalProperties(),
                                  self.context._gateway._gateway_client)
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(
    self._jrdd.rdd(), java_map)
There are multiple actions like toPandas. If these changes are needed for all of them, it sounds like we're re-implementing the local property logic on the Python side.
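To make the concern concrete: every action would otherwise repeat the same conversion. A hypothetical shared helper (my sketch, building on the getLocalProperties accessor this PR adds) would at least centralize it:

    from py4j.java_collections import MapConverter

    def _local_properties_java_map(ctx):
        # Hypothetical helper: convert the Python-side thread-local properties
        # (via the getLocalProperties accessor added in this PR) into a Java map
        # that JVM entry points like collectAndServe can apply before the job runs.
        return MapConverter().convert(ctx.getLocalProperties(),
                                      ctx._gateway._gateway_client)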
Test build #105878 has finished for PR 24705 at commit
Closing this PR for now. We will design this more carefully.
What changes were proposed in this pull request?

Store local properties in threading.local() in SparkContext, so that setLocalProperty, setJobGroup, and setJobDescription take effect per Python thread. When an action such as collect runs, the calling thread's properties are read with getLocalProperties and applied on the JVM side via setLocalProperty in collectAndServe.
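A minimal sketch of the described approach (my illustration, not the PR's actual code; the method names mirror the ones mentioned above):

    import threading

    class _ThreadLocalProperties(object):
        """Sketch: per-thread local properties, as described above."""

        def __init__(self):
            self._local = threading.local()

        def setLocalProperty(self, key, value):
            if not hasattr(self._local, "properties"):
                self._local.properties = {}
            self._local.properties[key] = value

        def getLocalProperties(self):
            # Snapshot handed to the JVM (e.g. by collectAndServe) at job submission.
            return dict(getattr(self._local, "properties", {}))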
How was this patch tested?

Added a unit test in test_rdd.py.
Please review https://spark.apache.org/contributing.html before opening a pull request.