In [None]:
import pandas as pd
import tensorflow as tf
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import numpy as np
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import pandas_udf
import time
import timeit
from pyspark.sql.types import DoubleType
from math import radians, sin, cos, atan2, sqrt


#Benchmark Tests of Feature Engineering

Import the housing dataset

In [3]:
data = pd.read_csv('https://raw.githubusercontent.com/byui-cse/cse450-course/master/data/housing.csv')
# df.drop('date', axis=1, inplace=True) # remove this column, it messes with the model we are using
data = data.query('bedrooms <= 12')

In [5]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()

# Now you can use spark
df = spark.createDataFrame(data)

display(df.limit(5))

DataFrame[id: bigint, date: string, bedrooms: bigint, bathrooms: double, sqft_living: bigint, sqft_lot: bigint, floors: double, waterfront: bigint, view: bigint, condition: bigint, grade: bigint, sqft_above: bigint, sqft_basement: bigint, yr_built: bigint, yr_renovated: bigint, zipcode: bigint, lat: double, long: double, sqft_living15: bigint, sqft_lot15: bigint, price: double]

## Years between house being built and house being re-modeled

Pyspark built in functions

In [7]:
start_time = time.time()

# If yr_renovated is 0 (not renovated), return 0; otherwise, compute the difference
df = df.withColumn(
    "yrs_between_built_and_rennovation_built_in",
    f.when(f.col("yr_renovated") == 0, 0).otherwise(f.col("yr_renovated") - f.col("yr_built"))
)

df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Built in Execution Time: {execution_time:.2f} seconds")


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\Spencer\AppData\Roaming\Python\Python311\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Spencer\AppData\Roaming\Python\Python311\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Program Files\Python311\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

Standard UDF

In [0]:
# Standard UDF function
def yrs_between_built_and_rennovation(yr_built, yr_renovated):
    if yr_renovated == 0:  # Not renovated
        return 0
    return yr_renovated - yr_built

# Register and use the UDF
years_udf = udf(yrs_between_built_and_rennovation, IntegerType())

start_time = time.time()

df = df.withColumn("yrs_between_built_and_rennovation_udf", years_udf(f.col("yr_built"), f.col("yr_renovated")))

df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Standard UDF Execution Time: {execution_time:.2f} seconds")


Standard UDF Execution Time: 1.17 seconds


Vectorized/Pandas UDF

In [0]:
# Define Pandas UDF
@pandas_udf(IntegerType())
def years_since_built_pandas(yr_built: pd.Series, yr_renovated: pd.Series) -> pd.Series:
    return np.where(yr_renovated == 0, 0, yr_renovated - yr_built)

start_time = time.time()

# Apply the Pandas UDF
df = df.withColumn("yrs_between_built_and_rennovation_pandas", years_since_built_pandas(f.col("yr_built"), f.col("yr_renovated")))

df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Pandas UDF Execution Time: {execution_time:.2f} seconds")


Pandas UDF Execution Time: 1.16 seconds


## Finding the distance from the house and water

Pyspark built in functions

In [0]:
from pyspark.sql.functions import col, radians, sin, cos, sqrt, atan2, lit
from pyspark.sql.types import DoubleType
import time

# Earth radius in km
R = 6371.0  

# Fixed latitude and longitude for comparison
lat2, lon2 = 47.6204, -122.3491  

# Start timing execution
start_time = time.time()

# Apply the transformation using built-in functions
df = df.withColumn(
    "distance_built_in",
    (2 * lit(R) * atan2(
        sqrt(
            sin((radians(f.lit(lat2)) - radians(f.col("lat"))) / 2) ** 2 +
            cos(radians(f.col("lat"))) * cos(radians(lit(lat2))) * 
            sin((radians(f.lit(lon2)) - radians(f.col("long"))) / 2) ** 2
        ),
        sqrt(
            1 - (
                sin((radians(f.lit(lat2)) - radians(f.col("lat"))) / 2) ** 2 +
                cos(radians(f.col("lat"))) * cos(radians(f.lit(lat2))) * 
                sin((radians(f.lit(lon2)) - radians(f.col("long"))) / 2) ** 2
            )
        )
    )).cast(DoubleType())
)

df.count()  # Trigger computation

# End timing execution
execution_time = time.time() - start_time
print(f"Built-in Function Execution Time: {execution_time:.2f} seconds")


Built-in Function Execution Time: 1.32 seconds


Standard UDF

In [0]:
# Standard Haversine function
def haversine_distance(lat1, lon1, lat2=47.6204, lon2=-122.3491):
    R = 6371  # Earth radius in km
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# Register UDF
haversine_udf = udf(lambda lat1, lon1: haversine_distance(lat1, lon1), DoubleType())

# Benchmark Standard UDF
start_time = time.time()
df = df.withColumn("distance_udf", haversine_udf(df["lat"], df["long"]))
df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Standard UDF Execution Time: {execution_time:.2f} seconds")


Standard UDF Execution Time: 1.19 seconds


Vectorized UDF

In [0]:
# Define Pandas UDF
@pandas_udf(DoubleType())
def haversine_pandas_udf(lat1: pd.Series, lon1: pd.Series) -> pd.Series:
    R = 6371  # Earth radius in km
    lat1, lon1 = np.radians(lat1), np.radians(lon1)
    lat2, lon2 = np.radians(47.6204), np.radians(-122.3491)  # Fixed lat/lon

    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    
    return R * c

# Benchmark Pandas UDF
start_time = time.time()
df = df.withColumn("distance_pandas", haversine_pandas_udf(df["lat"], df["long"]))
df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Pandas UDF Execution Time: {execution_time:.2f} seconds")


Pandas UDF Execution Time: 1.18 seconds


## Boolean column if the house maybe had lead paint

Pyspark built in functions

In [0]:
start_time = time.time()

# Apply the transformation using built-in functions
df = df.withColumn("lead_paint_built_in", f.when(col("yr_built") < 1978, 1).otherwise(0).cast(IntegerType()))

df.count()  # Trigger computation

# End timing execution
execution_time = time.time() - start_time
print(f"Built-in Function Execution Time: {execution_time:.2f} seconds")

Built-in Function Execution Time: 1.13 seconds


Standard UDf

In [0]:
# Standard UDF function
def lead_paint_udf(yr_built):
    if yr_built < 1978:
        return 1
    else:
        return 0

# Register UDF
lead_paint = udf(lead_paint_udf, IntegerType())

start_time = time.time()
# Apply UDF to create the new 'lead_paint' column
df = df.withColumn("lead_paint_udf", lead_paint(df["yr_built"]))

df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Standard UDF Execution Time: {execution_time:.2f} seconds")


Standard UDF Execution Time: 1.20 seconds


Vectorized UDF

In [0]:
# Define Pandas UDF
@pandas_udf(IntegerType())
def lead_paint_pandas_udf(yr_built: pd.Series) -> pd.Series:
    return yr_built.apply(lambda x: 1 if x < 1978 else 0)

start_time = time.time()
# Apply Pandas UDF to create the new 'lead_paint' column
df = df.withColumn("lead_paint_pandas", lead_paint_pandas_udf(df["yr_built"]))

df.count()  # This will trigger the computation and run the transformation

# End timing the execution
execution_time = time.time() - start_time
print(f"Pandas UDF Execution Time: {execution_time:.2f} seconds")


Pandas UDF Execution Time: 1.21 seconds


Edit test

In [0]:
import tensorflow as tf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType, ArrayType
import pandas as pd

# Load the TensorFlow model
model = tf.keras.models.load_model('my_model.h5')

# Define the prediction function
def predict(data: pd.Series) -> pd.Series:
    # Preprocess the data if needed
    data = tf.constant(data.to_numpy())
    predictions = model(data)
    return pd.Series(predictions.numpy().flatten())

# Register the UDF
predict_udf = pandas_udf(predict, returnType=FloatType())

# Apply the UDF to a Spark DataFrame
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["value"])
df = df.withColumn("prediction", predict_udf(df["value"]))
df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
File [0;32m<command-5456963511037883>, line 7[0m
[1;32m      4[0m [38;5;28;01mimport[39;00m [38;5;21;01mpandas[39;00m [38;5;28;01mas[39;00m [38;5;21;01mpd[39;00m
[1;32m      6[0m [38;5;66;03m# Load the TensorFlow model[39;00m
[0;32m----> 7[0m model [38;5;241m=[39m tf[38;5;241m.[39mkeras[38;5;241m.[39mmodels[38;5;241m.[39mload_model([38;5;124m'[39m[38;5;124mmy_model.h5[39m[38;5;124m'[39m)
[1;32m      9[0m [38;5;66;03m# Define the prediction function[39;00m
[1;32m     10[0m [38;5;28;01mdef[39;00m [38;5;21mpredict[39m(data: pd[38;5;241m.[39mSeries) [38;5;241m-[39m[38;5;241m>[39m pd[38;5;241m.[39mSeries:
[1;32m     11[0m     [38;5;66;03m# Preprocess the data if needed[39;00m

File [0;32m/databricks/python/lib/python3.11/site-packages/keras/src/saving/saving_api.py: