# Tests of PySpark UDF with mapInArrow vs. mapInPandas
Spark 3.3.0 comes with an improvement to the Python UDF API: mapInArrow, a new feature that works in a similar way to mapInPandas but skips the conversion of each data batch from Arrow to pandas and back.  
This notebook uses the awkward array library instead of pandas to process the data inside the UDF;
awkward arrays can have performance advantages over pandas for complex data types, in particular arrays.  

```python
# download or install pyspark
! pip install pyspark
```

```python
# mapInArrow is a new feature in Spark 3.3.0
# See https://issues.apache.org/jira/browse/SPARK-37227

# optionally use findspark
# import findspark
# findspark.init("/home/luca/Spark/spark-3.3.0-bin-hadoop3")

# use only 1 core to make performance comparisons easier/cleaner

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("dimuon mass")  \
        .master("local[1]") \
        .config("spark.driver.memory", "2g") \
        .getOrCreate()
```

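Since mapInArrow only exists from Spark 3.3.0 on (SPARK-37227), a small guard can fail fast on older versions. This is a minimal sketch using only the standard library; supports_map_in_arrow is a hypothetical helper, and in the notebook it would be called with spark.version, the version string of the running session. Checking hasattr(df, "mapInArrow") on a DataFrame would be another option.

```python
# Hypothetical guard: mapInArrow was added in Spark 3.3.0 (SPARK-37227).
# In the notebook, pass spark.version; suffixes like "-SNAPSHOT" are ignored.
import re


def supports_map_in_arrow(version: str) -> bool:
    """Return True if a Spark version string is 3.3.0 or later."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized Spark version string: {version!r}")
    major, minor = int(match.group(1)), int(match.group(2))
    return (major, minor) >= (3, 3)


# Usage in the notebook (requires the SparkSession created above):
# assert supports_map_in_arrow(spark.version), "mapInArrow needs Spark >= 3.3.0"
print(supports_map_in_arrow("3.3.0"), supports_map_in_arrow("3.2.1"))
```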


## Dataset preparation

```python
# simple tests: create data from memory

# We use a schema with a column that is a numerical array
# this is where converting to Pandas is often slow
# (1e8 rows, each holding an array of 3 random doubles in column col3)

df = spark.sql("select Array(rand(),rand(),rand()) col3 from range(1e8)")
```


```python
%%time
# write to a noop sink for testing purposes
# this is to test the speed of processing the dataframe with no additional operations

df.write.format("noop").mode("overwrite").save()
```

```text
CPU times: user 5.22 ms, sys: 1.03 ms, total: 6.25 ms
Wall time: 5.75 s
```

## mapInPandas tests
These tests use mapInPandas; compare their results with those of the mapInArrow tests below.

### Test 1: dummy UDF with mapInPandas

```python
%%time

# A dummy UDF that just returns the input data
def UDF_dummy(iterator):
    for batch in iterator:
        yield batch

df.mapInPandas(UDF_dummy, df.schema).write.format("noop").mode("overwrite").save()
```

```text
CPU times: user 243 ms, sys: 29.8 ms, total: 273 ms
Wall time: 42.7 s
```

### Test 2: square the array elements with mapInPandas

```python
%%time

# UDF that squares each element of the arrays in col3
# (the pandas column holds one numpy array per row, multiplied row by row)
def UDF_pandas_square(iterator):
    for batch in iterator:
        yield batch*batch

df.mapInPandas(UDF_pandas_square, df.schema).write.format("noop").mode("overwrite").save()
```

```text
CPU times: user 18.5 ms, sys: 1.98 ms, total: 20.5 ms
Wall time: 1min 32s
```
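Part of Test 2's cost is plausibly the arithmetic on an object column: batch*batch multiplies one small numpy array at a time. The hypothetical variant below, not benchmarked in this notebook, stacks the rows into one 2-D array and squares them in a single numpy call. It assumes col3 is the only column, every array in it has the same length (three elements in this dataset), and pandas and numpy are installed; UDF_pandas_square_stacked and to_object_column are illustrative names.

```python
# Hypothetical variant of UDF_pandas_square (not benchmarked in this notebook).
# Assumes col3 is the only column and all its arrays have the same length.
import numpy as np
import pandas as pd


def to_object_column(rows):
    """Pack a sequence of 1-D numpy arrays into a 1-D object array (one per row)."""
    out = np.empty(len(rows), dtype=object)
    for i, row in enumerate(rows):
        out[i] = row
    return out


def UDF_pandas_square_stacked(iterator):
    for batch in iterator:
        if len(batch) == 0:
            yield batch
            continue
        matrix = np.stack(batch["col3"].to_numpy())  # shape: (rows, array length)
        squared = np.square(matrix)                    # one vectorized operation
        yield pd.DataFrame({"col3": to_object_column(squared)})


# Local usage example with two rows
example = pd.DataFrame({"col3": to_object_column([np.array([1.0, 2.0, 3.0]),
                                                  np.array([4.0, 5.0, 6.0])])})
result = next(UDF_pandas_square_stacked(iter([example])))
print(result["col3"].tolist())
```

In the notebook it would be passed to df.mapInPandas in place of UDF_pandas_square; with variable-length arrays np.stack would fail, so the original row-by-row version is the general one.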


                                                                                

## mapInArrow tests
These tests use mapInArrow; compare their results with those of the mapInPandas tests above.

### Test 3: dummy UDF using mapInArrow

```python
%%time

# A dummy UDF that just returns the input data
def UDF_dummy(iterator):
    for batch in iterator:
        yield batch

df.mapInArrow(UDF_dummy, df.schema).write.format("noop").mode("overwrite").save()
```

```text
CPU times: user 12.2 ms, sys: 3.46 ms, total: 15.7 ms
Wall time: 22.3 s
```

### Test 4: dummy UDF using mapInArrow and awkward array

```python
# Install the awkward arrays library
# https://awkward-array.readthedocs.io/en/latest/

! pip install awkward
```

```python
%%time

import awkward as ak

# A dummy UDF that converts back and forth to awkward arrays;
# it just returns the input data
def UDF_dummy_with_awkward_array(iterator):
    for batch in iterator:
        b = ak.from_arrow(batch)                       # pyarrow RecordBatch -> awkward array
        yield from ak.to_arrow_table(b).to_batches()   # awkward array -> pyarrow Table -> RecordBatches

df.mapInArrow(UDF_dummy_with_awkward_array, df.schema).write.format("noop").mode("overwrite").save()
```

```text
CPU times: user 170 ms, sys: 35.3 ms, total: 205 ms
Wall time: 18.8 s
```

### Test 5: square the array elements using awkward array

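```python
%%time

import awkward as ak
import numpy as np

# UDF that squares the array elements using awkward arrays
def UDF_awkward_array_square(iterator):
    for batch in iterator:
        b = ak.from_arrow(batch)
        # np.square is applied to every element of the col3 arrays;
        # depth_limit=1 zips only at the outer level, so col3 stays a list field of each record
        b2 = ak.zip({"col3": np.square(b["col3"])}, depth_limit=1)
        yield from ak.to_arrow_table(b2).to_batches()

df.mapInArrow(UDF_awkward_array_square, df.schema).write.format("noop").mode("overwrite").save()
```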

```text
CPU times: user 6.4 ms, sys: 5.14 ms, total: 11.5 ms
Wall time: 22.4 s
```


                                                                                

## DataFrame API with higher-order functions

### Test 6: square the array elements using a Spark higher-order function
When arrays can be processed within the JVM using DataFrame/SQL functions, such as the higher-order function transform, this is often faster than using Python UDFs, because no data has to be sent to and from Python worker processes.

```python
%%time

df2 = df.selectExpr("transform(col3, x -> x * x) as col3_squared")

df2.write.format("noop").mode("overwrite").save()
```


```text
CPU times: user 6.49 ms, sys: 1.55 ms, total: 8.04 ms
Wall time: 22.3 s
```
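To compare the runs at a glance, the wall times printed by the cells in this notebook can be put side by side. This is a minimal sketch with the standard library; the numbers are copied from the cell outputs (single runs on one core, so they carry run-to-run noise) and the ratios are only a rough guide.

```python
# Wall times (seconds) copied from the %%time outputs in this notebook.
wall_time_s = {
    "baseline: noop write, no UDF": 5.75,
    "Test 1: mapInPandas, dummy": 42.7,
    "Test 2: mapInPandas, square": 92.0,  # printed as 1min 32s
    "Test 3: mapInArrow, dummy": 22.3,
    "Test 4: mapInArrow + awkward, dummy": 18.8,
    "Test 5: mapInArrow + awkward, square": 22.4,
    "Test 6: transform() higher-order function": 22.3,
}

baseline = wall_time_s["baseline: noop write, no UDF"]
slowest = max(wall_time_s.values())

for name, seconds in wall_time_s.items():
    print(
        f"{name:42s} {seconds:6.2f} s"
        f"  {seconds / baseline:5.1f}x baseline"
        f"  {slowest / seconds:5.1f}x faster than slowest"
    )
```

Test 4 finishing faster than Test 3 despite the extra awkward conversion may simply reflect this run-to-run variation, so repeated runs would be needed before reading small differences.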


                                                                                