[SPARK-22221][DOCS] Adding User Documentation for Arrow #19575

Closed

Changes from 4 commits
148 changes: 148 additions & 0 deletions docs/sql-programming-guide.md
@@ -1640,6 +1640,154 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
You may run `./bin/spark-sql --help` for a complete list of all available
options.

# Usage Guide for Pandas with Arrow
Member: Could we mention "PySpark" somewhere near the beginning?


## Arrow in Spark
Member: Let's say "Apache Arrow" here and in L1643? (ASF policy...)

Member: Wow, I just learnt it .. 👍

Member: Maybe we should also add "Apache" to the headline above?

Member: Yes, that's what I meant by L1643. We should at least use the full name in the header(s) and at the first mention before simplifying it to "Arrow" afterwards.


Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. This is currently most beneficial to Python users who
work with Pandas/NumPy data. Its usage is not automatic and might require some minor
changes to configuration or code to take full advantage and ensure compatibility. This guide will
give a high-level description of how to use Arrow in Spark and highlight any differences when
working with Arrow-enabled data.

## Ensure pyarrow Installed
Member: Maybe, pyarrow -> PyArrow

Member: Seems there are more sub-topics than I thought. Probably we could consider removing this one too.

Member Author: Remove the section on installing, or just the header, and merge with the above section?

Member Author: I changed it to a sub-heading; let me know if you think that is better.


If you install pyspark using pip, then pyarrow can be brought in as an extra dependency of the sql
Member: maybe, pyspark -> PySpark

Member: sql -> SQL

module with the command "pip install pyspark[sql]". Otherwise, you must ensure that pyarrow is
Member: maybe `pip install pyspark[sql]`

installed and available on all cluster node Python environments. The currently supported version is
Member: "on all cluster node Python environments" this one looks a bit awkward ..

0.8.0. You can install using pip or conda from the conda-forge channel. See pyarrow
[installation](https://arrow.apache.org/docs/python/install.html) for details.
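
A quick way to check which version of PyArrow is available in a given Python environment (a minimal sketch; the 0.8.0 floor is the supported version mentioned above):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import pyarrow

# Print the installed PyArrow version; it should be 0.8.0 or later
print(pyarrow.__version__)

{% endhighlight %}
</div>
</div>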

## How to Enable for Conversion to/from Pandas
Member: maybe "Enabling for Conversion to/from Pandas" just to match the sentence form


Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
Member: tiny nit: there's a trailing whitespace

`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
Member: a Spark DataFrame from Pandas -> a Spark DataFrame from Pandas DataFrame?

To use Arrow when executing these calls, it first must be enabled by setting the Spark conf
Member: conf -> configuration

'spark.sql.execution.arrow.enabled' to 'true', this is disabled by default.
Member: , this -> . This


<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import numpy as np
Member: Can we have an example file separately?

Member Author: I was thinking that too, as they were a little bit longer than I thought. How about we leave them here for now and then follow up with separate files with proper runnable examples?

Member: Yup, sounds fine.

import pandas as pd

Member Author: I took another spin at this section and the ones below to hopefully make it a bit clearer and to indicate that pandas_udf doesn't need any configuration to be set. I also capitalized "Pandas UDF" to make it consistent, since we kind of use it like a proper noun.

# Enable Arrow, 'spark' is an existing SparkSession
Member: "Enable Arrow, 'spark' is an existing SparkSession" -> "Enable Arrow-based columnar data transfers"

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate sample data
Member: "Generate sample data" -> "Generate a Pandas DataFrame"

Member: BTW, please be consistent. If we want to use lower-case pandas, we need to do it everywhere.

pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from Pandas data using Arrow
Member: "Create a Spark DataFrame from Pandas data using Arrow" -> "Create a Spark DataFrame from Pandas DataFrame using Arrow"

df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame to a local Pandas DataFrame
selpdf = df.select(" * ").toPandas()
Member: nit: " * " -> "*"

Member Author: Doing that caused my editor to change all the formatting for some reason.. I think it's just the editor, so I'll change it back.


{% endhighlight %}
</div>
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Not all Spark data types are currently supported, and an error will be raised if a column
has an unsupported type; see [Supported Types](#supported-types).

## How to Write Vectorized UDFs

A vectorized UDF is similar to a standard UDF in Spark except the inputs and output will be
Pandas Series, which allow the function to be composed with vectorized operations. This function
Member: Pandas Series -> Pandas Series/DataFrame, maybe also saying "please check the API doc". Maybe this one needs help from @icexelloss to generally organise these and clean up. This description sounds like it only covers scalar UDFs.

Contributor: Yeah, I can help with that. @BryanCutler do you mind if I make some changes to this section?

Member Author: No, I don't mind, please go ahead.

can then be run very efficiently in Spark, where data is sent in batches to Python and the function
is executed using Pandas Series as the inputs. The expected output of the function is also a Pandas
Series of the same length as the inputs. A vectorized UDF is declared using the `pandas_udf`
keyword; no additional configuration is required.

Contributor: We need to warn users that the Group Map Pandas UDF requires loading all the data of a group into memory, which is not controlled by spark.sql.execution.arrow.maxRecordsPerBatch, and may OOM if the data is skewed and some partitions have a lot of records.

Member Author: Yeah, good point, I'll add that.

The following example shows how to create a vectorized UDF that computes the product of 2 columns.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+

{% endhighlight %}
</div>
</div>

## GroupBy-Apply UDFs
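
A GroupBy-Apply (grouped map) Pandas UDF splits a Spark DataFrame by the grouping keys, applies a
function to each group as a Pandas DataFrame, and combines the results into a new Spark DataFrame.
A minimal sketch, assuming the `PandasUDFType.GROUPED_MAP` form of `pandas_udf` and an existing
SparkSession `spark`:

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# The function takes and returns a Pandas DataFrame for each group
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").apply(subtract_mean).show()
# (row order may vary)
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

{% endhighlight %}
</div>
</div>

Note that all the data of a group is loaded into memory before the function is applied; this is not
controlled by `spark.sql.execution.arrow.maxRecordsPerBatch` and may lead to out-of-memory errors if
the data is skewed and some groups have a lot of records.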

## Usage Notes

### Supported types
Member: nit: this is a bit generic to list in the table of contents? perhaps add SQL-Arrow type or something?
note: there is a link from L1677


Currently, all Spark SQL data types are supported except `MapType`, `ArrayType` of `TimestampType`, and
Member: are supported -> are supported by Arrow-based conversion

nested `StructType`.

### Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
Member: Based on this description, it sounds like we should not use the number of records, but the size, right? cc @cloud-fan @ueshin too

Member: No, both can be used where applicable.

Contributor: We went with maxRecordsPerBatch because it's easy to implement; otherwise we may need some way to estimate/calculate the memory consumption of Arrow data. @BryanCutler is it easy to do?

Member Author: It's possible to estimate the size of the Arrow buffer used, but it does make it more complicated to implement in Spark. I also wonder how useful this would be if the user hits memory problems. At least with a number of records, it's easy to understand and change.

Member: The current approach just makes it hard for external users to tune. Now maxRecordsPerBatch also depends on the width of your output schema. This is not user-friendly to end users.

Member Author: Yes, it's not an ideal approach. I'm happy to make a JIRA to follow up and look into other ways to break up the batches, but that won't be in before 2.3. So does that mean our options here are (unless I'm not understanding internal/external conf correctly):

1. Keep maxRecordsPerBatch internal and remove this doc section.
2. Externalize this conf and deprecate it once a better approach is found.

I think (2) is better because if the user hits memory issues, then they can at least find some way to adjust it.

Member: Since it is too late to add a new conf for the 2.3 release, we can do it in the 2.4 release. In the 2.4 release, we can respect both confs. We just need to change the default of maxRecordsPerBatch to int.max in the 2.4 release. I am fine with externalizing it in the 2.3 release.

to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch and does not take into account the number of columns, so it should be
Member: The default value is 10,000 records per batch. Since the number of columns could be huge, the value should be adjusted accordingly.

Member Author: How about "The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly"?

adjusted accordingly. Using this limit, each data partition will be made into 1 or more record
batches for processing.
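
For example, a sketch of lowering the batch size before converting a wide DataFrame (the value used
here is only illustrative, and `df` is assumed to be an existing Spark DataFrame):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

# Use smaller Arrow record batches when the DataFrame has many columns
# (1000 is an illustrative value, not a recommendation)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")

pdf = df.toPandas()

{% endhighlight %}
</div>
</div>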

### Timestamp with Time Zone Semantics

Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
values. The session time zone is set with the conf 'spark.sql.session.timeZone' and will default
to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
resolution, `datetime64[ns]`, and optional time zone that can be applied on a per-column basis.

When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and each column will be made time zone aware using the Spark session time zone. This will occur
when calling `toPandas()` or `pandas_udf` with a timestamp column. For example if the session time
zone is 'America/Los_Angeles' then the Pandas timestamp column will be of type
`datetime64[ns, America/Los_Angeles]`.
Member: I'm afraid this is not correct. The timestamp value will be timezone-naive anyway, representing the timestamp respecting the session timezone, but the timezone info will be dropped. As a result, the timestamp column will be of type datetime64[ns].

Member Author: Sorry, I should have refreshed my memory better before writing.. fixing now.


When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
values will be truncated.
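
A brief sketch of these settings in use, assuming `spark` is an existing SparkSession (the time zone
and sample data are only illustrative):

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import pandas as pd

# The session time zone used to localize timestamp values
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

# Create a Spark DataFrame with a timestamp column from a Pandas DataFrame;
# the timestamp values are converted to UTC microseconds
pdf = pd.DataFrame({"ts": pd.date_range("2017-01-01", periods=3, freq="H")})
df = spark.createDataFrame(pdf)

# Transfer back to Pandas; the session time zone is used when localizing the values
result_pdf = df.toPandas()

{% endhighlight %}
</div>
</div>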

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance; see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
Member: ditto for a trailing whitespace


# Migration Guide

## Upgrading From Spark SQL 2.2 to 2.3