
[SPARK-13534][PySpark] Using Apache Arrow to increase performance of DataFrame.toPandas #15821

Conversation

BryanCutler
Member

BryanCutler commented Nov 9, 2016

What changes were proposed in this pull request?

Integrate Apache Arrow with Spark to increase performance of DataFrame.toPandas. This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process. The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame. All non-complex data types are currently supported, otherwise an UnsupportedOperation exception is thrown.

Additions to Spark include a Scala package private method Dataset.toArrowPayloadBytes that will convert data partitions in the executor JVM to ArrowPayloads as byte arrays so they can be easily served, and a package private class/object ArrowConverters that provides data type mappings and conversion routines. In Python, a public method DataFrame.collectAsArrow is added to collect Arrow payloads, and an optional flag in toPandas(useArrow=False) enables using Arrow (the old conversion is used by default).
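For illustration, a minimal PySpark sketch of how the proposed API could be exercised (the useArrow flag and collectAsArrow method are the additions described above; this is not code from the patch, and the final signatures may differ):

```python
# Minimal sketch only; assumes the useArrow flag and collectAsArrow method
# proposed in this PR. Not a released PySpark API.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-toPandas-sketch").getOrCreate()
df = spark.range(1 << 20).selectExpr("id AS longs", "id * 2.0 AS doubles")

pdf_old = df.toPandas()                 # existing row-by-row conversion
pdf_arrow = df.toPandas(useArrow=True)  # proposed Arrow-based path
payloads = df.collectAsArrow()          # proposed: raw Arrow payloads
```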

How was this patch tested?

Added a new test suite ArrowConvertersSuite that will run tests on conversion of Datasets to Arrow payloads for supported types. The suite will generate a Dataset and matching Arrow JSON data; the Dataset is then converted to an Arrow payload and finally validated against the JSON data. This ensures that the schema and data have been converted correctly.

Added PySpark tests to verify the toPandas method produces equal DataFrames with and without pyarrow, and a roundtrip test to ensure the pandas DataFrame produced by PySpark is equal to one made directly with pandas.
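A rough sketch of the kind of PySpark equality test described above (assuming the useArrow flag from this patch; dtype details may differ between the two paths):

```python
# Sketch of the equality/roundtrip test idea; assumes the useArrow flag
# from this PR and a local SparkSession.
import pandas as pd
from pandas.testing import assert_frame_equal
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [(i, float(i) * 2.0, "s%d" % i) for i in range(100)]
df = spark.createDataFrame(data, ["a", "b", "c"])

expected = pd.DataFrame(data, columns=["a", "b", "c"])
assert_frame_equal(df.toPandas(), expected)               # old path
assert_frame_equal(df.toPandas(useArrow=True), expected)  # Arrow path
```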

@SparkQA

SparkQA commented Nov 9, 2016

Test build #68381 has finished for PR 15821 at commit 4227ec6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 9, 2016

Test build #68425 has finished for PR 15821 at commit 3f855ec.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 9, 2016

Test build #68427 has finished for PR 15821 at commit b06e11f.

  • This patch fails Python style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 17, 2016

Test build #68806 has finished for PR 15821 at commit 053e3a6.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2016

Test build #68812 has finished for PR 15821 at commit 9191b96.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 21, 2016

Test build #68954 has finished for PR 15821 at commit 9191b96.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1508,7 +1518,7 @@ def toDF(self, *cols):
         return DataFrame(jdf, self.sql_ctx)

     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, useArrow=True):
@holdenk
Contributor

holdenk commented on this diff (python/pyspark/sql/dataframe.py) on Nov 22, 2016

Would it maybe make more sense to default this to false or have more thorough checking that the dataframe being written with arrow is supported? At least initially the set of supported dataframes might be rather small.
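Purely as an illustration of the "more thorough checking" idea, a hypothetical guard that falls back to the old conversion when the schema contains complex types (none of these helpers exist in the patch):

```python
# Hypothetical guard, not part of this patch: fall back to the old
# conversion if the schema contains types the Arrow path does not support.
from pyspark.sql.types import ArrayType, MapType, StructType

def _arrow_supports_schema(schema):
    """True if no field is a complex (Array/Map/Struct) type."""
    return not any(isinstance(f.dataType, (ArrayType, MapType, StructType))
                   for f in schema.fields)

def to_pandas_safely(df, use_arrow=False):
    if use_arrow and not _arrow_supports_schema(df.schema):
        use_arrow = False  # avoid the UnsupportedOperation path
    return df.toPandas(useArrow=True) if use_arrow else df.toPandas()
```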

@BryanCutler
Member Author

Hey @holdenk, I just had this in to do my own testing and hadn't thought about keeping the option, but if we do keep it then yeah you're right, it would be better to default to the original way.


@mariusvniekerk
Member

So this is very cool stuff.

Would it be reasonable to add some API pieces so that, on the Python side, things like DataFrame.mapPartitions make use of Apache Arrow to lower the serialization costs? Or is that more of a follow-on piece of work?

@holdenk
Contributor

holdenk commented Nov 30, 2016

@mariusvniekerk I think just getting this working for the local connection is going to be hard, so breaking out using Arrow on the driver side into a separate follow-up piece of work would make sense.

@BryanCutler
Member Author

Thanks @mariusvniekerk, as @holdenk said we are going to try to get something basic working first, and after we show some performance improvement we can follow up with more things.

@wesm
Member

wesm commented Nov 30, 2016

Luckily we are on the home stretch for making the Java and C++ libraries binary compatible -- e.g. I'm working on automated testing today: apache/arrow#219

@wesm
Member

wesm commented Dec 1, 2016

@BryanCutler I'm working with @icexelloss on my end to get involved in this. We were going to start working on unit tests to validate converting each of the Spark SQL data types to Arrow format while the Arrow Java-C++ compatibility work progresses, but we don't want to duplicate any efforts if you've started on this. Perhaps we can create an integration branch someplace to make pull requests into, since it will probably take a while until this patch gets accepted into Spark?

@wesm
Member

wesm commented Dec 1, 2016

Related to this, we'll also want to be able to precisely instrument and benchmark the Dataset <-> Arrow conversion -- @icexelloss suggested we might be able to push the conversion down into the executors instead of doing all the work in the driver, but I'm not sure how feasible that is.

@BryanCutler
Member Author

Hi @wesm and @icexelloss, that sounds good on our end. @yinxusen has been working on validating some basic conversion so far, but everything is still very preliminary, so it would be great to work with you guys. I'll set up a new integration branch and ping you all when ready.

Related to this, we'll also want to be able to precisely instrument and benchmark the Dataset <-> Arrow conversion -- @icexelloss suggested we might be able to push the conversion down into the executors instead of doing all the work in the driver, but I'm not sure how feasible that is.

We were thinking about that too, as it would be more ideal. For simplicity we decided to first do the conversion on the driver side, which should hopefully still show a performance increase, then follow up with some work to better optimize it.

@icexelloss
Contributor

@BryanCutler , I have been working based on your branch here:
https://github.com/BryanCutler/spark/tree/wip-toPandas_with_arrow-SPARK-13534

Is this the right one?

@BryanCutler
Member Author

@icexelloss, @wesm I branched off here for us to integrate our changes https://github.com/BryanCutler/spark/tree/arrow-integration
cc @yinxusen

@wesm
Member

wesm commented Dec 2, 2016

OK, let's open pull requests into that branch to help with not stepping on each other's toes. thank you

@wesm
Member

wesm commented Jan 18, 2017

Shall we update this PR to the latest and solicit involvement from Spark committers?

@BryanCutler
Member Author

Shall we update this PR to the latest and solicit involvement from Spark committers?

Yeah, I think it's about ready for that. After we integrate the latest changes, I'll go over once more for some minor cleanup and update this. Probably in the next day or so.

@icexelloss
Contributor

icexelloss commented Jan 23, 2017 via email

@BryanCutler
Member Author

BryanCutler commented Jan 23, 2017 via email

@BryanCutler force-pushed the wip-toPandas_with_arrow-SPARK-13534 branch from 9191b96 to 9bb75de on January 24, 2017 22:40
@SparkQA

SparkQA commented Jan 24, 2017

Test build #71950 has finished for PR 15821 at commit 9bb75de.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

BryanCutler commented Jan 24, 2017

This has been updated after integrating changes made with @icexelloss and @wesm. There has been good progress made and it would be great if others could take a look and review/test this out.

The current state of toPandas() with Arrow has support for Datasets with primitive, string, and timestamp data types. Complex types such as Struct, Array, and Map are not yet supported but are a work in progress. There is a suite of tests in Scala to test Dataset -> ArrowRecordBatch conversion and a collection of JSON files that serve to validate that the converted data is correct. Also added PySpark tests to verify the Pandas frame is correct. It is compiled with the current Arrow master 0.1.1-SNAPSHOT at commit apache/arrow@7d3e2a3
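As a standalone illustration of the supported column types (primitives, strings, timestamps), a small pyarrow example that builds an Arrow record batch and converts it to pandas; this uses the public pyarrow API of a recent release, not the 0.1.1-SNAPSHOT internals wired into the patch:

```python
# Standalone pyarrow illustration of the supported types; not code from
# this patch, and independent of Spark.
import datetime
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [
        pa.array([1, 2, 3], type=pa.int64()),
        pa.array([1.5, 2.5, 3.5], type=pa.float64()),
        pa.array(["a", "b", "c"], type=pa.string()),
        pa.array([datetime.datetime(2017, 1, 24, 12, 0, s) for s in range(3)],
                 type=pa.timestamp("us")),
    ],
    names=["longs", "doubles", "strings", "timestamps"],
)
print(batch.to_pandas().dtypes)  # int64, float64, object, datetime64[ns]
```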

The performance so far shows a significant increase, and I will follow up with a script to run and details of the results seen. Please ping me with any questions on setting up the build of Arrow or running the benchmarks. It would be great if this could be considered for Spark 2.2, as Arrow 0.2 will be released soon and will be able to support the functionality used here.

@holdenk @davies @rxin, I would love to hear your thoughts on this so far. Thanks!
Also cc'ing some on the watch list, @mariusvniekerk @zjffdu @nchammas @zero323 @rdblue

@BryanCutler
Member Author

BryanCutler commented Jan 25, 2017

Old Benchmarks with Conversion on Driver

Here are some rough benchmarks done locally on a machine with 16 GB of memory and 8 cores, using Spark config defaults, taken from 50 trials of calling toPandas() and measuring wall time in seconds with and without Arrow enabled:

1mm Longs

13.52x speedup on average

|       | With Arrow | Without Arrow |
|-------|-----------:|--------------:|
| count |  50.000000 |     50.000000 |
| mean  |   0.190573 |      2.576587 |
| std   |   0.078450 |      0.114455 |
| min   |   0.139911 |      2.259916 |
| 25%   |   0.148212 |      2.516289 |
| 50%   |   0.163769 |      2.555433 |
| 75%   |   0.184402 |      2.631316 |
| max   |   0.518090 |      2.946415 |

1mm Doubles

8.07x speedup on average

|       | With Arrow | Without Arrow |
|-------|-----------:|--------------:|
| count |  50.000000 |     50.000000 |
| mean  |   0.259145 |      2.090295 |
| std   |   0.069620 |      0.123091 |
| min   |   0.196666 |      1.998588 |
| 25%   |   0.209051 |      2.015083 |
| 50%   |   0.230751 |      2.032701 |
| 75%   |   0.270519 |      2.122219 |
| max   |   0.439556 |      2.485232 |

Script to generate these can be found here
Happy to run more if there is interest.
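The linked script is not reproduced here, but a minimal timing harness in the same spirit (50 trials of toPandas() on 1mm longs, with and without the flag from this patch) could look like:

```python
# Minimal benchmark sketch in the spirit of the numbers above; the actual
# script is the one linked in the comment. Assumes the useArrow flag.
import time
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000 * 1000).toDF("longs")

def trials(use_arrow, n=50):
    samples = []
    for _ in range(n):
        start = time.time()
        df.toPandas(useArrow=True) if use_arrow else df.toPandas()
        samples.append(time.time() - start)
    return pd.Series(samples)

report = pd.DataFrame({"With Arrow": trials(True),
                       "Without Arrow": trials(False)})
print(report.describe())  # count/mean/std/min/quartiles/max, as above
```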

@holdenk
Contributor

holdenk commented Jan 25, 2017

On a personal note, those benchmarks certainly look very exciting (<3 max of with arrow less than min of without arrow) :)

It certainly seems it would probably be worth the review bandwidth to start looking this over, but since this is pretty big and adds a new dependency, this could take a while to move forwards.

It would be great to hear what the other Python focused committers (maybe @davies ?) think of this approach :)

@leifwalsh

The next iteration of this for perf would likely involve generating the Arrow batches on executors and having the driver use the new streaming Arrow format to just forward this to Python. In our experiments, assembling arrays of internal rows dominates the time; transposing them and forming an Arrow record batch is pretty quick. If we can do that work in parallel on the executors, we're likely to get another big win.

@wesm
Member

wesm commented Jan 25, 2017

Very nice to see the improved wall clock times. I have been busy engineering the pipeline between the byte stream from Spark and the resulting DataFrame -- the only major thing still left on the table that might help is converting strings in C++ to pandas.Categorical rather than returning a dense array of strings.
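A small pyarrow illustration of that idea (dictionary-encoding a string column so that to_pandas() yields a pandas.Categorical instead of a dense object column); this is just the library-level concept, not something the current patch does:

```python
# Dictionary-encode strings so the pandas result is Categorical rather
# than a dense object column; concept illustration only.
import pyarrow as pa

strings = pa.array(["red", "green", "red", "blue", "green"])
table = pa.Table.from_arrays([strings, strings.dictionary_encode()],
                             names=["dense", "dict_encoded"])
print(table.to_pandas().dtypes)  # dense -> object, dict_encoded -> category
```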

I'll review this patch in more detail when I can

I'll do a bit of performance analysis (esp. on the Python side) and flesh out some of the architectural next steps (e.g. what @leifwalsh has described) in advance of Spark Summit in a couple of weeks. Parallelizing the record batch conversion and streaming it to Python would be another significant perf win. Having these tools should also be helpful for speeding up UDF evaluation.

@BryanCutler
Member Author

Parallelizing the record batch conversion and streaming it to Python would be another significant perf win.

Right, I should have also mentioned that this PR takes a simplistic approach and collects rows to the driver, where all the conversion is done. Offloading this to the executors should boost the performance more.

@shaneknapp
Contributor

@holdenk yeah, another set of eyes would be great! i haven't actually touched the test infra code in a long time and i'm currently wrapping my brain around the order of operations that run-pip-tests goes through in conjunction w/everything else.

i have a feeling that the chain of scripts (run-tests-jenkins -> run-tests-jenkins.py -> run-tests -> run-pip-tests) besides being confusing for humans (ie: me), is also fragile WRT conda envs (aka munging PATH) in our environment.

would installing pyarrow 0.4.0 in the py3k conda env fix things? if so, i can bang that out in moments.

@holdenk
Contributor

holdenk commented Jun 26, 2017

@shaneknapp it might; assuming the conda cache is shared, it should avoid needing to fetch the package. I'm not super sure, but I think we might have better luck updating conda on the Jenkins machines (if people are ok with that), since it seems like this is probably from an out-of-date conda.

@MaheshIBM

MaheshIBM commented Jun 27, 2017

This does not seem like a timeout issue; the certificate CN and what is used as the hostname do not match. So clearly the client downloads the certificate but is not able to verify it (no timeout). If anything, it may be possible to configure the code/command to ignore SSL cert errors.

At this point I checked that there is no host with the hostname below, so clearly some cert on the Anaconda side is not set up properly. If I get the hostname from which the package download is attempted, I can investigate more.

ping conda.binstart.org
ping: unknown host conda.binstart.org

For troubleshooting from the problematic host, you can try using openssl to verify the certs; below is a sample of a successful negotiation.

openssl s_client -showcerts -connect anaconda.org:443

Lot of output here.
Server certificate
subject=/C=US/postalCode=78701/ST=TX/L=Austin/street=221 W 6th St, Ste 1550/O=Continuum Analytics Inc/OU=Information Technology/OU=PremiumSSL Wildcard/CN=*.anaconda.org
issuer=/C=GB/ST=Greater Manchester/L=Salford/O=COMODO CA Limited/CN=COMODO RSA Organization Validation Secure Server CA

No client certificate CA names sent
Server Temp Key: ECDH, prime256v1, 256 bits

SSL handshake has read 5488 bytes and written 373 bytes

New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
Server public key is 4096 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
Protocol : TLSv1.2
Cipher : ECDHE-RSA-AES128-GCM-SHA256
Session-ID: 32C19785E3801B08BB2C5997BC437A54C13C6D3F4D678F5927B028B5AAE7E2C1
Session-ID-ctx:
Master-Key: 5B2AD811A5D131CD9565311AD0A4749DC0D03657E91B32B22B77813905B9CD1865FF7DB0E67395EB1DE194848DD0037A
Key-Arg : None
Krb5 Principal: None
PSK identity: None
PSK identity hint: None
Start Time: 1498537784
Timeout : 300 (sec)
Verify return code: 0 (ok)

@BryanCutler
Member Author

BryanCutler commented Jun 27, 2017 via email

@MaheshIBM

MaheshIBM commented Jun 27, 2017

That leads me to believe that the download request could be resolving to different hosts every time; could that happen if there is a CDN working in the background? Not all hosts are configured with the bad certificate, while one (or possibly more) is using a certificate with a DN of conda.binstar.org and responding to the domain name in the hostname of the URL from which the package download is attempted.

If there is a way to configure pip to ignore SSL errors (only for the purpose of troubleshooting and finding the root cause of the problem here), then that is one possible direction to take. I am looking for ways to ignore SSL errors when using pip and will update the comment if I find something.

--Update
There is a --trusted-host param that can be passed to pip

--Update 2
To double-check, I downloaded the certificate from binstar.org and saw the values in the field below, which exactly match what pip is complaining about.

 X509v3 Subject Alternative Name: 
                DNS:anaconda.com, DNS:anacondacloud.com, DNS:anacondacloud.org, DNS:binstar.org, DNS:wakari.io
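For anyone who wants to repeat that check from Python instead of openssl, a small stdlib-only sketch that prints a server certificate's subject and subjectAltName entries (a hostname mismatch like the one above would surface here as an SSL verification error):

```python
# Stdlib-only sketch mirroring the openssl check above; anaconda.org is
# just the example host from this thread.
import socket
import ssl

def peer_cert(host, port=443):
    ctx = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=10) as sock:
        # Raises an SSL error if the cert does not match `host`,
        # which is the kind of failure pip is hitting here.
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            return tls.getpeercert()

cert = peer_cert("anaconda.org")
print(cert["subject"])
print(cert["subjectAltName"])  # e.g. (('DNS', '*.anaconda.org'), ...)
```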

@shaneknapp
Contributor

i agree w/@MaheshIBM that we're looking at a bad CA cert. i think we're looking at a problem on continuum.io's side, not our side.

however, i do not like the thought of ignoring certs (on principle). :)

and finally, if i'm reading the run-pip-tests code correctly (and please correct me if i'm wrong @holdenk ), we're just creating a temp python environment in /tmp, installing some packages, running the tests, and then moving on.

some thoughts/suggestions:

  • our conda environment is pretty stagnant and hasn't been explicitly upgraded since we deployed anaconda python over a year ago.
  • the py3k environment that exists in the workers' conda installation is solely used by spark builds, so updating said environment w/the packages in the run-pip-tests will remove the need to download them, but at the same time, make the tests a NOOP.
  • we can hope that continuum fixes their cert issue asap. :\

@holdenk
Contributor

holdenk commented Jun 27, 2017

@shaneknapp your understanding of what run-pip-tests does is pretty correct. It's important to note that part of the test is installing the pyspark package itself to make sure we didn't break the packaging, and pyarrow is only installed because we want to be able to run some pyarrow tests with it -- we don't need that to be part of the packaging tests; in fact, it would be simpler to have it be part of the normal tests.

So one possible approach to fix this, I think, would be updating conda on the machines because it's old, installing pyarrow into the py3k worker env, and then taking the pyarrow tests out of the packaging test and instead having them run in the normal flow.

I'm not super sure this is a cert issue per se; it seems that newer versions of conda are working fine (it's possible the SSL lib is slightly out of date and doesn't understand wildcards or something else in the cert).

@shaneknapp
Contributor

shaneknapp commented Jun 27, 2017

okie dokie. how about i install pyarrow in the py3k conda environment right now... and once that's done, we can remove the pyarrow test from run-pip-tests and add it to the regular tests.

so, who wants to take care of the test updating? :)

@holdenk
Contributor

holdenk commented Jun 27, 2017

I can do the test updating assuming that @BryanCutler is traveling. I've got a webinar this afternoon but I can do it after I'm done with that.

Also, I don't think it's the wildcard issue now that I think about it some more; it's that new conda deprecates binstar, and our old conda is going to binstar, which just points to the conda host, but the conda host now has an SSL cert just for conda, not conda and binstar. I don't think Continuum is going to fix that; rather, I suspect the answer is going to be just to upgrade to a newer version of conda.

@shaneknapp
Contributor

yeah, i think you're right. however, upgrading to a new version of conda on a live environment does indeed scare me a little bit. :)

w/the new jenkins, i'll have a staging server dedicated to testing crap like this. ah, the future: so shiny and bright!

@shaneknapp
Contributor

anyways: installing pyarrow right now.

@shaneknapp
Contributor

done

@shaneknapp
Contributor

(py3k)-bash-4.1$ pip install pyarrow
Requirement already satisfied: pyarrow in /home/anaconda/envs/py3k/lib/python3.4/site-packages
Requirement already satisfied: six>=1.0.0 in /home/anaconda/envs/py3k/lib/python3.4/site-packages (from pyarrow)
Requirement already satisfied: numpy>=1.9 in /home/anaconda/envs/py3k/lib/python3.4/site-packages (from pyarrow)

@BryanCutler
Member Author

BryanCutler commented Jun 27, 2017 via email

@shaneknapp
Contributor

btw, do we want pyarrow-0.4.0 or -0.4.1? i'm assuming the latter based on #15821 (comment)

@wesm
Member

wesm commented Jun 27, 2017

I recommend using the latest. The data format is forward/backward compatible so the JAR doesn't necessarily need to be 0.4.1 if you're using pyarrow 0.4.1 (0.4.1 fixed a Decimal regression in the Java library, but that isn't relevant here quite yet)

@shaneknapp
Contributor

roger copy... "latest" is 0.4.1, which is what's currently on the jenkins workers.

@holdenk
Contributor

holdenk commented Jun 27, 2017

Great, thanks @shaneknapp . @BryanCutler I've got a webinar and if you don't have a chance to change the tests around until after I'm done teaching I'll do it, but if your flight lands first then go for it :)

@cloud-fan
Contributor

The last build still failed; shall we update dev/run-pip-tests to use pip?

@cloud-fan
Contributor

Some PRs have been blocked because of this failure for days. I'm reverting it; @BryanCutler please reopen this PR after fixing the pip stuff, thanks!

@HyukjinKwon
Member

Wait @cloud-fan! I just want to ask a question.

@HyukjinKwon
Member

Should we maybe wait for #18443? Actually, I think there is an alternative for this - #18439 - rather than reverting the whole PR.

Reverting is also an option. I hope these were considered (or I assume they already were).

@HyukjinKwon
Member

HyukjinKwon commented Jun 28, 2017

FWIW, I am not against reverting. I just wanted to provide some context in case it was missed.

@holdenk
Contributor

holdenk commented Jun 28, 2017

I'm against #18439; I'd rather revert this and fix it later than install packages without SSL.

@BryanCutler
Member Author

BryanCutler commented Jun 28, 2017 via email

@cloud-fan
Contributor

Sorry, I didn't know there was a PR fixing the issue, and I have already reverted it. Please cherry-pick this commit into the new PR and apply the pip fix. Sorry for the trouble.

@rxin
Contributor

rxin commented Jun 28, 2017

In the future we should revert PRs that fail builds IMMEDIATELY. There is no way we should've let the build be broken for days.

robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 29, 2017
…DataFrame.toPandas

## What changes were proposed in this pull request?
Integrate Apache Arrow with Spark to increase performance of `DataFrame.toPandas`.  This has been done by using Arrow to convert data partitions on the executor JVM to Arrow payload byte arrays where they are then served to the Python process.  The Python DataFrame can then collect the Arrow payloads where they are combined and converted to a Pandas DataFrame.  All non-complex data types are currently supported, otherwise an `UnsupportedOperation` exception is thrown.

Additions to Spark include a Scala package private method `Dataset.toArrowPayloadBytes` that will convert data partitions in the executor JVM to `ArrowPayload`s as byte arrays so they can be easily served.  A package private class/object `ArrowConverters` that provide data type mappings and conversion routines.  In Python, a public method `DataFrame.collectAsArrow` is added to collect Arrow payloads and an optional flag in `toPandas(useArrow=False)` to enable using Arrow (uses the old conversion by default).

## How was this patch tested?
Added a new test suite `ArrowConvertersSuite` that will run tests on conversion of Datasets to Arrow payloads for supported types.  The suite will generate a Dataset and matching Arrow JSON data, then the dataset is converted to an Arrow payload and finally validated against the JSON data.  This will ensure that the schema and data has been converted correctly.

Added PySpark tests to verify the `toPandas` method is producing equal DataFrames with and without pyarrow.  A roundtrip test to ensure the pandas DataFrame produced by pyspark is equal to a one made directly with pandas.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Li Jin <ice.xelloss@gmail.com>
Author: Li Jin <li.jin@twosigma.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#15821 from BryanCutler/wip-toPandas_with_arrow-SPARK-13534.