
[SPARK-32846][SQL][PYTHON] Support createDataFrame from an RDD of pd.DataFrames #29719

Conversation

linar-jether

What changes were proposed in this pull request?

Added support for createDataFrame to receive an RDD of pd.DataFrame objects and convert them, using Arrow, into an RDD of record batches, which is then directly converted to a Spark DataFrame.

Added a pandasRDD flag to createDataFrame to distinguish between RDD[pd.DataFrame] and other RDDs without peeking into their content.

from pyspark.sql import SparkSession
import pyspark
import pyarrow as pa
import numpy as np
import pandas as pd

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Python RDD[pd.DataFrame] to spark DF example") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sc = spark.sparkContext

# Create a spark DF from an RDD of pandas DFs
prdd = sc.range(0, 4).map(lambda x: pd.DataFrame([[x,]*4], columns=list('ABCD')))

# A larger variant with many partitions (not used below; included to illustrate scale)
prdd_large = sc.range(0, 32, numSlices=32). \
    map(lambda x: pd.DataFrame(np.random.randint(0, 100, size=(40 << 15, 4)), columns=list('ABCD')))

df = spark.createDataFrame(prdd, schema=None, pandasRDD=True)
df.toPandas()

How was this patch tested?

Added a new test for creating a Spark DataFrame from an RDD of pandas DataFrames.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

I think you're already able to do the same thing with spark.range(0, 4).mapInPandas(...), just like you're creating an RDD of pandas DataFrames with sc.range(0, 4).map(...).
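
For reference, a minimal sketch of that mapInPandas route, assuming the same A-D columns as the example above (the helper name make_pdfs is illustrative):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

def make_pdfs(iterator):
    # For each incoming batch of ids, emit one small pandas DataFrame per id
    for pdf in iterator:
        for x in pdf["id"]:
            yield pd.DataFrame([[x] * 4], columns=list("ABCD"))

df = spark.range(0, 4).mapInPandas(make_pdfs, schema="A long, B long, C long, D long")
df.show()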

@linar-jether
Author

Thank you @HyukjinKwon. The issue is that this only applies to DataFrames, which means that only Spark-supported types can be input to .mapInPandas.
So this does not cover use cases such as RDD[python_object] -> obj_to_pandas_df() -> create Spark DF, or reading an RDD of pickle files and converting them to a Spark DF.

I believe this can enable more seamless integration with Python packages that do not natively support Spark.
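
A rough sketch of the RDD[python_object] use case under the pandasRDD flag proposed here, reusing the spark/sc session from the snippet above (obj_to_pandas_df and the sample dicts are illustrative assumptions):

import pandas as pd

def obj_to_pandas_df(obj):
    # Hypothetical user conversion: turn an arbitrary python object into a pandas DataFrame
    return pd.DataFrame([obj])

objs_rdd = sc.parallelize([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
prdd = objs_rdd.map(obj_to_pandas_df)
df = spark.createDataFrame(prdd, schema=None, pandasRDD=True)  # API proposed in this PR
df.toPandas()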

@linar-jether
Author

Friendly ping @HyukjinKwon, is anything more needed for a review?

@linar-jether
Author

@BryanCutler @ueshin @viirya Who would be the right person to review this?

@github-actions

github-actions bot commented Jan 9, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 9, 2021
…andas-rdd-to-spark-df-SPARK-3284

# Conflicts:
#	python/pyspark/sql/pandas/types.py
#	python/pyspark/sql/session.py
@github-actions github-actions bot added the CORE label Jan 9, 2021
@linar-jether
Author

Hi @HyukjinKwon @BryanCutler, I've synced with master; hoping this could get reviewed and the Stale tag removed.

@HyukjinKwon HyukjinKwon removed the Stale label Jan 10, 2021
@JacekPliszka

Any chance of getting it through? I'd love to have the feature.

My use case is many-to-many pandas DataFrames/Arrow tables.

applyInPandas with cogroup is just many-to-one...

@drakesiard

This would be a really useful feature, if it could be merged.

Contributor

@holdenk holdenk left a comment


This looks like a potentially useful feature.

Comment on lines +300 to +301
pandasRDD=True creates a DataFrame from an RDD of pandas dataframes
(currently only supported using arrow)
Contributor


Would it make sense to do type checking here instead of having a flag?

Author


Can we somehow define/get the type of the RDD[py-object] without evaluating its first element?
If not, then the RDD might contain any type of object, so the pandasRDD option is used as a way to differentiate between initialization from a generic RDD and from an RDD of pd.DataFrames.

Thank you for reviewing! Please let me know if there's anything else I can do to get this merged.

Contributor

@holdenk holdenk Jul 8, 2021


That's a good point. If we look in session.py we can see _createFromRDD does its magic there. Personally I would put this logic inside of _inferSchema and toInternal respectively, but I'm coming at this from more of a core-Spark dev perspective; maybe @HyukjinKwon has a different view.

Contributor


Even if we don't refactor this back into session.py, I'd encourage you to look at session.py and consider structuring this in a similar way so that we don't have to have this flag here.

Author


I agree that this seems to fit well into _inferSchema & _createFromRDD, although we would still need some way to discern between an RDD of DataFrames and other types when the user provides a schema (and we don't want to peek into the first item).

Do you think it would be better to move the pandas flag into _createFromRDD?

Contributor


So _inferSchema does effectively peek into the first element. I think we could just put the logic down inside of the map and then the user doesn't have to specify this flag.

Author


That's the case when the user wants to infer the schema (so we have to peek into the RDD), but when the user does specify the schema, there's no need to peek, and we're left with no other way to tell which code path we need.

Contributor


So let's say the user specifies a schema; in that case, inside of _createFromRDD we can just look at the type of each element that we're processing, see if it's a DataFrame or a Row or a dictionary, and dispatch the logic there. What do you think? Or is there a reason I'm missing why we couldn't do the dispatch inside of _createFromRDD based on type?
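
A rough sketch of that per-element dispatch, with purely illustrative names (pandas_df_to_rows is a hypothetical helper, not the actual session.py internals):

import pandas as pd
from pyspark.sql import Row

def to_internal(elem):
    # Dispatch on the element's type instead of relying on a pandasRDD flag
    if isinstance(elem, pd.DataFrame):
        return pandas_df_to_rows(elem)  # hypothetical helper: pandas DataFrame -> list of Rows
    elif isinstance(elem, (Row, dict)):
        return [elem]
    else:
        raise TypeError("Unsupported element type: %s" % type(elem))

internal_rdd = rdd.flatMap(to_internal)  # stays lazy; runs per element on the executors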

Author


Well, in case the user specifies a schema, the entire process is lazy, so there's no need to evaluate any of the RDD elements...

If we keep everything lazy and map each element to either a Row or a RecordBatch, we would still need to know which path to take. E.g., for RecordBatches we need to call:

        # RDD of serialized Arrow RecordBatches -> Spark DataFrame via PythonSQLUtils.toDataFrame
        from pyspark.sql.dataframe import DataFrame
        jrdd = rb_rdd._to_java_object_rdd()
        jdf = self._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), self._wrapped._jsqlContext)
        df = DataFrame(jdf, self._wrapped)
        df._schema = schema
        return df

and for Rows we need to call:

        # RDD of Rows -> Spark DataFrame via applySchemaToPythonRDD
        jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
        jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
        df = DataFrame(jdf, self._wrapped)
        df._schema = schema
        return df

@holdenk
Contributor

holdenk commented Aug 5, 2021

Hey @linar-jether, just pinging to see if this is still something you're working on.

@HyukjinKwon
Member

@linar-jether, just to clarify, what's your use case? You can leverage a binary DataFrame and call mapInPandas, for example:

RDD -> DataFrame[binary] -> DataFrame.mapInPandas(func_binary_to_pandas, schema)
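
A minimal sketch of that binary-DataFrame route (the pickled payload and the objects_to_pdf helper are illustrative assumptions):

import pickle
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

# RDD of arbitrary python objects, pickled into a single binary column
rdd = spark.sparkContext.range(0, 4).map(lambda x: (pickle.dumps({"value": x}),))
binary_df = spark.createDataFrame(rdd, "payload binary")

def objects_to_pdf(iterator):
    # Unpickle each payload and turn the objects back into pandas DataFrames
    for pdf in iterator:
        yield pd.DataFrame([pickle.loads(b) for b in pdf["payload"]])

df = binary_df.mapInPandas(objects_to_pdf, schema="value long")
df.show()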

@HyukjinKwon
Member

My biggest concern is that the API proposed here encourages more use of RDDs, whereas Spark generally encourages the DataFrame APIs to leverage the Spark SQL optimizer, etc.

@JacekPliszka

JacekPliszka commented Aug 6, 2021

The exact reason I need this API is because Spark DataFrame optimization does not work for the complex transformations I have.

A simple switch to my own custom wrapper on RDDs (a crude analogue of this API) gave me 4-20x better performance and a much lower cost of pandas-to-PySpark migration.

So for me it is not a concern.

@linar-jether
Author

linar-jether commented Aug 8, 2021

@HyukjinKwon @holdenk My use case is efficiently creating a Spark DataFrame from a distributed dataset. Spark currently supports doing this either through remote storage (e.g. writing Parquet files) or using the RDD[Row] method; both are inefficient.

The suggestion to use DataFrame[binary] could work as well, although it does incur another serialization + copying stage, so I don't see the benefit over directly creating the DataFrame out of Arrow RecordBatches.

I must say that we use this internally quite a bit (since Spark 2.x) and it greatly improves productivity. Some example use cases: reading large climatological datasets using xarray and treating them as a single Spark DataFrame, and running many optimization problems (e.g. cvxpy) in parallel using RDDs and accessing their results as a single Spark DataFrame.

I believe this feature can improve interoperability with other Python libraries, similar to what can be done with Dask's dd.from_delayed(dfs), and allow people to leverage Spark's SQL capabilities instead of working directly with RDDs.
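
For comparison, the Dask pattern referenced above, where dd.from_delayed combines many delayed pandas DataFrames into one distributed DataFrame:

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def make_pdf(i):
    # Each delayed task produces one pandas DataFrame partition
    return pd.DataFrame({"A": [i], "B": [i * 2]})

ddf = dd.from_delayed([make_pdf(i) for i in range(4)])
print(ddf.compute())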

@HyukjinKwon
Member

HyukjinKwon commented Oct 21, 2021

@linar-jether, would you mind sharing your pseudocode? I am trying to figure out the general approach to address this problem (e.g., SPARK-32846, SPARK-30153, SPARK-26413).

@linar-jether
Author

@HyukjinKwon What do you mean by pseudocode? My initial snippet for pandas<->arrow<->spark conversions was done using this:
https://gist.github.com/linar-jether/7dd61ed6fa89098ab9c58a1ab428b2b5 (based on spark 2.x)

And this comment for converting directly from arrow RecordBatches without using pandas: https://gist.github.com/linar-jether/7dd61ed6fa89098ab9c58a1ab428b2b5#gistcomment-3452086 (works with spark 3.x)

Basically, all of the logic for creating a DataFrame from Arrow RecordBatch/Table objects already exists in PythonSQLUtils.toDataFrame; this PR only integrates it into the main API and helps a bit with schemas and type conversions.

@linar-jether
Author

linar-jether commented Oct 21, 2021

Regarding the issues you've mentioned, I think a simple RDD[arrow] -> spark.DataFrame public API would make most of these use cases pretty simple to implement.

If you feel this is a good approach, I can add an option for _dataframe_to_arrow_record_batch to accept Arrow objects in "passthrough" mode (roughly as sketched below).
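
A rough sketch of what that passthrough option might look like (purely illustrative, not the actual PySpark internals):

import pyarrow as pa

def _dataframe_to_arrow_record_batch(obj, schema=None):
    # Hypothetical passthrough: Arrow objects pass through (near-)untouched,
    # only pandas DataFrames are converted via Arrow
    if isinstance(obj, pa.RecordBatch):
        return obj
    if isinstance(obj, pa.Table):
        # collapse the table's chunks into a single RecordBatch
        return obj.combine_chunks().to_batches()[0]
    return pa.RecordBatch.from_pandas(obj, schema=schema)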

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@samkumar

Have there been any updates for adding this kind of functionality since this pull request? Being able to take an RDD of pyarrow RecordBatches or pandas DataFrames and turn it into a Spark DataFrame would be very useful for turning a dataset distributed at the workers outside of Spark into a Spark DataFrame for analysis.

Even if an API like this hasn't been added, is there any guidance on achieving this (building a Spark DataFrame from an RDD of pyarrow RecordBatches or pandas DataFrames) in Spark 3.4/3.5? As far as I can tell, the code in this pull request no longer works on the latest versions of Spark because toDataFrame now accepts an iterator as its argument, not an RDD.

@JacekPliszka

I think currently RDD[Arrow Table] -> DataFrame would be even better, as it would allow not only pandas but Polars and PyArrow as well.

@linar-jether
Author

@samkumar @JacekPliszka
Using Spark 3.5, here's a sample for converting an RDD of Arrow record batches to a Spark DataFrame. This still uses internal Spark methods, so no guarantees of future compatibility.

Hopefully this can be incorporated into a proper API...

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.pandas.types import from_arrow_schema
import pyarrow as pa
import pandas as pd
import numpy as np
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable

def map_to_record_batch(i):
    df = pd.DataFrame(np.random.randn(10, 4) * i, columns=list('ABCD'))
    return pa.RecordBatch.from_pandas(df)

if __name__ == '__main__':
    spark = SparkSession.builder.appName("Python Arrow-in-Spark example").getOrCreate()

    # Create an RDD of Arrow RecordBatch objects
    ardd = spark.sparkContext.parallelize([1, 2, 3, 4], 4)
    ardd = ardd.map(map_to_record_batch).cache()  # cache to avoid recomputing after inferring schema

    # Peek at the first record batch to infer schema
    arrow_schema = ardd.first().schema
    spark_schema = from_arrow_schema(arrow_schema)

    # Convert RDD[RecordBatch] to RDD[bytearray] for serialization
    ardd = ardd.map(lambda x: bytearray(x.serialize()))

    # Create a spark DataFrame from RDD[bytearray] and schema
    jrdd = ardd._to_java_object_rdd()
    jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, spark_schema.json(),
                                                spark._jsparkSession)
    df = DataFrame(jdf, spark)
    df._schema = spark_schema

    df.show()

@JacekPliszka

JacekPliszka commented Nov 21, 2023

@linar-jether

jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, spark_schema.json(),
                                            spark._jsparkSession)

Fails for me here with

py4j.Py4JException: Method toDataFrame([class org.apache.spark.api.java.JavaRDD, class java.lang.String, class org.apache.spark.sql.SparkSession]) does not exist

This is a single-node cluster and the method does exist... strange.

According to Databricks this is 3.5.0; on 3.3.0 it failed earlier.

Same error on my PC

@HyukjinKwon
Member

HyukjinKwon commented Nov 21, 2023

You could even do something like this:

import pyarrow

spark_schema = from_arrow_schema(ardd.first().schema)
ser_ardd = ardd.map(lambda x: bytearray(x.serialize()))
df = spark.createDataFrame(ser_ardd, "binary")
df.mapInArrow(pyarrow.deserialize, schema=spark_schema)

@JacekPliszka

JacekPliszka commented Nov 21, 2023

My pyarrow.deserialize has no text_signature.

But this worked as a workaround:

def f(obj):
    return pa.deserialize(obj)

df1 = df.mapInArrow(f, schema=spark_schema)

Still, df1.collect() failed. Does it work for you?
