
[SPARK-22221][DOCS] Adding User Documentation for Arrow #19575

Closed


BryanCutler
Member

What changes were proposed in this pull request?

Adding user facing documentation for working with Arrow in Spark

@BryanCutler
Member Author

BryanCutler commented Oct 25, 2017

This is a WIP to start adding user documentation on how to use Arrow-enabled functionality and to describe any differences the user might see. I'm not sure if the SQL programming guide is the right place to add it, but I'll start here and can move it if needed.

Here is a high-level list of things to add:

  • brief description and how to install pyarrow
  • how to use Arrow for toPandas()
  • how to use pandas_udf for basic udfs
  • how to use pandas_udf for groupby apply etc
  • cover any differences user might see in data, i.e. timestamps
  • unsupported types
  • how to set arrow record batch size

@SparkQA

SparkQA commented Oct 25, 2017

Test build #83053 has finished for PR 19575 at commit 0723e86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@felixcheung left a comment

do we need some rudimentary doc for 2.3?

@HyukjinKwon
Member

Yea, I would like to know it too.

@BryanCutler
Member Author

Yes, I think we do need something to at least highlight some differences if using Arrow. I've been meaning to work on this, just been too busy lately. @icexelloss if you're able to help out on this, that would be great!

@icexelloss
Contributor

I am happy to help out with some sections.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86218 has finished for PR 19575 at commit 5699d1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

@HyukjinKwon @ueshin @gatorsmile does this seem like an appropriate place to put Arrow-related user docs? I think we just need to add something for the additional pandas_udfs, and it's still a little rough, so I will go over it all again.

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86540 has finished for PR 19575 at commit 47bfc21.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@ueshin left a comment

Thanks for working on this! I left some comments.


## How to Write Vectorized UDFs

A vectorized UDF is similar to a standard UDF in Spark except the inputs and output of the will
Member

output of the will -> output of the udf will?

Member Author

ooops, I think I meant the inputs and output will be Pandas Series
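For reference, a minimal sketch of the kind of scalar `pandas_udf` being discussed here, where the inputs and output are Pandas Series; the function name, app name, and columns are illustrative rather than taken from the PR, and Spark 2.3 APIs are assumed.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType

spark = SparkSession.builder.appName("scalar_pandas_udf_sketch").getOrCreate()

# The wrapped function receives a pandas.Series per input column and must
# return a pandas.Series of the same length, so it composes with
# vectorized pandas/NumPy operations.
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3], "y": [10, 20, 30]}))
df.withColumn("product", multiply(df.x, df.y)).show()
```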

a `datetime64` type with nanosecond resolution, `datetime64[ns]` with optional time zone.

When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
and made time zone aware using the Spark session time zone, if set, or local Python system time
Member

We use session time zone anyway. If not set, the default is JVM system timezone (the value returned by TimeZone.getDefault()).

Member Author

I guess I was thinking of when respectSessionTimeZone is false, but it is true by default and will be deprecated. To keep things simple, maybe it's best not to mention this conf and just say session tz or JVM default?


Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. This currently is most beneficial to Python users that
work with Pandas/NumPy data. It's usage is not automatic and might require some minor
Member

It's usage is -> Its usage is?

Member Author

yes, you're right

high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
to an integer that will determine the maximum number of rows for each batch. Using this limit,
each data partition will be made into 1 or more record batches for processing.
Member

Should we mention the default value of spark.sql.execution.arrow.maxRecordsPerBatch?

Member Author

yes
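For illustration, a short sketch of adjusting the record batch cap discussed in the excerpt above; the value 5000 and the app name are arbitrary.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow_batch_size_sketch").getOrCreate()

# Cap the number of rows per Arrow record batch; each Spark data partition
# is then split into one or more batches, limiting peak JVM memory usage.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
```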

@BryanCutler
Member Author

Thanks for the review @ueshin! If the RC passes, will this still be able to get in before the docs are updated? @icexelloss will you be able to write a brief section on groupby/apply soon, in case this can be merged?

@icexelloss
Contributor

Hi Bryan, sorry I haven't got a chance to take a look at this. Yes, I can write the groupby apply section tomorrow.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86610 has finished for PR 19575 at commit b33346c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

and each column will be made time zone aware using the Spark session time zone. This will occur
when calling `toPandas()` or `pandas_udf` with a timestamp column. For example if the session time
zone is 'America/Los_Angeles' then the Pandas timestamp column will be of type
`datetime64[ns, America/Los_Angeles]`.
Member

I'm afraid this is not correct.
The timestamp value will be timezone naive anyway which represents the timestamp respecting the session timezone, but the timezone info will be dropped. As a result, the timestamp column will be of type datetime64[ns].

Member Author

Sorry, I should have refreshed my memory better before writing.. fixing now
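A small sketch of the corrected behavior described in the comment above, assuming Arrow conversion is enabled and the session time zone conf is set; per the review, the resulting pandas column should be timezone-naive `datetime64[ns]`.

```python
import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow_timestamp_sketch").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df = spark.createDataFrame(
    [(datetime.datetime(2018, 1, 25, 12, 0, 0),)], ["ts"])

# Values are converted respecting the session time zone, but the tz info is
# dropped, so the pandas column stays timezone-naive.
pdf = df.toPandas()
print(pdf["ts"].dtype)  # expected: datetime64[ns]
```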

@felixcheung
Member

it looks like maybe we have a blocker for RC2?
let's try to get this in soon so it could get into 2.3.0?

@HyukjinKwon
Member

How about a critical @felixcheung? I will focus on this one anyway to get this into 2.3.0. Seems RC3 will be going on if I didn't misunderstand.

Member

@HyukjinKwon left a comment

Insane nitpicking. Thoughts and suggestions are mixed. Maybe you could just pick up what makes sense to you. Doc is kind of an important but grunting job to be honest .. thank you for doing this.


## How to Enable for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
Member

tiny nit: there's a trailing whitespace
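A minimal sketch of the enablement described in the excerpt above, assuming the Spark 2.3 conf name `spark.sql.execution.arrow.enabled` (disabled by default) and illustrative data.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow_enable_sketch").getOrCreate()

# Enable Arrow-based columnar data transfers between the JVM and Python.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame(np.random.rand(100, 3), columns=["a", "b", "c"])
df = spark.createDataFrame(pdf)         # pandas -> Spark DataFrame
result_pdf = df.select("*").toPandas()  # Spark -> pandas DataFrame via Arrow
```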

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
working with timestamps in `pandas_udf`s to get the best performance, see
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
Member

ditto for a trailing whitespace
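To illustrate the recommendation in the excerpt above, a hedged sketch of a scalar `pandas_udf` that uses pandas' vectorized `.dt` accessor instead of per-row Python `datetime` handling; the function name and data are illustrative.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import LongType

spark = SparkSession.builder.appName("pandas_ts_udf_sketch").getOrCreate()

# Timestamps arrive as a pandas Series of dtype datetime64[ns], so the
# vectorized .dt accessor can be used rather than Python datetime objects.
@pandas_udf(LongType())
def day_of_week(ts):
    return ts.dt.dayofweek

df = spark.createDataFrame(
    pd.DataFrame({"ts": pd.date_range("2018-01-01", periods=5, freq="D")}))
df.select(day_of_week(df.ts)).show()
```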

give a high-level description of how to use Arrow in Spark and highlight any differences when
working with Arrow-enabled data.

## Ensure pyarrow Installed
Member

Maybe, pyarrow -> PyArrow


## Ensure pyarrow Installed

If you install pyspark using pip, then pyarrow can be brought in as an extra dependency of the sql
Member

maybe, pyspark -> PySpark


## Ensure pyarrow Installed

If you install pyspark using pip, then pyarrow can be brought in as an extra dependency of the sql
Member

sql -> SQL

## How to Write Vectorized UDFs

A vectorized UDF is similar to a standard UDF in Spark except the inputs and output will be
Pandas Series, which allow the function to be composed with vectorized operations. This function
Member

Pandas Series -> Pandas Series/DataFrame, maybe also saying please check the API doc. Maybe this one needs help from @icexelloss to generally organise these and clean up. This description sounds only for scalar UDFs.

Contributor

Yeah, I can help with that. @BryanCutler do you mind if I make some changes to this section?

Member Author

No, I don't mind, please go ahead.

<div data-lang="python" markdown="1">
{% highlight python %}

import numpy as np
Member

Can we have an example file separately?

Member Author

I was thinking that too as they were a little bit longer than I thought. How about we leave them here for now and then follow up with separate files with proper runnable examples?

Member

Yup, sounds fine.

0.8.0. You can install using pip or conda from the conda-forge channel. See pyarrow
[installation](https://arrow.apache.org/docs/python/install.html) for details.

## How to Enable for Conversion to/from Pandas
Member

maybe "Enabling for Conversion to/from Pandas" just to match the sentence form

give a high-level description of how to use Arrow in Spark and highlight any differences when
working with Arrow-enabled data.

## Ensure pyarrow Installed
Member

Seems there are more subtopics than I thought. Probably, we could consider removing this one too.

Member Author

remove the section on installing or just the header and merge with the above section?

Member Author

I changed to a sub-heading, let me know if you think that is better

@@ -1640,6 +1640,154 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
You may run `./bin/spark-sql --help` for a complete list of all available
options.

# Usage Guide for Pandas with Arrow
Member

Could we mention the word "PySpark" somewhere near the beginning?

@icexelloss
Contributor

@BryanCutler I added a section for groupby apply here: https://github.com/BryanCutler/spark/pull/29/files

@BryanCutler
Member Author

Thanks all, I'll merge the groupby PR and do an update now

@BryanCutler BryanCutler changed the title [WIP][SPARK-22221][DOCS] Adding User Documentation for Arrow [SPARK-22221][DOCS] Adding User Documentation for Arrow Jan 25, 2018
@SparkQA

SparkQA commented Jan 25, 2018

Test build #86648 has finished for PR 19575 at commit 85e895c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

Thanks everyone for reviewing! I think I addressed all the comments, so please take one more look.

@BryanCutler
Member Author

Updated doc screenshots: arrow_doc_1, arrow_doc_2, arrow_doc_3

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86719 has finished for PR 19575 at commit 67ab5e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86720 has finished for PR 19575 at commit 8b629bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

</div>

### Group Map
Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
Member

@rxin WDYT about this name?

Member

Which name? If you mean "split-apply-combine", I think it's fine - https://pandas.pydata.org/pandas-docs/stable/groupby.html

Member

Group Map

Member

Grouped Vectorized UDFs?

Member Author

I can change to whatever you guys like, but I think these two section names were made to reflect the different pandas_udf types - scalar and group map. Is that right @icexelloss ?

Contributor

That is correct. The names in this section match the enums in PandasUDFType.

Member

@icexelloss we already agreed on the names when we wrote the blog, right?


## Usage Notes

### Supported SQL-Arrow Types
Member

-> Supported SQL Types


### Supported SQL-Arrow Types

Currently, all Spark SQL data types are supported except `MapType`, `ArrayType` of `TimestampType`, and
Member

are supported -> are supported by Arrow-based conversion

high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
to an integer that will determine the maximum number of rows for each batch. The default value is
10,000 records per batch and does not take into account the number of columns, so it should be
Member

The default value is 10,000 records per batch. Since the number of columns could be huge, the value should be adjusted accordingly.

Member Author

How about "The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly"


Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
Member

Based on this description, it sounds like we should not use the number of records, but the size, right? cc @cloud-fan @ueshin too

Member

No, both can be used where applicable.

Contributor

We went with maxRecordsPerBatch because it's easy to implement, otherwise we may need some way to estimate/calculate the memory consumption of arrow data. @BryanCutler is it easy to do?

Member Author

It's possible to estimate the size of the arrow buffer used, but it does make it more complicated to implement in Spark. I also wonder how useful this would be if the user hits memory problems. At least with a number of records, it's easy to understand and change.

Member

The current approach just makes it hard for external users to tune. Now, maxRecordsPerBatch also depends on the width of your output schema. This is not user friendly to end users.

Member Author

Yes, it's not an ideal approach. I'm happy to make a JIRA to follow up and look into other ways to break up the batches, but that won't be in before 2.3. So does that mean our options here are (unless I'm not understanding internal/external conf correctly):

  1. Keep maxRecordsPerBatch internal and remove this doc section.
  2. Externalize this conf and deprecate once a better approach is found.

I think (2) is better because if the user hits memory issues, then they can at least find some way to adjust it.

Member

Since it is too late to add a new conf for the 2.3 release, we can do it in the 2.4 release, where we can respect both confs. We just need to change the default of maxRecordsPerBatch to int.max in 2.4. I am fine with externalizing it in the 2.3 release.

To use `groupBy().apply()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

Contributor

We need to warn users that the Group Map Pandas UDF requires loading all the data of a group into memory, which is not controlled by spark.sql.execution.arrow.maxRecordsPerBatch, and may OOM if the data is skewed and some partitions have a lot of records.

Member Author

yeah good point, I'll add that
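For reference, a minimal sketch of the `groupBy().apply()` usage covered by this section, with an illustrative function and data; it assumes the enum name that shipped in the 2.3.0 release, `PandasUDFType.GROUPED_MAP` (the rename from group map is discussed later in this thread), and that the schema string defines the output DataFrame.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("grouped_map_sketch").getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# The function receives all rows of a group as one pandas DataFrame, so a
# very large group can OOM regardless of maxRecordsPerBatch; the string
# schema declares the output DataFrame's columns and types.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupBy("id").apply(subtract_mean).show()
```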

Member

@HyukjinKwon left a comment

</div>

### Group Map
Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
Member

Grouped Vectorized UDFs?

or to wrap the function, no additional configuration is required. Currently, there are two types of
Pandas UDF: Scalar and Group Map.

### Scalar
Member

Scalar Vectorized UDFs?

Contributor

On a side note, I think in this context Pandas UDFs and Vectorized UDFs are interchangeable from a user's point of view; I am not sure about the need for introducing both to users. Maybe we should just stick to one of them?


## Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
Member

a Spark DataFrame to Pandas -> a Spark DataFrame to Pandas DataFrame?

## Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
Member

a Spark DataFrame from Pandas -> a Spark DataFrame from Pandas DataFrame?

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86755 has finished for PR 19575 at commit e46ff0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

I have two major comments.

  • group map -> grouped map. We also need to update PythonEvalType.

    SQL_PANDAS_GROUP_MAP_UDF -> SQL_PANDAS_GROUPED_MAP_UDF
    SQL_PANDAS_GROUP_AGG_UDF -> SQL_PANDAS_GROUPED_AGG_UDF

  • Open a JIRA to add another limit in the next release (2.4) based on memory consumption, instead of number of rows. My major reason is the row size might be different and thus it is possible that the session-based SQLConf spark.sql.execution.arrow.maxRecordsPerBatch needs to be adjusted for different queries. It is hard for users to tune such a conf.

@cloud-fan
Contributor

Actually, aggregation can only be executed on grouped data, so SQL_PANDAS_GROUPED_AGG_UDF doesn't seem to be very concise. How about SQL_PANDAS_UDAF? My only concern is how to support partial aggregate in the future, will we add new APIs?

@BryanCutler
Member Author

Thanks @gatorsmile , I made https://issues.apache.org/jira/browse/SPARK-23258 to track changing the maxRecordsPerBatch conf and I will externalize it in this PR.

group map -> grouped map. We also need to update PythonEvalType.

It seems like we are changing around the groupBy-apply name a lot and I don't want to change things here unless this has been agreed upon. Can you confirm, @icexelloss?

@BryanCutler
Member Author

Is it possible to decide on the names for groupBy()-apply() UDFs as a followup? It sounds like there are still things that need discussion.

@gatorsmile
Member

Thanks! I will submit a follow-up PR to rename it.

Merged to 2.3 and master.

@gatorsmile
Member

BTW, thanks for your great work! I will add all your names to the contributors of this PR.

asfgit pushed a commit that referenced this pull request Jan 29, 2018
## What changes were proposed in this pull request?

Adding user facing documentation for working with Arrow in Spark

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19575 from BryanCutler/arrow-user-docs-SPARK-2221.

(cherry picked from commit 0d60b32)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@asfgit asfgit closed this in 0d60b32 Jan 29, 2018
@BryanCutler
Member Author

Thanks to everyone for contributing and reviewing!

@icexelloss
Contributor

@gatorsmile I don't think any change of naming (group map, group agg, etc.) has been agreed upon yet. We can certainly open a PR to discuss it.

@gatorsmile
Member

@icexelloss Thanks for your reply. Your comments are welcome in my PR #20428.

asfgit pushed a commit that referenced this pull request Jan 30, 2018
…xRecordsPerBatch

## What changes were proposed in this pull request?

This is a followup to #19575 which added a section on setting max Arrow record batches and this will externalize the conf that was referenced in the docs.

## How was this patch tested?
NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20423 from BryanCutler/arrow-user-doc-externalize-maxRecordsPerBatch-SPARK-22221.

(cherry picked from commit f235df6)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
asfgit pushed a commit that referenced this pull request Jan 30, 2018
…xRecordsPerBatch

## What changes were proposed in this pull request?

This is a followup to #19575 which added a section on setting max Arrow record batches and this will externalize the conf that was referenced in the docs.

## How was this patch tested?
NA

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #20423 from BryanCutler/arrow-user-doc-externalize-maxRecordsPerBatch-SPARK-22221.
@BryanCutler BryanCutler deleted the arrow-user-docs-SPARK-2221 branch March 6, 2018 23:37