# Apache Arrow in PySpark

Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer data between JVM and Python processes. It is currently most beneficial to Python users who work with Pandas/NumPy data. Its usage is not automatic and might require minor changes to configuration or code to take full advantage of it and ensure compatibility. This guide gives a high-level description of how to use Arrow in Spark and highlights any differences when working with Arrow-enabled data.


Source: https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html

## Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call `DataFrame.toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with `SparkSession.createDataFrame()`. 

To use Arrow when executing these calls, users need to first set the Spark configuration `spark.sql.execution.arrow.pyspark.enabled` to `true`. This is disabled by default.

In addition, optimizations enabled by `spark.sql.execution.arrow.pyspark.enabled` can automatically fall back to the non-Arrow implementation if an error occurs before the actual computation within Spark. This behavior is controlled by `spark.sql.execution.arrow.pyspark.fallback.enabled`.

```python
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
```

```python
pdf
```

|     | 0        | 1        | 2        |
|-----|----------|----------|----------|
| 0   | 0.335206 | 0.872822 | 0.525587 |
| 1   | 0.222986 | 0.157885 | 0.079865 |
| 2   | 0.130469 | 0.124919 | 0.155066 |
| 3   | 0.283798 | 0.036921 | 0.213622 |
| 4   | 0.694392 | 0.033615 | 0.228624 |
| ... | ...      | ...      | ...      |
| 95  | 0.865067 | 0.574494 | 0.044925 |
| 96  | 0.214923 | 0.592693 | 0.480419 |
| 97  | 0.094417 | 0.156847 | 0.993263 |
| 98  | 0.094011 | 0.629550 | 0.336109 |


```python
df.show()
```

```
+-------------------+--------------------+--------------------+
|                  0|                   1|                   2|
+-------------------+--------------------+--------------------+
| 0.3352064396423272|  0.8728215732965877|  0.5255868363642634|
|0.22298630129708763| 0.15788549485574022| 0.07986513662620776|
|0.13046907890359405| 0.12491941927507966| 0.15506613582365447|
|0.28379815025111577|0.036920973547738045| 0.21362185315294058|
| 0.6943917755476801|0.033614914396026596|  0.2286238337492128|
| 0.6922019133658045|   0.322579627827808| 0.24837074880596055|
| 0.8248378057998459|   0.558282550414758|  0.9165533624760218|
| 0.9931701853128085| 0.15161830990489822| 0.24755974801264768|
|0.13798023759648048|0.023771983689898524|  0.5922570232302837|
| 0.7568596701802832| 0.07381728978908786|  0.1602515982165701|
| 0.6658604239282875|  0.4869311869948135|  0.5982709119489679|
|0.17263241847966715| 0.30046606350736116| 0.23089694635345193|
|0.21767647095392706| 0.3190558613926789
```

```python
result_pdf
```

|     | 0        | 1        | 2        |
|-----|----------|----------|----------|
| 0   | 0.335206 | 0.872822 | 0.525587 |
| 1   | 0.222986 | 0.157885 | 0.079865 |
| 2   | 0.130469 | 0.124919 | 0.155066 |
| 3   | 0.283798 | 0.036921 | 0.213622 |
| 4   | 0.694392 | 0.033615 | 0.228624 |
| ... | ...      | ...      | ...      |
| 95  | 0.865067 | 0.574494 | 0.044925 |
| 96  | 0.214923 | 0.592693 | 0.480419 |
| 97  | 0.094417 | 0.156847 | 0.993263 |
| 98  | 0.094011 | 0.629550 | 0.336109 |
