Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18667][PySpark][SQL] Change the way to group row in BatchEvalPythonExec so input_file_name function can work with UDF in pyspark #16115

Closed
wants to merge 1 commit into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Dec 2, 2016

What changes were proposed in this pull request?

input_file_name doesn't return filename when working with UDF in PySpark. An example shows the problem:

from pyspark.sql.functions import *
from pyspark.sql.types import *

def filename(path):
    return path

sourceFile = udf(filename, StringType())
spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()

+---------------------------+
|filename(input_file_name())|
+---------------------------+
|                           |
+---------------------------+    

The cause of this issue is, we group rows in BatchEvalPythonExec for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.

This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.

How was this patch tested?

Added unit test to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@viirya
Copy link
Member Author

viirya commented Dec 2, 2016

cc @davies

@SparkQA
Copy link

SparkQA commented Dec 2, 2016

Test build #69549 has started for PR 16115 at commit 7cd606b.

@viirya
Copy link
Member Author

viirya commented Dec 2, 2016

retest this please.

@SparkQA
Copy link

SparkQA commented Dec 2, 2016

Test build #69553 has finished for PR 16115 at commit 7cd606b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 2, 2016

Test build #69563 has finished for PR 16115 at commit 8b0ef1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Dec 7, 2016

cc @cloud-fan @hvanhovell too

@viirya
Copy link
Member Author

viirya commented Dec 8, 2016

ping @davies again

asfgit pushed a commit that referenced this pull request Dec 8, 2016
…ythonExec so input_file_name function can work with UDF in pyspark

## What changes were proposed in this pull request?

`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:

    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def filename(path):
        return path

    sourceFile = udf(filename, StringType())
    spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()

    +---------------------------+
    |filename(input_file_name())|
    +---------------------------+
    |                           |
    +---------------------------+

The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.

This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.

## How was this patch tested?

Added unit test to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16115 from viirya/fix-py-udf-input-filename.

(cherry picked from commit 6a5a725)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Copy link
Contributor

good catch! Merging to master/2.1, thanks!

@asfgit asfgit closed this in 6a5a725 Dec 8, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
…ythonExec so input_file_name function can work with UDF in pyspark

## What changes were proposed in this pull request?

`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:

    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def filename(path):
        return path

    sourceFile = udf(filename, StringType())
    spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()

    +---------------------------+
    |filename(input_file_name())|
    +---------------------------+
    |                           |
    +---------------------------+

The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.

This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.

## How was this patch tested?

Added unit test to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#16115 from viirya/fix-py-udf-input-filename.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…ythonExec so input_file_name function can work with UDF in pyspark

## What changes were proposed in this pull request?

`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:

    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def filename(path):
        return path

    sourceFile = udf(filename, StringType())
    spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()

    +---------------------------+
    |filename(input_file_name())|
    +---------------------------+
    |                           |
    +---------------------------+

The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.

This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.

## How was this patch tested?

Added unit test to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes apache#16115 from viirya/fix-py-udf-input-filename.
@viirya viirya deleted the fix-py-udf-input-filename branch December 27, 2023 18:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants