# Apache Arrow

> Apache Arrow is an in-memory columnar data format that Spark uses to efficiently transfer data between JVM and Python processes. It is currently most beneficial to Python users who work with pandas/NumPy data.

# Installation

```bash
pip install pyspark[sql]
```

To use Arrow for these transfers, first set the Spark configuration `spark.sql.execution.arrow.pyspark.enabled` to `true`; it is disabled by default.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Enable Arrow-based columnar data transfers (disabled by default)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
```


## Convert between Spark and pandas DataFrames

```python
import numpy as np
import pandas as pd

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
```


## Pandas user-defined functions

See the [`pandas_udf` API documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf).

Imports:
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd
```

Syntax:
```python
pandas_udf(f=None, returnType=None, functionType=None)
```
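- `f` - the user-defined function (a Python function when `pandas_udf` is called directly rather than used as a decorator)
- `returnType` - optional; when specified, it must be either a `pyspark.sql.types.DataType` object or a DDL-formatted type string
- `functionType` - int, optional; an enum value in `pyspark.sql.functions.PandasUDFType`, kept for backward compatibility. Python type hints are the recommended way to specify the UDF type.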
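Since Spark 3.0, the Python type hints on the function take the place of `functionType`: they tell Spark which kind of pandas UDF it is. A minimal sketch of two common shapes, written as plain pandas functions so they can be tried without a cluster (the names `plus_one` and `mean_value` are illustrative):

```python
import pandas as pd


def plus_one(v: pd.Series) -> pd.Series:
    # Series -> Series: the shape of a scalar pandas UDF
    return v + 1


def mean_value(v: pd.Series) -> float:
    # Series -> scalar: the shape of a grouped-aggregate pandas UDF
    return float(v.mean())


batch = pd.Series([1.0, 2.0, 3.0])
print(plus_one(batch).tolist())  # [2.0, 3.0, 4.0]
print(mean_value(batch))         # 2.0
```

Wrapped as `pandas_udf(plus_one, "double")`, the first can be used in `select` or `withColumn`; wrapped as `pandas_udf(mean_value, "double")`, the second can be used as an aggregate, for example inside `df.groupBy(...).agg(...)`.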

```python
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

# Create a Spark DataFrame from a list of tuples
df = spark.createDataFrame(data=data, schema=columns)
df.show()
```

```text
+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+
```



```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# Scalar pandas UDF: receives a batch of values as a pd.Series, returns a pd.Series
@pandas_udf(StringType())
def to_upper(series: pd.Series) -> pd.Series:
    return series.str.upper()

df.withColumn("UpperCase", to_upper("Name")).show()
```

```text
+-----+------------+------------+
|Seqno|        Name|   UpperCase|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+
```
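Spark does not pass the whole column to a scalar pandas UDF at once: it sends the data in Arrow record batches (at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows each, 10,000 by default) and calls the function once per batch. A minimal pandas-only sketch of that behaviour, using a hypothetical `run_in_batches` helper that merely imitates the batching: `to_upper` gives the same result however the rows are split, while a function that relies on a column-wide statistic such as the mean does not.

```python
import pandas as pd


def to_upper(series: pd.Series) -> pd.Series:
    # Same body as the pandas UDF above, as a plain pandas function
    return series.str.upper()


def center(series: pd.Series) -> pd.Series:
    # NOT batch-safe: the mean depends on which rows land in the batch
    return series - series.mean()


def run_in_batches(fn, series: pd.Series, batch_size: int) -> pd.Series:
    """Hypothetical helper: call fn on consecutive slices, imitating per-batch calls."""
    parts = [fn(series.iloc[start:start + batch_size])
             for start in range(0, len(series), batch_size)]
    return pd.concat(parts)


names = pd.Series(["john jones", "tracey smith", "amy sanders"])
print(run_in_batches(to_upper, names, batch_size=2).tolist())
# ['JOHN JONES', 'TRACEY SMITH', 'AMY SANDERS']

nums = pd.Series([1.0, 2.0, 3.0, 4.0])
print(center(nums).tolist())                                # [-1.5, -0.5, 0.5, 1.5]
print(run_in_batches(center, nums, batch_size=2).tolist())  # [-0.5, 0.5, -0.5, 0.5]
```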



## Use the `pyspark.pandas` `apply` function

```python
import numpy as np
import pyspark.pandas as ps

technologies = {"Fee": [20000, 25000, 30000, 22000, np.nan],
                "Discount": [1000, 2500, 1500, 1200, 3000]}

psdf = ps.DataFrame(technologies)
print(psdf)

# With axis=1, each row is passed in as a Series indexed by column name
def add(row):
    return row["Fee"] + row["Discount"]

# Apply the function to each row of the DataFrame
addDF = psdf.apply(add, axis=1)
print(addDF)
```



```text
       Fee  Discount
0  20000.0      1000
1  25000.0      2500
2  30000.0      1500
3  22000.0      1200
4      NaN      3000
```




```text
0    21000.0
1    27500.0
2    31500.0
3    23200.0
4        NaN
dtype: float64
```
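The pandas API on Spark mirrors pandas, so the same row-wise logic can be checked locally. A minimal sketch in plain pandas (not `pyspark.pandas`) comparing `apply` with `axis=1` against a vectorized column addition; the vectorized form avoids one Python call per row, and the equivalent `psdf["Fee"] + psdf["Discount"]` should also work on the `pyspark.pandas` DataFrame above.

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"Fee": [20000, 25000, 30000, 22000, np.nan],
                    "Discount": [1000, 2500, 1500, 1200, 3000]})

# Row-wise: the lambda is called once per row (a Series indexed by column name)
row_sum = pdf.apply(lambda row: row["Fee"] + row["Discount"], axis=1)

# Vectorized: a single column-wise addition; NaN propagates the same way
vec_sum = pdf["Fee"] + pdf["Discount"]

print(row_sum.tolist())
print(row_sum.equals(vec_sum))  # True
```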


## Use a DDL string as the return type for StructType input/output

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

# The return type is a struct described by a DDL string
@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # s3 is the struct column, passed in as a pandas DataFrame
    s3["col2"] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
```

```text
root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)
```
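Inside the UDF, the struct column arrives as a `pandas.DataFrame` whose columns are the struct's fields, and the returned DataFrame is turned into the struct declared by the DDL string `"col1 string, col2 long"`. Because the body is plain pandas, it can be checked locally. A minimal sketch with hand-built inputs mirroring the single row above; `func_body` is an illustrative name for the undecorated function:

```python
import pandas as pd


def func_body(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # Same logic as func above: add a long field computed from the other two columns
    s3["col2"] = s1 + s2.str.len()
    return s3


long_col = pd.Series([1])                                  # long_col
string_col = pd.Series(["a string"])                       # string_col
struct_col = pd.DataFrame({"col1": ["a nested string"]})  # struct<col1:string>

out = func_body(long_col, string_col, struct_col)
print(out["col2"].tolist())  # [9]  (1 + len("a string"))
```

On Spark, the UDF itself would be applied with something like `df.select(func("long_col", "string_col", "struct_col"))`.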



```python
# Series -> DataFrame: the returned columns fill the struct fields
@pandas_udf("first string, last string")
def split_expand(s: pd.Series) -> pd.DataFrame:
    return s.str.split(expand=True)

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(split_expand("name")).show()
```


```text
+------------------+
|split_expand(name)|
+------------------+
|       {John, Doe}|
+------------------+
```
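`s.str.split(expand=True)` returns a DataFrame with one column per token, labeled `0`, `1`, and so on; above, Spark returned the two tokens as the struct `{John, Doe}` declared as `first string, last string`. One caveat, checked here in plain pandas: a name with three tokens yields three columns, which no longer fits the two-field schema. A minimal sketch of one workaround, assuming every name should be split at the first space only; `split_expand_safe` is a hypothetical name:

```python
import pandas as pd


def split_expand(s: pd.Series) -> pd.DataFrame:
    # Same body as the UDF above
    return s.str.split(expand=True)


def split_expand_safe(s: pd.Series) -> pd.DataFrame:
    # Hypothetical variant: split at most once, so no name yields more than two columns
    return s.str.split(n=1, expand=True)


print(split_expand(pd.Series(["John Doe"])).shape)        # (1, 2)
print(split_expand(pd.Series(["Mary Ann Smith"])).shape)  # (1, 3)
print(split_expand_safe(pd.Series(["Mary Ann Smith", "John Doe"])).values.tolist())
# [['Mary', 'Ann Smith'], ['John', 'Doe']]
```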


