[SPARK-19223][SQL][PySpark] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD #16585

Closed
viirya wants to merge 5 commits into apache:master from viirya:fix-inputfileblock-hadooprdd

Conversation

viirya
Member

@viirya viirya commented Jan 14, 2017

What changes were proposed in this pull request?

For datasources based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDFs.

To reproduce it, run the following code with bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1:

from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

def filename(path):
    return path

session = SparkSession.builder.appName('APP').getOrCreate()

session.udf.register('sameText', filename)
sameText = udf(filename, StringType())

df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
df.select('file').show() # works
df.select(sameText(df['file'])).show()   # returns empty content

The issue is that HadoopRDD and NewHadoopRDD set the file block's info in InputFileBlockHolder before the returned iterator begins to be consumed. InputFileBlockHolder records this info in a thread-local variable. When running Python UDFs in batch, we set up another thread to consume the iterator from the child plan's output RDD, so the info can't be read back from that other thread.

To fix this, we have to set the info in InputFileBlockHolder after the iterator begins to be consumed, so that the info can be read from the correct thread.
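
As a minimal, self-contained sketch of the JVM behavior involved (illustrative code only, not Spark's actual InputFileBlockHolder): a value written to a plain ThreadLocal by the thread that creates the iterator is invisible to a separate thread that consumes it, which is why the Python UDF runner reads back an empty value.

    // Sketch: a plain ThreadLocal set in a "producer" thread is not
    // visible from a separate "consumer" thread.
    object ThreadLocalDemo {
      private val fileName = new ThreadLocal[String] {
        override def initialValue(): String = ""
      }

      def main(args: Array[String]): Unit = {
        // The thread that builds the iterator records the file info.
        fileName.set("hdfs://host/a.xml")
        println(s"producer sees: '${fileName.get}'")   // hdfs://host/a.xml

        // The thread feeding rows to the Python worker reads the same
        // ThreadLocal but only gets the initial (empty) value.
        val consumer = new Thread(new Runnable {
          override def run(): Unit = {
            println(s"consumer sees: '${fileName.get}'") // empty string
          }
        })
        consumer.start()
        consumer.join()
      }
    }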

How was this patch tested?

Manually tested with the example code above for the spark-xml package on pyspark: bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1.

Also added a pyspark test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@SparkQA

SparkQA commented Jan 14, 2017

Test build #71366 has started for PR 16585 at commit 5fd215f.

@viirya
Member Author

viirya commented Jan 14, 2017

retest this please.

@SparkQA

SparkQA commented Jan 14, 2017

Test build #71368 has finished for PR 16585 at commit 5fd215f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 14, 2017

retest this please.

@SparkQA

SparkQA commented Jan 14, 2017

Test build #71370 has finished for PR 16585 at commit 5fd215f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 15, 2017

ping @rxin @cloud-fan

@rxin
Contributor

rxin commented Jan 16, 2017

Should the proper fix be that the Python thread transfers the proper information over?

@viirya
Member Author

viirya commented Jan 17, 2017

@rxin Thanks for looking at this. I think the simplest way to transfer the info is to use InheritableThreadLocal instead of ThreadLocal in InputFileBlockHolder. I tested it and it works. What do you think? Is that ok with you?
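
To illustrate the difference (a hedged sketch, not the actual InputFileBlockHolder code): a thread created after an InheritableThreadLocal is set inherits its value, which matches the relationship between the task thread and the Python-UDF consumer thread it spawns.

    // Sketch: unlike ThreadLocal, an InheritableThreadLocal's value is
    // copied into threads created afterwards by the setting thread.
    object InheritableDemo {
      private val fileName = new InheritableThreadLocal[String] {
        override def initialValue(): String = ""
      }

      def main(args: Array[String]): Unit = {
        fileName.set("hdfs://host/a.xml")

        val child = new Thread(new Runnable {
          override def run(): Unit = {
            // The child inherited the parent's value at creation time.
            println(s"child sees: '${fileName.get}'") // hdfs://host/a.xml
          }
        })
        child.start()
        child.join()
      }
    }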

@cloud-fan
Contributor

SGTM

@viirya
Member Author

viirya commented Jan 17, 2017

@cloud-fan SGTM is for current approach or InheritableThreadLocal?

@cloud-fan
Contributor

InheritableThreadLocal

@viirya viirya force-pushed the fix-inputfileblock-hadooprdd branch from e2d872c to 1563e03 on January 17, 2017 05:33
@rxin
Contributor

rxin commented Jan 17, 2017

BTW please add a test case for this. Thanks.

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71474 has finished for PR 16585 at commit e2d872c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71475 has finished for PR 16585 at commit 1563e03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the fix-inputfileblock-hadooprdd branch 2 times, most recently from 157f70f to d7c05d2 on January 17, 2017 14:15
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.util.Utils

class InputFileBlockHolderSuite extends SparkFunSuite {
Contributor
shall we just write a pyspark test?

Member Author

@viirya viirya Jan 17, 2017
I can't find a suitable built-in datasource to test it. I think all file-based datasources are based on FileFormat; they don't use HadoopRDD/NewHadoopRDD. The pyspark snippet shown in the description uses the spark-xml package for manual testing.

Member Author

@viirya viirya Jan 18, 2017
I've written a pyspark test that directly reads a file and produces a HadoopRDD/NewHadoopRDD-based dataframe, so this Scala test is removed.
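
For reference, a rough Scala sketch of the same idea (the actual test added here is in pyspark; the path and details below are illustrative): reading through sc.newAPIHadoopFile yields a NewHadoopRDD, so input_file_name can be exercised without any external package.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.input_file_name

    val spark = SparkSession.builder().appName("demo").getOrCreate()

    // newAPIHadoopFile produces a NewHadoopRDD under the hood, so this
    // exercises the same code path as the spark-xml reproduction.
    val rdd = spark.sparkContext.newAPIHadoopFile(
        "people.json",                  // illustrative input path
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text])
      .map(_._2.toString)

    // input_file_name() should now return the real path even when the
    // rows are consumed from the Python-UDF runner thread.
    val df = spark.read.json(rdd).select(input_file_name().alias("file"))
    df.show(truncate = false)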

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71511 has finished for PR 16585 at commit d7c05d2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InputFileBlockHolderSuite extends SparkFunSuite

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71509 has finished for PR 16585 at commit b1bfc50.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InputFileBlockHolderSuite extends SparkFunSuite with LocalSparkContext

@SparkQA

SparkQA commented Jan 17, 2017

Test build #71510 has finished for PR 16585 at commit 157f70f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InputFileBlockHolderSuite extends SparkFunSuite with LocalSparkContext

@viirya viirya force-pushed the fix-inputfileblock-hadooprdd branch from d7c05d2 to 8380617 on January 17, 2017 23:20
@SparkQA

SparkQA commented Jan 17, 2017

Test build #71540 has finished for PR 16585 at commit 8380617.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InputFileBlockHolderSuite extends SparkFunSuite

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71559 has finished for PR 16585 at commit 3444499.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the fix-inputfileblock-hadooprdd branch from 3444499 to 2ce65cb on January 18, 2017 03:14
@SparkQA

SparkQA commented Jan 18, 2017

Test build #71561 has finished for PR 16585 at commit 2ce65cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def filename(path):
    return path

self.spark.udf.register('sameText', filename)
Contributor
where do we call this registered function?

Member Author
Oh, wrongly copied.

'org.apache.hadoop.io.Text')

df2 = self.spark.read.json(rdd2).select(input_file_name().alias('file'))
row = df2.select(sameText(df2['file'])).first()
Contributor
nit: row2?

Member Author
sure.

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71575 has started for PR 16585 at commit 2b61d47.

@cloud-fan
Contributor

LGTM, pending jenkins

@viirya
Member Author

viirya commented Jan 18, 2017

retest this please.

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71581 has finished for PR 16585 at commit 2b61d47.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 18, 2017

retest this please.

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71596 has finished for PR 16585 at commit 2b61d47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in d06172b Jan 18, 2017
@viirya
Member Author

viirya commented Jan 18, 2017

@rxin @cloud-fan Thanks!

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…which are based on HadoopRDD or NewHadoopRDD

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#16585 from viirya/fix-inputfileblock-hadooprdd.
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
…which are based on HadoopRDD or NewHadoopRDD

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#16585 from viirya/fix-inputfileblock-hadooprdd.
@viirya viirya deleted the fix-inputfileblock-hadooprdd branch December 27, 2023 18:20