In [62]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructField, FloatType, StructType
import pandas as pd
import numpy as np
from numpy import random
import time

StatementMeta(large, 10, 62, Finished, Available)

In [63]:
# Benchmark sizing: record counts are expressed in thousands.
one_k = 1_000
num_records_k = 10
num_records = one_k * num_records_k
# Seconds of sleep applied per record by the calculation function.
compute_time = 0.05

StatementMeta(large, 10, 63, Finished, Available)

In [64]:
def simple_numpy_calc(narr, compute_time=compute_time):
    """Return the mean of each element of ``narr``, sleeping before each one.

    Args:
        narr: Iterable of rows (e.g. a 2-D NumPy array); ``np.mean`` is
            applied to each element in iteration order.
        compute_time: Seconds to sleep before computing each mean. The
            default is bound to the module-level ``compute_time`` at the
            time this function is defined.

    Returns:
        A NumPy array holding one mean per element of ``narr``.
    """
    def _delayed_mean(row):
        # The sleep comes first, then the mean, exactly once per row.
        time.sleep(compute_time)
        return np.mean(row)

    return np.array([_delayed_mean(row) for row in narr])

StatementMeta(large, 10, 64, Finished, Available)

## Show the logic over 5 records

In [65]:
# Five demo records, each with three uniform random values in [1, 10).
sample_values = random.uniform(1, 10, size=(5, 3))
pdf = pd.DataFrame(sample_values, columns=["x", "y", "z"])
pdf


StatementMeta(large, 10, 65, Finished, Available)

Unnamed: 0,x,y,z
0,2.651284,5.048348,4.525751
1,9.571427,3.858004,1.450848
2,3.805154,1.522073,5.050001
3,8.305749,2.944056,3.630231
4,6.882069,6.28254,1.411243


In [66]:
# Convert the DataFrame to a NumPy array (one row per record) and display it.
arr = pdf.to_numpy()
arr

StatementMeta(large, 10, 66, Finished, Available)

array([[2.65128395, 5.04834829, 4.52575085],
       [9.57142734, 3.85800376, 1.45084838],
       [3.80515353, 1.52207297, 5.05000142],
       [8.30574893, 2.9440563 , 3.6302309 ],
       [6.88206903, 6.28254018, 1.41124278]])

In [67]:
# Process 5 records at 2 seconds per record; this is expected to take about 10 seconds.
# perf_counter() is a monotonic clock, so the elapsed time cannot be skewed
# by system clock adjustments the way time.time() differences can.
start = time.perf_counter()
mean_arr = simple_numpy_calc(arr, 2)
print(f"Run in {time.perf_counter()-start} seconds")
mean_arr

StatementMeta(large, 10, 67, Finished, Available)

Run in 10.008042573928833 seconds


array([4.0751277 , 4.96009316, 3.45907597, 4.96001204, 4.85861733])

In [68]:
# Attach the per-row means as a "mean" column (this mutates pdf in place) and display the frame.
pdf["mean"]=mean_arr
pdf

StatementMeta(large, 10, 68, Finished, Available)

Unnamed: 0,x,y,z,mean
0,2.651284,5.048348,4.525751,4.075128
1,9.571427,3.858004,1.450848,4.960093
2,3.805154,1.522073,5.050001,3.459076
3,8.305749,2.944056,3.630231,4.960012
4,6.882069,6.28254,1.411243,4.858617


## Doing it in Python (Pandas/Numpy)

Creating a Pandas dataframe with **num_records** records

In [69]:
# Build num_records rows of three uniform random values in [1, 10) with a
# single vectorized draw instead of one small draw per row.
pdf = pd.DataFrame(random.uniform(1, 10, size=(num_records, 3)), columns=["x", "y", "z"])
# len(pdf)/one_k is a count in thousands, so the unit label is K, not M.
print(f"{len(pdf)/one_k} K")
pdf.head(3)

StatementMeta(large, 10, 69, Finished, Available)

10.0 M


Unnamed: 0,x,y,z
0,4.745272,5.360582,1.78937
1,7.505946,6.770241,4.589608
2,2.971494,2.126862,2.466982


Running it: at 0.05 seconds per record, processing 10K records will take approximately 500 seconds.

In [70]:
# perf_counter() is monotonic, which makes it the appropriate clock for elapsed time.
start = time.perf_counter()
# Select the input columns explicitly: this cell adds a "mean" column to pdf,
# so a plain pdf.to_numpy() would feed that column back in on a re-run.
arr = pdf[["x", "y", "z"]].to_numpy()
mean_arr = simple_numpy_calc(arr)
pdf["mean"] = mean_arr
print(f"Run in {time.perf_counter()-start} seconds")
pdf[:3]

StatementMeta(large, 10, 70, Finished, Available)

Run in 501.4050033092499 seconds


Unnamed: 0,x,y,z,mean
0,4.745272,5.360582,1.78937,3.965075
1,7.505946,6.770241,4.589608,6.288599
2,2.971494,2.126862,2.466982,2.521779


## Doing it at scale in Spark

Creating a Spark dataframe

In [71]:
# pdf already holds the pandas-computed "mean" column at this point; hand Spark
# only the input columns so the distributed run starts from the raw data.
df = spark.createDataFrame(pdf[["x", "y", "z"]])
assert df.count() == num_records
display(df.limit(3))


StatementMeta(large, 10, 71, Finished, Available)

SynapseWidget(Synapse.DataFrame, 98a0997b-5df1-4a5e-8bba-fe065a592531)

In [72]:
# Number of partitions backing the Spark DataFrame.
df.rdd.getNumPartitions()

StatementMeta(large, 10, 72, Finished, Available)

8

Creating a function to apply with **mapInPandas**

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html#pyspark-sql-dataframe-mapinpandas

In [73]:
def some_func(iterator):
    """Compute per-row means for each pandas batch supplied by ``mapInPandas``.

    Args:
        iterator: Iterator of pandas DataFrames, each expected to contain
            the input columns ``x``, ``y`` and ``z``.

    Yields:
        A new pandas DataFrame per batch with a single ``mean`` column, so
        the output matches a schema that declares only ``mean``.
    """
    input_cols = ["x", "y", "z"]
    for p_df in iterator:
        # Use only the input columns so any extra column in the batch
        # (such as an existing "mean") is never folded into the average.
        arr = p_df[input_cols].to_numpy()
        mean_arr = simple_numpy_calc(arr)
        # Yield a fresh frame rather than mutating the incoming batch, and
        # return exactly the column the output schema declares.
        yield pd.DataFrame({"mean": mean_arr})


StatementMeta(large, 10, 73, Finished, Available)

In [74]:
# Output schema passed to mapInPandas: a single float column named "mean".
schema = StructType().add("mean", FloatType())

StatementMeta(large, 10, 74, Finished, Available)

In [75]:
# mapInPandas only defines the transformation; the work runs when display() pulls rows.
df2 = df.mapInPandas(some_func, schema)
# perf_counter() is monotonic, which makes it the appropriate clock for elapsed time.
start = time.perf_counter()
# NOTE(review): display() may fetch only a preview of the rows, in which case
# this would not time the full dataset — confirm before comparing run times.
display(df2)
print(time.perf_counter()-start)

StatementMeta(large, 10, 75, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4e82ecb9-ec0a-4f72-819e-f0bcd5f0566a)

63.123297929763794


In [76]:
# Rounded Spark elapsed seconds (63) times the partition count (8), for
# comparison with the single-process pandas run time.
63*8

StatementMeta(large, 10, 76, Finished, Available)

504