
[SPARK-41114] [CONNECT] [PYTHON] [FOLLOW-UP] Python Client support for local data #38803

Closed
wants to merge 5 commits

Conversation

grundprinzip
Contributor

What changes were proposed in this pull request?

Since the Spark Connect server now supports reading local data from the client, this patch implements the necessary changes in the Python client to support reading from a local pandas DataFrame.

```
import pandas

pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
rows = df.filter(df.a == lit(3)).collect()
self.assertTrue(len(rows) == 1)
self.assertEqual(rows[0][0], 3)
self.assertEqual(rows[0][1], "c")
```

Why are the changes needed?

Compatibility

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@grundprinzip
Contributor Author

R: @zhengruifeng @HyukjinKwon

@bjornjorgensen
Contributor

Not the biggest issue in the world, but it's common to import pandas as pd.

@grundprinzip
Contributor Author

> Not the biggest issue in the world, but it's common to import pandas as pd.

Fixed.

@@ -205,6 +207,31 @@ def __init__(self, connectionString: str, userId: Optional[str] = None):
        # Create the reader
        self.read = DataFrameReader(self)

    def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
Member


Actually, the implementation here isn't matched to what we have in createDataFrame(pandas).

By default, the Arrow message conversion (more specifically in https://github.com/apache/spark/pull/38659/files#diff-d630cc4be6c65a3c3f7d6dbfe990f99ba992ccc26d9c3aaf6cfe46e163cb7389R514-R521) has to happen in an RDD so we can parallelize this.

For a bit of history, PySpark added the initial version with RDD first, and later added this local relation as an optimization for small datasets (see also #36683).
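
For context, a simplified, hypothetical sketch of the idea being referenced (not the actual Spark code path): in classic PySpark the pandas DataFrame is sliced into several Arrow record batches so the conversion can be distributed as RDD partitions rather than shipped as a single blob. The slicing scheme and names below are illustrative only.

```python
# Hypothetical sketch: slice a pandas DataFrame into Arrow record batches so
# the conversion could be distributed. This is NOT the actual Spark code path.
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": range(10), "b": [str(i) for i in range(10)]})

num_slices = 4
step = -(-len(pdf) // num_slices)  # ceiling division
batches = [
    pa.RecordBatch.from_pandas(pdf.iloc[i : i + step], preserve_index=False)
    for i in range(0, len(pdf), step)
]
# In PySpark these batches would become partitions of an RDD; here we only
# show that the original frame can be reassembled from the batches.
assert pa.Table.from_batches(batches).num_rows == len(pdf)
```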

Member


I am fine with the current approach, but the main problems here are that 1. we can't stream the input, and 2. it will have a size limit (likely 4KB). cc @hvanhovell FYI

Contributor Author


It is impossible to match the implementation, because in PySpark a first serialization already happens during parallelize to pass the input DataFrame to the executors.

In our case, to even send the data to Spark, we have to serialize it.

That said, you're right that this currently does not support streaming of local data from the client. But the limit is not 4 KB; it is probably whatever the max message size of gRPC is, so in the megabytes.

I think we need to add the client-side streaming APIs at some point, but I'd like to defer that for a bit.
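
As a minimal sketch of the serialization step being discussed (not the actual client code), the local pandas DataFrame is written as an Arrow IPC stream, and the resulting bytes are what would be embedded in the LocalRelation message; the 4 MB figure below is gRPC's common default max message size, used purely for illustration.

```python
# Sketch: serialize a local pandas DataFrame to Arrow IPC bytes, the kind of
# payload that would be embedded in a LocalRelation message.
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})

sink = pa.BufferOutputStream()
table = pa.Table.from_pandas(pdf)
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)

payload = sink.getvalue().to_pybytes()
GRPC_DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024  # illustrative default only
print(len(payload), len(payload) < GRPC_DEFAULT_MAX_MESSAGE_SIZE)
```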

Contributor


For a large pd.DataFrame, I guess we can optimize it in this way in the future: split it into several batches, create a LocalRelation for each batch, and finally union them.
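
A hedged sketch of this suggestion, assuming a session object `spark` whose createDataFrame builds one LocalRelation per call; the helper name and chunk size are illustrative and not part of this PR.

```python
import functools

import pandas as pd


def create_dataframe_chunked(spark, pdf: pd.DataFrame, chunk_size: int = 10_000):
    # Split the pandas DataFrame into row chunks, build one DataFrame
    # (one LocalRelation) per chunk, and union the results.
    chunks = [pdf.iloc[i : i + chunk_size] for i in range(0, len(pdf), chunk_size)]
    dfs = [spark.createDataFrame(chunk) for chunk in chunks]
    return functools.reduce(lambda left, right: left.union(right), dfs)
```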

Member

@HyukjinKwon HyukjinKwon left a comment


A couple of comments. I am fine with this as the initial version.

@AmplabJenkins

Can one of the admins verify this patch?

        self._pdf = pdf

    def plan(self, session: "SparkConnectClient") -> proto.Relation:
        assert self._pdf is not None
Contributor

@amaliujia amaliujia Nov 27, 2022


Nit: isn't this a bit redundant, though, given that plan.py is an internal API, the constructor does not accept an Optional pandas DataFrame, and we have mypy to do type checking?

Contributor Author


I think you're right. It makes sense to move the assertion into the session.

As an FYI, all of the mypy checks are really just for the code that we write. At runtime, the user can pass whatever they want, and we should make sure that we have proper checks for it. But since plan is an internal API, it makes a lot of sense to have the checks on the public API instead.
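
A hedged sketch of that split, using hypothetical stand-in names rather than the real classes: runtime validation lives on the public createDataFrame API, while the internal plan node relies on mypy and assumes it receives a valid pandas DataFrame.

```python
import pandas as pd


class LocalRelation:  # simplified stand-in for the internal plan node in plan.py
    def __init__(self, pdf: pd.DataFrame):
        # No runtime check needed here: this is internal API and mypy covers it.
        self._pdf = pdf


def createDataFrame(data: pd.DataFrame) -> LocalRelation:
    # Public API: users can pass anything at runtime, so validate here.
    if data is None or not isinstance(data, pd.DataFrame):
        raise TypeError("data must be a pandas DataFrame")
    if len(data) == 0:
        raise ValueError("Input data cannot be empty")
    return LocalRelation(data)
```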


        sink = pa.BufferOutputStream()
        table = pa.Table.from_pandas(self._pdf)
        with pa.ipc.new_stream(sink, table.schema) as writer:
Contributor

@amaliujia amaliujia Nov 27, 2022


I am not familiar with this area, so a question:

Is it possible that an empty pandas DataFrame is used here (e.g. it has a schema but no data)? If so, maybe add a test case?

Contributor Author


I'll add a test for that, thanks for the proposal!

"""
assert data is not None
if len(data) == 0:
raise ValueError("Input data cannot be empty")
Contributor


IIRC, createDataFrame in PySpark does not support an empty pandas DataFrame. I think it would be fine to throw an error here.
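
A hedged sketch of the kind of test discussed above; the test class, the fixture name `self.connect`, and the method name are assumptions, not taken from the PR.

```python
import unittest

import pandas as pd


class LocalDataTests(unittest.TestCase):
    # `self.connect` is assumed to be a Spark Connect session created by the
    # surrounding test harness.
    def test_empty_pandas_dataframe_is_rejected(self):
        empty_pdf = pd.DataFrame({"a": [], "b": []})  # has columns but no rows
        with self.assertRaises(ValueError):
            self.connect.createDataFrame(empty_pdf)
```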

@hvanhovell
Contributor

Alright, merging this one.

@amaliujia
Contributor

LGTM!

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022

Closes apache#38803 from grundprinzip/SPARK-41114.

Authored-by: Martin Grund <martin.grund@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022