
[SPARK-24915][Python] Fix Row handling with Schema. #26118

Closed
wants to merge 1 commit

Conversation

jhereth

@jhereth jhereth commented Oct 14, 2019

What changes were proposed in this pull request?

This change implements special handling of pyspark.sql.Row within the conversion of a StructType into an internal SQL object (toInternal()) to fix SPARK-24915.

Previously, a Row was processed as a tuple (since it inherits from tuple), so its values were expected to arrive in the "right" order. This works if the internal order of the Row (sorted by key in the current implementation) matches the order of fields in the schema. If the fields are in a different order and need special treatment (e.g. _needConversion is True), exceptions occurred when creating DataFrames.

With this change, a Row is processed as a dict if it has named columns and as a tuple otherwise.
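
For illustration, a minimal, standalone sketch of the dispatch idea (the function name and the converters argument are made up; the real change lives inside StructType.toInternal in python/pyspark/sql/types.py):

# Hypothetical, simplified sketch of the dispatch idea - not the actual patch.
# A Row created with kwargs carries field names (__fields__), so it can be
# converted like a dict instead of relying on positional order.
from pyspark.sql import Row

def to_internal_sketch(schema_names, converters, obj):
    """Convert obj into a tuple ordered like schema_names.

    converters stands in for the per-field StructField.toInternal calls that
    the real StructType.toInternal applies where _needConversion is True.
    """
    if isinstance(obj, Row) and hasattr(obj, "__fields__"):
        # Named Row: convert to a dict and look fields up by name.
        obj = obj.asDict()
    if isinstance(obj, dict):
        return tuple(conv(obj.get(name))
                     for name, conv in zip(schema_names, converters))
    # Plain tuple / RDD-style Row: trust the positional order.
    return tuple(conv(value) for conv, value in zip(converters, obj))

# Schema order (field2, field1) differs from the Row's alphabetical order:
ident = lambda v: v
print(to_internal_sketch(["field2", "field1"], [ident, ident],
                         Row(field1="Hello", field2="world")))
# -> ('world', 'Hello')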

Design

Re asDict: I first had an implementation that handled Row as its own type. However, that implementation would fail for fields that are unknown to the Row object, which is inconsistent with the handling of dicts. The most consistent implementation is to convert the Row to a dict.

Note: The underlying problem is that Row inherits from tuple. This is visible in the tests, too: with assertEqual, the Rows Row(a=..., b=...) and Row(b=..., a=...) are not equal because they are compared as lists (and the order is wrong), while a direct comparison returns True. For this reason the tests compare based on asDict().
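
A small demonstration of the comparison quirk (illustrative only; what the tuple-level comparison sees depends on how the Spark version orders keyword Row fields):

# A Row is a tuple underneath, so list/tuple comparisons see element order,
# not field names; comparing asDict() results is order-insensitive.
from pyspark.sql import Row

r1 = Row(a=1, b=2)
r2 = Row(b=2, a=1)

assert r1.asDict() == r2.asDict()   # order-insensitive, as used in the tests
print(tuple(r1), tuple(r2))         # the positional view that tuple/list comparisons see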

Why are the changes needed?

The code path being changed relies on Rows being RDD-style tuples and breaks for Rows created with kwargs.

This change fixes SPARK-24915: creating DataFrames from pyspark.sql.Row objects failed if the order of fields in the schema differed from the (internal) order of fields in the Row and the schema is "complicated".

"Complicated" means, for example, that one type in the schema is nested (as in the JIRA issue) or one field needs conversion (e.g. DateType()).

Without the change, the following examples fail:

From the JIRA issue:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, Row

conf = SparkConf().setMaster("local").setAppName("repro") 
context = SparkContext(conf=conf) 
session = SparkSession(context)
schema = StructType([
    StructField('field2', StructType([StructField('sub_field', StringType(), False)]), False),
    StructField('field1', StringType(), False),
])
data = [Row(field1="Hello", field2=Row(sub_field='world'))]
df = session.createDataFrame(data, schema=schema) # this will throw a ValueError
df.show()

Date example:

import datetime as dt
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StringType, DateType, StructField, StructType
spark = SparkSession.builder.master("local").getOrCreate()
schema = StructType([
    StructField("join_date", DateType(), False),
    StructField("name", StringType(),False),
])
rows = [Row(name='Alice', age=23, join_date=dt.datetime(2019,10,1,23,45,56)),]
spark.createDataFrame(rows, schema=schema).collect()

Does this PR introduce any user-facing change?

This change does not introduce user-facing changes for existing, working PySpark code.

Code that previously raised exceptions because of the fixed bug will now work (which, technically, is a user-facing change).

How was this patch tested?

Standard Tests

ARROW_PRE_0_15_IPC_FORMAT=1 ./dev/run-tests succeeded on my machine

Python: 3.7.4
Spark: master (a42d894a4090c97a90ce23b0989163909ebf548d)
OS: MacOS 10.14.6.

New Tests

I added the following tests in module pyspark.sql.tests.test_types:

  • test_create_dataframe_from_rows_mixed_with_datetype: schema with date field doesn't cause exception
  • test_create_dataframe_from_rows_with_nested_row: schema with nested field doesn't cause exception
  • test_create_dataframe_from_tuple_rows: Regression test: RDD-style Rows still work

The latter corresponds to the test case from SPARK-24915.
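
For illustration, a hedged sketch of what the nested-row test could look like (the body and assertion are reconstructed from the JIRA example above, not copied from the actual test code):

# Rough sketch of the nested-row regression test; the real test in
# pyspark.sql.tests.test_types lives in a unittest test class and may differ.
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType

def test_create_dataframe_from_rows_with_nested_row():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    schema = StructType([
        StructField("field2", StructType([StructField("sub_field", StringType(), False)]), False),
        StructField("field1", StringType(), False),
    ])
    data = [Row(field1="Hello", field2=Row(sub_field="world"))]
    df = spark.createDataFrame(data, schema=schema)
    # Compare via asDict() so the check does not depend on field order.
    assert df.collect()[0].asDict(recursive=True) == {
        "field2": {"sub_field": "world"},
        "field1": "Hello",
    }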

@jhereth
Author

jhereth commented Oct 14, 2019

@davies @HyukjinKwon you were the last to work on this part of the code. Are you available for review?

@jhereth
Author

jhereth commented Oct 14, 2019

This contribution is my original work and I license the work to the project under the project’s open source license.

@jhereth jhereth changed the title [SPARK-24915][PySpark] Fix Row handling with Schema. [SPARK-24915][Python] Fix Row handling with Schema. Oct 16, 2019
@HyukjinKwon
Member

ok to test

@HyukjinKwon
Member

cc @BryanCutler too

@SparkQA

SparkQA commented Oct 24, 2019

Test build #112581 has finished for PR 26118 at commit a52de2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Seems to make sense to me.

@HyukjinKwon
Member

cc @zero323

@zero323
Member

zero323 commented Oct 26, 2019

@HyukjinKwon To be honest I have mixed feelings about this. It looks sensible as a temporary workaround, but I am not fond of the idea of enforcing the notion of Row being an unordered, dictionary-like object (though with compact dicts as the standard, that doesn't matter that much), especially when it is close to becoming completely obsolete.

Personally I'd prefer to wait a moment and see where the discussion on SPARK-22232 goes. If the resolution is the introduction of a legacy mode, then the scope of this particular change could be conditioned on it and on the Python version.

If not, I'd like to see some memory profiling data first (especially memory - timings might actually be better for now, as we skip all the nasty obj[n], but that's not very meaningful*).

I've done some rough testing, and conversion to dict (with the simple optimization suggested below) is roughly six times slower than conversion to tuple. I'd expect that there is also a significant memory overhead from the dictionary conversion, as we effectively create a full copy of the data with associated names. If that suspicion is confirmed, that would be a huge overhead which shouldn't be incurred if it is not necessary.

It is also worth mentioning that SPARK-24915 is really an edge case. If the user wants to provide a schema, then it is hard to justify using Rows (a plain tuple or different flavors of dict are a much better choice in such a case), so making things worse for everyone (if I am right about the performance impact) to support it doesn't make much sense.


* Is there any reason why we do this:

return tuple(obj[n] for n in self.names)

instead of just tuple(obj)? That's a huge performance bottleneck with wide schemas. Depending on the resolution of this one, that's something to fix, don't you think?
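
A small illustration of the difference (assuming the pre-3.0 Row behaviour, where keyword Rows store their fields in alphabetical order; schema_names is a made-up schema order):

# Row supports lookup by field name, so indexing by schema names returns the
# values in schema order, while tuple(obj) returns them in the Row's own order.
from pyspark.sql import Row

schema_names = ["name", "age"]
row = Row(age=23, name="Alice")

print(tuple(row))                            # the Row's own field order
print(tuple(row[n] for n in schema_names))   # ('Alice', 23) - schema order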

@jhereth
Author

jhereth commented Oct 26, 2019

@zero323 Thanks for looking into this!

Personally I'd prefer to wait a moment and see where the discussion on SPARK-22232 goes. If the resolution is introduction of legacy mode, then the scope of this particular change could be conditioned on it and Python version.

I agree that an update to Row is required and I'm happy to see discussions going in the right direction. However, this will be part of Spark 3.x, and for many Spark users running it in production it will be a long wait until that happens.

Do you think it makes sense to apply this change for Spark 2.x?

If not, I'd like to see some memory profiling data first (especially memory - timings might actually be better for now, as we skip all the nasty obj[n], but that's not very meaningful*).

I can try to do that. Do you have an example of how to do proper timings with Spark?

It is also worth mentioning that SPARK-24915 is really an edge case. If the user wants to provide a schema, then it is hard to justify using Rows (a plain tuple or different flavors of dict are a much better choice in such a case), so making things worse for everyone (if I am right about the performance impact) to support it doesn't make much sense.

In my case it was about reproducing input for a test case. I just wanted to create a dataframe containing the same rows as the problematic dataframe. In the end, I chose a different way to produce the data, but it feels strange not to be able to simply create Rows the way you see them in some dataframe.

  • Is there any reason why we do this:

return tuple(obj[n] for n in self.names)

instead of just tuple(obj)? That's a huge performance bottleneck with wide schemas. Depending on the resolution of this one, that's something to fix, don't you think?

This was a workaround introduced by #14469. tuple(obj) relies on the order of fields - which for Row is alphabetical. If this doesn't correspond to the schema, the order of the fields will be messed up.

My change here is actually just doing the same workaround for schemas with fields that need some serialization.

I'm unclear if the proposed change will have any negative effect. The codepath should only be taken in cases that either failed (as in SPARK-24915) or might have silently mixed up the .toInternal() calls of different types.

Even with sub-optimal performance, this would only improve the situation for users.

Would you prefer to replace

https://github.com/qudade/spark/blob/a52de2e4b258e7fecad4143e00f01df4b096a513/python/pyspark/sql/types.py#L603

return self.toInternal(obj.asDict())

by

return tuple(f.toInternal(obj[n]) if c else obj[n]
             for n, f, c in zip(self.names, self.fields, self._needConversion))

to reduce one indirection?

@SparkQA

SparkQA commented Nov 7, 2019

Test build #113375 has finished for PR 26118 at commit 47f74fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zero323
Member

zero323 commented Nov 7, 2019

I agree that an update to Row is required and I'm happy to see discussions going in the right direction. However, this will be part of Spark 3.x, and for many Spark users running it in production it will be a long wait until that happens.

Do you think it makes sense to apply this change for Spark 2.x?

I don't have a strong opinion (and I especially cannot speak for project policy, as I am usually less conservative than most of the folks around here), but I'd say that, conditioned on the lack of a significant negative performance impact on more common use cases (i.e. RDD[Row] without a schema), it is fine.

I am also not completely against incorporating this in 3.0, I'd just prefer to see how the discussion on SPARK-22232 plays out first.

If not, I'd like to see some memory profiling data first (especially memory - timings might actually be better for now, as we skip all the nasty obj[n], but that's not very meaningful*).

I can try to do that. Do you have an example of how to do proper timings with Spark?

For such a simple case, timeit / memory_profiler on map objects should be more than sufficient, unless you observe something truly unexpected.

  • Time-wise, performance should for now be better than the existing approach, so unless you see an unexpected slowdown (particularly with wide schemas) we should be good.
  • Memory-wise, we might expect higher peak memory usage (let's say up to 100%, excluding constant interpreter overhead), as the temporary dicts should be swept away once we get out of scope.
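
A minimal sketch of that kind of measurement (field names and sizes are arbitrary; memory_profiler is a third-party package and only needed for the memory part):

# Rough micro-benchmark sketch, not the profiling actually run for this PR.
# It times StructType.toInternal over keyword Rows; memory_profiler's
# memory_usage can wrap the same callable to capture peak memory.
import datetime as dt
import timeit

from pyspark.sql import Row
from pyspark.sql.types import DateType, StringType, StructField, StructType

schema = StructType([
    StructField("join_date", DateType(), False),
    StructField("name", StringType(), False),
])
rows = [Row(name="user%d" % i, join_date=dt.date(2019, 10, 1))
        for i in range(10000)]

def convert():
    for row in rows:
        schema.toInternal(row)

print(timeit.timeit(convert, number=10))

# Memory, if memory_profiler is installed:
# from memory_profiler import memory_usage
# print(max(memory_usage((convert, [], {}))))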

This was a workaround introduced by #14469. tuple(obj) relies on the order of fields - which for Row is alphabetical. If this doesn't correspond to the schema, the order of the fields will be messed up.

My change here is actually just doing the same workaround for schemas with fields that need some serialization.

My bad, makes sense.

I'm unclear if the proposed change will have any negative effect. The codepath should only be taken in cases that either failed (as in SPARK-24915) or might have silently mixed up the .toInternal() calls of different types.

Even with sub-optimal performance, this would only improve the situation for users.

Would you prefer to replace

https://github.com/qudade/spark/blob/a52de2e4b258e7fecad4143e00f01df4b096a513/python/pyspark/sql/types.py#L603

return self.toInternal(obj.asDict())

by

return tuple(f.toInternal(obj[n]) if c else obj[n]
             for n, f, c in zip(self.names, self.fields, self._needConversion))

to reduce one indirection?

No. As much as I don't like dictionaries here, there isn't a much better solution here.

@SparkQA

SparkQA commented Nov 10, 2019

Test build #113550 has finished for PR 26118 at commit c262689.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 11, 2019

Test build #113602 has finished for PR 26118 at commit 67efdf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jhereth
Author

jhereth commented Nov 12, 2019

@zero323 @HyukjinKwon
I did some performance tests (Details in this gist).

As expected, for Rows that are created using kwargs (i.e. that have __from_dict__) AND whose fields are ordered alphabetically, performance is worse (~15% at 15 fields, ~25% at 150 fields) and memory consumption increases.

Of course, this is an edge case - I think it is rare to construct dataframes from Rows (otherwise this bug would have been fixed earlier), except for tests/experiments, where performance is less of an issue.

If performance is an issue, we could check whether the order of fields is already alphabetical (which would make performance worse for the general case) or determine the order once and reuse that mapping (which might require major changes).

In my experience, not being able to create a dataframe from dict-like Rows was a time-consuming annoyance. The value added by this PR is to enable this.

What do you think? Is there any way to improve the code without making it unnecessarily complex?

@SparkQA

SparkQA commented Nov 15, 2019

Test build #113848 has finished for PR 26118 at commit 3493238.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zero323
Member

zero323 commented Nov 25, 2019

I did some performance tests (Details in this gist).

Thank you!

As expected, for Rows that are created using kwargs (i.e. that have __from_dict__) AND whose fields are ordered alphabetically, performance is worse (~15% at 15 fields, ~25% at 150 fields) and memory consumption increases.

That seems acceptable in my opinion - not great, but given the diminishing importance of Row it is not the most serious concern, I guess.

In my experience, not being able to create a dataframe from dict-like Rows was a time-consuming annoyance. The value added by this PR is to enable this.

I will just point out that dicts, OrderedDicts, plain tuples or namedtuples are much more efficient input structures when a schema is provided. The biggest value of Row is that it provides a named structure to communicate results (and let's be honest - it doesn't do that very well). But that's just a side note.
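
For reference, a small sketch of the kind of input described here - plain tuples or dicts plus an explicit schema (the session, field names and values are made up for the example):

# Sketch of schema-plus-plain-structures input, as suggested above.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.master("local[1]").getOrCreate()
schema = StructType([
    StructField("name", StringType(), False),
    StructField("city", StringType(), False),
])

# Tuples: values must follow the schema's field order.
spark.createDataFrame([("Alice", "Berlin")], schema=schema).show()

# Dicts: values are matched to fields by name instead of position.
spark.createDataFrame([{"name": "Bob", "city": "Paris"}], schema=schema).show()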

@jhereth
Author

jhereth commented Nov 27, 2019

Thanks!

I agree; I wouldn't have used Rows if I hadn't been trying to recreate a problem in a test setting. After reading the types.py code I found another way. I hope this change spares someone this hassle.

@zero323 @HyukjinKwon What is the process to get this PR merged?

@SparkQA

SparkQA commented Nov 27, 2019

Test build #114546 has finished for PR 26118 at commit 5a63e75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115726 has finished for PR 26118 at commit 5a63e75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Let's review and merge #26496 first.

@jhereth
Author

jhereth commented Dec 26, 2019

@HyukjinKwon I think #26496 will remove the main reason for this change, but that will be in Spark 3.0.

This change might be more helpful for 2.x. Does it make sense to open a PR against a 2.x branch? I saw examples of backporting fixes but not of PRs solely against old branches. Any suggestions on how to proceed?

@SparkQA

SparkQA commented Dec 26, 2019

Test build #115819 has finished for PR 26118 at commit 2490a11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

It's difficult to backport to 2.x branches due to behavioural changes. Maintenance releases should ideally not have such behaviour changes.

@jhereth
Author

jhereth commented Dec 27, 2019

@HyukjinKwon Thanks for the quick response.

I meant just applying this change (#26118) against 2.4. #26496 is certainly a major behaviour change - #26118 only changes behaviour as much as one would expect from a bugfix.

@BryanCutler
Member

I'm -0 on this change for the 2.x branch. It seems minimal and makes Row slightly more consistent, but there are still bigger issues this doesn't solve. I think it's better to just document this as a known bug and work around it by using dicts instead or by not using Row with named arguments. I'm not against merging if others think it's worth it, though.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 17, 2020
@github-actions github-actions bot closed this May 19, 2020