# Tests of PySpark UDFs with mapInArrow vs. mapInPandas

In [1]:
# mapInArrow is a new feature, candidate for Spark 3.3.0
# See https://issues.apache.org/jira/browse/SPARK-37227

import os

import findspark

# Prefer an explicit SPARK_HOME from the environment so the notebook runs on
# other machines; fall back to the local snapshot build it was originally run with.
SPARK_HOME = os.environ.get(
    "SPARK_HOME", "/home/luca/Spark/spark-3.3.0-SNAPSHOT-bin-spark_21220128"
)
findspark.init(SPARK_HOME)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dimuon mass")
    # use only 1 core to make performance comparisons easier/cleaner
    .master("local[1]")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/01/31 21:11:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Dataset preparation

In [2]:
# Simple test data generated in memory: 1e8 rows, each row holding an array
# of three rand() values in a single column, col3.
# Arrays are used on purpose: converting array columns to pandas is the slow path.
DATASET_QUERY = "select Array(rand(),rand(),rand()) col3 from range(1e8)"
df = spark.sql(DATASET_QUERY)


In [3]:
%%time
# Baseline: write the DataFrame to the "noop" sink (nothing is stored).
# This times producing the dataframe with no additional operations on it.
(
    df.write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 0:>                                                          (0 + 1) / 1]

CPU times: user 4.2 ms, sys: 539 µs, total: 4.74 ms
Wall time: 5.63 s


                                                                                

## mapInPandas tests

### Test 1: dummy UDF with mapInPandas

In [4]:
%%time 
  
# Identity UDF: every incoming batch is handed back unchanged, so the timing
# reflects the mapInPandas round trip rather than any work done in the UDF.
def UDF_dummy(iterator):
    yield from iterator

(
    df.mapInPandas(UDF_dummy, df.schema)
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 1:>                                                          (0 + 1) / 1]

CPU times: user 190 ms, sys: 28.6 ms, total: 218 ms
Wall time: 44.8 s


                                                                                

### Test 2: square the array with mapInPandas

In [5]:
%%time 

# UDF that squares each incoming pandas batch element-wise (batch * batch);
# the output is written back with the input schema (df.schema).
def UDF_pandas_square(iterator):
    yield from (pdf * pdf for pdf in iterator)
        
(
    df.mapInPandas(UDF_pandas_square, df.schema)
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 2:>                                                          (0 + 1) / 1]

CPU times: user 23.1 ms, sys: 2.11 ms, total: 25.2 ms
Wall time: 1min 34s


                                                                                

## mapInArrow tests

### Test 3: dummy UDF using mapInArrow

In [6]:
%%time 

# Identity UDF for mapInArrow: each Arrow record batch is passed straight through.
# NOTE: this re-defines the same UDF_dummy used in Test 1 (the function does not
# depend on the batch type); it is repeated so this cell is self-contained.
def UDF_dummy(iterator):
    yield from iterator
        
(
    df.mapInArrow(UDF_dummy, df.schema)
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 3:>                                                          (0 + 1) / 1]

CPU times: user 9.33 ms, sys: 2 ms, total: 11.3 ms
Wall time: 23.7 s


                                                                                

In [7]:
### Test 4: dummy UDF using mapInArrow and awkward array

In [12]:
%%time

# this requires pip install awkward
import awkward as ak

# A dummy UDF that converts each Arrow batch to an awkward array and back
# (Arrow -> awkward -> Arrow), returning the input data unchanged; the timing
# therefore reflects the cost of the awkward conversions.
# Relies on `ak` (awkward) being imported earlier in this cell.
def UDF_dummy_with_awkward_array(iterator):
    for record_batch in iterator:
        arrow_table = ak.to_arrow_table(ak.from_arrow(record_batch))
        yield from arrow_table.to_batches()

(
    df.mapInArrow(UDF_dummy_with_awkward_array, df.schema)
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 7:>                                                          (0 + 1) / 1]

CPU times: user 18.5 ms, sys: 2.55 ms, total: 21 ms
Wall time: 19.5 s


                                                                                

### Test 5: square the array using awkward array

In [13]:
%%time

import awkward as ak
import numpy as np

# UDF that squares the col3 arrays using awkward array + numpy:
# each Arrow batch is converted to an awkward array, col3 is squared with
# np.square, re-wrapped as a record with a single "col3" field, and converted
# back to Arrow record batches. Relies on `ak` and `np` imported in this cell.
def UDF_awkward_array_square(iterator):
    for record_batch in iterator:
        records = ak.from_arrow(record_batch)
        squared = np.square(records["col3"])
        # depth_limit=1: zip only at the outermost level so col3 remains a list field
        rezipped = ak.zip({"col3": squared}, depth_limit=1)
        yield from ak.to_arrow_table(rezipped).to_batches()

(
    df.mapInArrow(UDF_awkward_array_square, df.schema)
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

[Stage 8:>                                                          (0 + 1) / 1]

CPU times: user 17.7 ms, sys: 2.06 ms, total: 19.8 ms
Wall time: 21.3 s


                                                                                

## DataFrame API with higher-order functions

### Test 6: square the array using the Spark higher-order function `transform`

In [10]:
%%time

# Square each array element with the built-in higher-order function `transform`,
# entirely inside the JVM (no Python UDF involved).
square_expr = "transform(col3, x -> x * x) as col3_squared"
df2 = df.selectExpr(square_expr)

(
    df2.write
    .format("noop")
    .mode("overwrite")
    .save()
)


[Stage 5:>                                                          (0 + 1) / 1]

CPU times: user 4.99 ms, sys: 4.24 ms, total: 9.22 ms
Wall time: 18.7 s


                                                                                