[SPARK-27711][CORE] Unset InputFileBlockHolder at the end of tasks #24605
jose-torres wants to merge 6 commits into apache:master from jose-torres/fix254
Conversation
LGTM

ok to test
Test build #105388 has finished for PR 24605 at commit

Test build #105389 has finished for PR 24605 at commit

Test build #105390 has finished for PR 24605 at commit

Test build #105391 has finished for PR 24605 at commit

Test build #105392 has finished for PR 24605 at commit
```python
import sys

from pyspark.sql import Row
from pyspark.sql.types import *
```
Can we avoid the wildcard import?
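A minimal sketch of the explicit alternative (the exact set of names the test needs is an assumption based on the snippets below):

```python
# Import only the names the test actually uses, instead of `import *`:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
```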
```python
# [SC-12160]: if everything was properly reset after the last job, this should return
# empty string rather than the file read in the last job.
for result in results:
    assert(result[0] == '')
```
We can stick to `self.assertEqual` for a better message.
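A sketch of that change, reusing the loop quoted above:

```python
# assertEqual reports both the expected and actual values on failure,
# unlike a bare assert:
for result in results:
    self.assertEqual(result[0], '')
```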
```python
results = non_file_df.collect()
self.assertTrue(len(results) == 100)

# [SC-12160]: if everything was properly reset after the last job, this should return
```
+1 for @HyukjinKwon's comment. Is this the internal issue tracker ID?
Could you update the PR, @jose-torres?
```python
[Row(name=u'Tom'), Row(name=u'Alice'), Row(name=None)])

def test_input_file_name_reset_for_rdd(self):
    from pyspark.sql.functions import udf, input_file_name
```
I think we can just import at the top.
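For instance (a sketch; exact placement among the module's existing imports is assumed):

```python
# At module level, alongside the file's other imports:
from pyspark.sql.functions import udf, input_file_name
```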
```python
def test_input_file_name_reset_for_rdd(self):
    from pyspark.sql.functions import udf, input_file_name
    rdd = self.sc.textFile('python/test_support/hello/hello.txt').map(lambda x: {'data': x})
    df = self.spark.createDataFrame(rdd, StructType([StructField('data', StringType(), True)]))
```
Actually, you don't have to import types:

```python
spark.createDataFrame(rdd, "data STRING")
```
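Applied to the line under review, that would look like (a sketch):

```python
# A DDL schema string replaces the StructType/StructField/StringType imports:
df = self.spark.createDataFrame(rdd, "data STRING")
```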
```python
df = self.spark.createDataFrame(rdd, StructType([StructField('data', StringType(), True)]))
df.select(input_file_name().alias('file')).collect()

non_file_df = self.spark.range(0, 100, 1, 100).select(input_file_name().alias('file'))
```
Seems like we don't need the alias `file`.
And why don't we just use `spark.range(100)`?
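A sketch of the simplified line; note the original's 100 partitions may be deliberate (to spread the second job across many tasks), so this assumes they are not required:

```python
# range(100) defaults to start=0 and step=1; the default column name is fine
# since the assertions access the value by position, not by the name 'file':
non_file_df = self.spark.range(100).select(input_file_name())
```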
HyukjinKwon left a comment:
Some nits. Looks good to me too.
Pushed comments. Sorry it took so long, I was on a trip.
Test build #105702 has finished for PR 24605 at commit
Thanks! Merged to master, please manually backport to 2.4!
Unset InputFileBlockHolder at the end of tasks to stop the file name from leaking over to other tasks in the same thread. This happens in particular in PySpark because of its complex threading model.

New PySpark test.

Closes apache#24605 from jose-torres/fix254.

Authored-by: Jose Torres <torres.joseph.f+github@gmail.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
What changes were proposed in this pull request?
Unset InputFileBlockHolder at the end of tasks to stop the file name from leaking over to other tasks in the same thread. This happens in particular in PySpark because of its complex threading model.
How was this patch tested?
A new PySpark test.
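Pieced together from the snippets quoted in the review above, the new test likely looks roughly like this (a reconstruction, not the exact merged code; the base class and the class name are assumptions):

```python
from pyspark.sql.functions import input_file_name
from pyspark.testing.sqlutils import ReusedSQLTestCase  # assumed test base class


class InputFileNameResetTests(ReusedSQLTestCase):

    def test_input_file_name_reset_for_rdd(self):
        # First job reads a real file, so input_file_name() is populated per task.
        rdd = self.sc.textFile('python/test_support/hello/hello.txt') \
            .map(lambda x: {'data': x})
        df = self.spark.createDataFrame(rdd, "data STRING")
        df.select(input_file_name().alias('file')).collect()

        # Second job touches no files; 100 partitions make it likely to land on
        # the same executor threads the first job used.
        non_file_df = self.spark.range(0, 100, 1, 100) \
            .select(input_file_name().alias('file'))
        results = non_file_df.collect()
        self.assertTrue(len(results) == 100)

        # If everything was properly reset after the last job, this should return
        # empty string rather than the file read in the last job.
        for result in results:
            self.assertEqual(result[0], '')
```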