
[SPARK-35211][PYTHON] verify inferred schema for _create_dataframe #32320

Closed
wants to merge 2 commits

Conversation

da-liii (Contributor) commented Apr 24, 2021

What changes were proposed in this pull request?

  1. Refactor the code using an inner_map helper.
  2. Do extra schema verification after the schema is inferred.

This PR does not introduce any semantic changes except for the extra schema verification.

This PR fixes SPARK-35211 when schema verification is turned on. If schema verification is turned off, the bug described in SPARK-35211 still exists; I will create another PR to solve that case.
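
For illustration, here is a minimal sketch of what "verify after inference" means. It uses pyspark's internal _make_type_verifier; the helper name verify_rows and its arguments are hypothetical stand-ins for the real wiring inside _create_dataframe, not the PR's actual code:

    # Hedged sketch: verify rows against an inferred schema before conversion.
    # verify_rows, rows, and inferred_schema are illustrative stand-ins;
    # _make_type_verifier is the pyspark-internal verifier factory that
    # raises on a schema mismatch.
    from pyspark.sql.types import _make_type_verifier

    def verify_rows(rows, inferred_schema, verifySchema=True):
        verify = _make_type_verifier(inferred_schema) if verifySchema else lambda _: True
        for row in rows:
            verify(row)  # raises TypeError/ValueError if a value does not fit
            yield row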

Why are the changes needed?

# Reproduction, run in a PySpark shell (where `spark` is predefined):
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
from pyspark.testing.sqlutils import ExamplePoint
import pandas as pd
pdf = pd.DataFrame({'point': pd.Series([ExamplePoint(1, 1), ExamplePoint(2, 2)])})
df = spark.createDataFrame(pdf)
df.show()

The result is not correct because of an incorrect type conversion.

With this PR, the type check is performed and the mismatch is reported:

(spark) ➜  spark git:(sadhen/SPARK-35211) ✗ bin/pyspark
Python 3.8.8 (default, Feb 24 2021, 13:46:16)
[Clang 10.0.0 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/04/24 17:42:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Python version 3.8.8 (default, Feb 24 2021 13:46:16)
Spark context Web UI available at http://172.30.0.12:4040
Spark context available as 'sc' (master = local[*], app id = local-1619257343692).
SparkSession available as 'spark'.
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
>>> from pyspark.testing.sqlutils  import ExamplePoint
>>> import pandas as pd
>>> pdf = pd.DataFrame({'point': pd.Series([ExamplePoint(1, 1), ExamplePoint(2, 2)])})
>>> df = spark.createDataFrame(pdf)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/da/github/apache/spark/python/pyspark/sql/session.py", line 653, in createDataFrame
    return super(SparkSession, self).createDataFrame(
  File "/Users/da/github/apache/spark/python/pyspark/sql/pandas/conversion.py", line 340, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/Users/da/github/apache/spark/python/pyspark/sql/session.py", line 699, in _create_dataframe
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/da/github/apache/spark/python/pyspark/sql/session.py", line 499, in _createFromLocal
    data = list(data)
  File "/Users/da/github/apache/spark/python/pyspark/sql/session.py", line 688, in prepare
    verify_func(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1409, in verify
    verify_value(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1390, in verify_struct
    verifier(v)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1409, in verify
    verify_value(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1304, in verify_udf
    verifier(dataType.toInternal(obj))
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1409, in verify
    verify_value(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1354, in verify_array
    element_verifier(i)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1409, in verify
    verify_value(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1403, in verify_default
    verify_acceptable_types(obj)
  File "/Users/da/github/apache/spark/python/pyspark/sql/types.py", line 1291, in verify_acceptable_types
    raise TypeError(new_msg("%s can not accept object %r in type %s"
TypeError: element in array field point: DoubleType can not accept object 1 in type <class 'int'>
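
The traceback shows the root cause: ExamplePoint's UDT serializes each point into an array of doubles, but ExamplePoint(1, 1) produces Python ints, which DoubleType rejects. Below is a standalone sketch of just the check that fires, assuming the UDT's sqlType is ArrayType(DoubleType(), False); _make_type_verifier is pyspark-internal and may change between versions:

    # Reproduce only the verifier step from the traceback above.
    from pyspark.sql.types import ArrayType, DoubleType, _make_type_verifier

    verify = _make_type_verifier(ArrayType(DoubleType(), False), name="point")
    verify([1.0, 1.0])  # passes: floats match DoubleType
    verify([1, 1])      # raises TypeError: DoubleType can not accept object 1 ...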

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests.

da-liii changed the title from "[SPARK-35211][PYSPARK] _create_dataframe: infer schema earlier and do type check" to "[SPARK-35211][PYTHON] _create_dataframe: infer schema earlier and do type check" Apr 24, 2021
SparkQA commented Apr 24, 2021

Test build #137882 has finished for PR 32320 at commit bea87a5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

da-liii marked this pull request as draft April 24, 2021 12:27
SparkQA commented Apr 24, 2021

Test build #137889 has finished for PR 32320 at commit 4dc085c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

da-liii changed the title from "[SPARK-35211][PYTHON] _create_dataframe: infer schema earlier and do type check" to "[SPARK-35211][PYTHON] _create_dataframe: verify inferred schema" Apr 24, 2021
da-liii changed the title from "[SPARK-35211][PYTHON] _create_dataframe: verify inferred schema" to "[SPARK-35211][PYTHON] verify inferred schema for _create_dataframe" Apr 24, 2021
da-liii marked this pull request as ready for review April 24, 2021 15:44
HyukjinKwon (Member) commented:

@sadhen, can we separate the refactoring from the UDT inferred-type verification? That would make the change much easier to review.

da-liii (Contributor, Author) commented Apr 25, 2021

@HyukjinKwon There are small differences between _createFromRDD and _createFromLocal. If I did the inferred-type verification in a separate PR, I would need to insert the following code snippet twice:

    # Build a verifier from the inferred schema (no-op when verifySchema is off)
    verify_func = _make_type_verifier(struct) if verifySchema else lambda _: True

    # Verify each row against the schema, then convert it as before
    def verified_converter(obj):
        verify_func(obj)
        return converter(obj)

    data = inner_map(verified_converter, data)

That's why I did the refactoring.
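
For context, a plausible shape of the inner_map helper is sketched below (this body is a hypothetical guess, not the PR's actual code): it applies a per-row function on both code paths, so the verification wrapper above only needs to be written once.

    # Hypothetical sketch of inner_map: apply f to each row, whether data
    # is an RDD (_createFromRDD) or a local iterable (_createFromLocal).
    def inner_map(f, data):
        if hasattr(data, "mapPartitions"):  # duck-typed RDD check
            return data.map(f)
        return map(f, data)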

Let me create another PR for inferred type verification.

da-liii (Contributor, Author) commented Apr 25, 2021

A PR without the refactoring is prepared: #32332

da-liii (Contributor, Author) commented Apr 25, 2021

This PR will be rebased on master when #32332 is merged.
