
[SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs) #24946

Closed
wants to merge 3 commits into apache:master from HyukjinKwon:SPARK-27234

Conversation

@HyukjinKwon (Member) commented Jun 24, 2019

What changes were proposed in this pull request?

This PR proposes to use InheritableThreadLocal instead of ThreadLocal for the current epoch in EpochTracker. Python UDFs need separate threads to write to and read from the Python processes, and when new threads are created, the previously set epoch is lost.

After this PR, Python UDFs can be used in Structured Streaming with continuous mode.
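
For illustration, here is a minimal sketch of the idea in Scala (simplified names, not the exact EpochTracker code): an InheritableThreadLocal copies the parent thread's value into any thread created from it, so a writer thread spawned inside the task still sees the epoch set on the task thread.

// Hedged sketch of a thread-local holder for the current epoch.
object EpochTrackerSketch {
  // Before this PR the holder was a plain ThreadLocal, so a freshly created
  // thread saw None, and calling .get on that result failed with
  // java.util.NoSuchElementException: None.get (the error shown later in this thread).
  private val currentEpoch = new InheritableThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  def setCurrentEpoch(epoch: Long): Unit = currentEpoch.set(Some(epoch))
  def getCurrentEpoch: Option[Long] = currentEpoch.get()
}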

How was this patch tested?

The test cases were written on top of #24945.
Unit tests were added.

Manual tests.

@HyukjinKwon (Member Author) commented Jun 24, 2019

There's a similar case that uses InheritableThreadLocal: input_file_name(). cc @cloud-fan as well.
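
For anyone following along, here is a quick standalone demo of the difference (plain JDK thread-local behavior, not Spark code): a value set in a ThreadLocal is invisible to a child thread, while an InheritableThreadLocal value is copied into the child when it is created.

object InheritableThreadLocalDemo extends App {
  val plain = new ThreadLocal[String]
  val inheritable = new InheritableThreadLocal[String]

  // Set both on the "parent" (task) thread.
  plain.set("epoch-5")
  inheritable.set("epoch-5")

  val child = new Thread(() => {
    println(s"plain       = ${Option(plain.get())}")        // prints None
    println(s"inheritable = ${Option(inheritable.get())}")  // prints Some(epoch-5)
  })
  child.start()
  child.join()
}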

@HyukjinKwon HyukjinKwon changed the title [SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker [SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs) Jun 24, 2019
@mengxr (Contributor) commented Jun 24, 2019

@HyukjinKwon How does SCALAR_ITER Pandas UDF work with continuous processing after this PR? Do we only run initialization code once per task?

@HyukjinKwon (Member Author)

It works after this PR as below:

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch in iterator:
        yield col1_batch

spark \
    .readStream \
    .format("rate") \
    .load() \
    .withColumn("foo", the_udf(col("value"))) \
    .writeStream \
    .format("console") \
    .trigger(continuous="5 second").start() 

Before:

...
Caused by: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:366)
	at scala.None$.get(Option.scala:364)
	at org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader.next(ContinuousQueuedDataReader.scala:116)
	at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:93)
	at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:91)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
...

After:

...
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+---+
|           timestamp|value|foo|
+--------------------+-----+---+
|2019-06-25 09:05:...|    9|  9|
|2019-06-25 09:05:...|    2|  2|
|2019-06-25 09:05:...|   10| 10|
|2019-06-25 09:05:...|   11| 11|
|2019-06-25 09:05:...|    5|  5|
|2019-06-25 09:05:...|    3|  3|
|2019-06-25 09:05:...|    1|  1|
|2019-06-25 09:05:...|    6|  6|
|2019-06-25 09:05:...|    4|  4|
|2019-06-25 09:05:...|    8|  8|
|2019-06-25 09:05:...|    7|  7|
|2019-06-25 09:05:...|    0|  0|
+--------------------+-----+---+
...

This was because the current epoch could not be read from the writer thread (the thread that writes to the Python process). Each UDF is executed once per execution of each epoch.

@HyukjinKwon (Member Author)

#24945 is merged. Let me rebase.

@HyukjinKwon HyukjinKwon force-pushed the SPARK-27234 branch 2 times, most recently from e3d9908 to aa34d9e Compare June 25, 2019 03:03
@zsxwing (Member) left a comment

How do we handle other thread local variables that are not InheritableThreadLocal, such as org.apache.spark.TaskContext.get?

@HyukjinKwon (Member Author) commented Jun 25, 2019

In the case of org.apache.spark.TaskContext, it looks like it's manually set for Python's writer thread:

The information is then (de)serialized into the Python worker, where a TaskContext instance is constructed and used later.

I think that approach would work too, but I thought it better to keep this logic isolated from Python. FWIW, the Python runners are in core while this code is in SQL, so we would have to move code around to mimic that approach.
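
For comparison, here is a generic sketch of that manual-propagation pattern (illustrative names only, not Spark's actual code): the task thread captures the thread-local value and the child writer thread re-installs it before doing any work, which is roughly how TaskContext reaches the Python writer thread.

object ManualPropagationSketch {
  // A plain ThreadLocal, as EpochTracker used before this PR.
  private val currentEpoch = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  def setCurrentEpoch(epoch: Long): Unit = currentEpoch.set(Some(epoch))

  // Capture the value on the calling (task) thread and re-set it on the
  // writer thread before running the write loop.
  def startWriter(writeLoop: () => Unit): Thread = {
    val parentEpoch = currentEpoch.get()
    val writer = new Thread(() => {
      currentEpoch.set(parentEpoch)
      writeLoop()
    })
    writer.start()
    writer
  }
}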

@HyukjinKwon (Member Author)

gentle ping .. :-) ..

@SparkQA commented Jul 22, 2019

Test build #108000 has finished for PR 24946 at commit b103acc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Jul 22, 2019

LGTM

@HyukjinKwon (Member Author)

Thanks for the review and approval, @zsxwing.

Just FYI, there's a similar fix and discussion going on at #24958.

@HyukjinKwon (Member Author)

Merged to master.

Thanks all.

@aurorazl

@HyukjinKwon Was this problem fixed in Spark 3.0? What can I do to fix it in Spark 2.4.3?

@HyukjinKwon (Member Author)

This fix will be included in Apache Spark 3.0. I think you should upgrade once it is released.

@dongjoon-hyun (Member)

This issue is being discussed on the dev mailing list for 2.4.4. I'll follow the decision from @HyukjinKwon and @zsxwing.

@HyukjinKwon (Member Author)

I'm fine with backporting, but @zsxwing, WDYT?

@zsxwing (Member) commented Aug 14, 2019

I'm fine with backporting this small fix.

@dongjoon-hyun (Member)

Thank you, @zsxwing and @HyukjinKwon.
Could you make a backport PR against branch-2.4, @HyukjinKwon?

@HyukjinKwon (Member Author)

Yup. BTW, IntegratedUDFTestUtils doesn't exist in branch-2.4, so I will have to manually test and backport.

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Aug 15, 2019
[SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs)

This PR proposes to use `InheritableThreadLocal` instead of `ThreadLocal` for the current epoch in `EpochTracker`. Python UDFs need separate threads to write to and read from the Python processes, and when new threads are created, the previously set epoch is lost.

After this PR, Python UDFs can be used in Structured Streaming with continuous mode.

The test cases were written on top of apache#24945.
Unit tests were added.

Manual tests.

Closes apache#24946 from HyukjinKwon/SPARK-27234.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the SPARK-27234 branch March 3, 2020 01:18