
[SPARK-43055][CONNECT][PYTHON] Support duplicated nested field names #40692

Closed

Conversation

@ueshin (Member) commented Apr 6, 2023

What changes were proposed in this pull request?

Supports duplicated nested field names in spark.createDataFrame and df.collect.

Why are the changes needed?

If there are duplicated nested field names, the following error is raised:

>>> from pyspark.sql.types import *
>>>
>>> data = [Row(Row("a", 1), Row(2, 3, "b", 4, "c")), Row(Row("x", 6), Row(7, 8, "y", 9, "z"))]
>>> schema = (
...     StructType()
...     .add("struct", StructType().add("x", StringType()).add("x", IntegerType()))
...     .add(
...         "struct",
...         StructType()
...         .add("a", IntegerType())
...         .add("x", IntegerType())
...         .add("x", StringType())
...         .add("y", IntegerType())
...         .add("y", StringType()),
...     )
... )
>>> df = spark.createDataFrame(data, schema=schema)
Traceback (most recent call last):
...
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'int' object
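
The conversion to Arrow appears to match struct children by field name, which is why duplicated names collide. A minimal sketch of the deduplication idea used to work around this (dedup_field_names is a hypothetical helper for illustration, not the PR's actual code); the positional suffixes mirror the a_0/a_1 style seen in the toPandas discussion below:

from collections import Counter

def dedup_field_names(names):
    # Hypothetical helper: append a positional suffix to every field name
    # that occurs more than once, leaving unique names untouched.
    total = Counter(names)
    seen = Counter()
    deduped = []
    for name in names:
        if total[name] > 1:
            deduped.append(f"{name}_{seen[name]}")
            seen[name] += 1
        else:
            deduped.append(name)
    return deduped

print(dedup_field_names(["a", "x", "x", "y", "y"]))
# ['a', 'x_0', 'x_1', 'y_0', 'y_1']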

Does this PR introduce any user-facing change?

Yes: duplicated nested field names will now be supported.
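
As a hedged illustration (the exact Row repr may differ), the failing snippet above is expected to run after this change:

df = spark.createDataFrame(data, schema=schema)  # no longer raises ArrowTypeError
rows = df.collect()
# Nested fields stay accessible by position even when names are duplicated.
assert rows[0][0][0] == "a"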

How was this patch tested?

Added a test.

@HyukjinKwon (Member)
cc @zhengruifeng

@zhengruifeng (Contributor)

Just FYI, vanilla PySpark's DataFrame.toPandas also has this issue https://issues.apache.org/jira/browse/SPARK-41971
Is it possible to move the changes to ArrowUtils to fix them all?

@ueshin (Member, Author) commented Apr 7, 2023

> Just FYI, vanilla PySpark's DataFrame.toPandas also has this issue: https://issues.apache.org/jira/browse/SPARK-41971
> Is it possible to move the changes to ArrowUtils to fix them all?

Yes, I'm aware of the issue, but let me defer it to follow-up PRs.
(Thanks for filing the ticket, btw. 😄 )

TL;DR

Actually, this PR still has an issue with toPandas.

>>> spark.sql("values (1, struct(1 as a, 2 as a)) as t(x, y)").toPandas()
   x                     y
0  1  {'a_0': 1, 'a_1': 2}

The duplicated fields get suffixes _0, _1, and so on.

Also, the handling of struct types in toPandas was never well-defined, and behavior differs even between Arrow enabled and disabled in PySpark.

>>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', False)
>>> spark.sql("values (1, struct(1 as a, 2 as b)) as t(x, y)").toPandas()
   x       y
0  1  (1, 2)
>>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
>>> spark.sql("values (1, struct(1 as a, 2 as b)) as t(x, y)").toPandas()
   x                 y
0  1  {'a': 1, 'b': 2}

Currently, PySpark with Arrow enabled and Spark Connect return a map (dict) for struct-type values, whereas PySpark without Arrow returns a Row object.

The options are:

  1. Accept that the behaviors differ, and keep the suffixes.
    • In this case, the suffix is a must, because a map object holds only one value per duplicated key (see the sketch after this list).
  2. Use a Row object for the struct.
    • In this case, we lose the benefit of the fast Arrow-to-pandas conversion.
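
The first point is easy to see in plain Python, since a dict collapses duplicated keys; without suffixes, one of the duplicated struct fields would be silently lost:

>>> {"a": 1, "a": 2}  # the literal keeps only the last value for "a"
{'a': 2}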

@@ -60,13 +61,19 @@ private[sql] class SparkResult[T](
   private def processResponses(stopOnFirstNonEmptyResponse: Boolean): Boolean = {
     while (responses.hasNext) {
       val response = responses.next()
       if (response.hasSchema) {
         structType =
Contributor:
What is the difference between this schema and the one in the arrow batch?

@ueshin (Member, Author):

It is the original schema; the one in the Arrow batch is modified to deduplicate the struct field names.
Also, the original schema contains UDTs where they are supported, and the Python client works fine with that.

@amaliujia (Contributor) commented Apr 7, 2023:

This logic actually becomes more confusing now regarding the structType assignment.

I am wondering if it should become something like:

if (response.hasSchema)
else if (response.hasArrowBatch)

I am becoming unsure, as the code now reads:

  1. if the response gives a schema, use it
  2. if the response didn't give one, try Arrow's schema

Then it's not clear how to handle the case where both the response and the Arrow batch have a schema, or which one should take precedence, etc. Per my read, the response schema and the Arrow schema could even be inconsistent?

@ueshin (Member, Author) commented Apr 7, 2023:

Now that the original schema arrives earlier than the Arrow batches, we should use it when it's available; otherwise we fall back to the schema from the Arrow batch.

Yes, the response schema and the Arrow schema can be inconsistent in their nested field names when there are duplicates, but that's not a problem while the encoder handles the ColumnarBatch, as long as the data structure is consistent.

Added some comments.
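
In Python terms, the precedence rule described above amounts to something like the following sketch (illustrative only; the actual logic is the Scala code in SparkResult shown in the diff):

def resolve_schema(response_schema, arrow_batch_schema):
    # Prefer the original schema sent in the response: it preserves the
    # duplicated nested field names (and UDTs where supported), while the
    # Arrow batch schema carries the deduplicated names.
    return response_schema if response_schema is not None else arrow_batch_schema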

@ueshin marked this pull request as ready for review April 7, 2023 06:49
@HyukjinKwon (Member)

Merged to master.
