1.   Problem: 
- Spark evaluates lazily, so every time an action is invoked in the code Spark has to compute the DAG again
- The time-consuming part is running inference (prediction) on new data
- Therefore, training the model should take place in Python and inference should take place on Spark
<br />
2.   Solution:
- Save each model in a dictionary
- Load it using pandas_udf (see [run your native Python code with PySpark](https://www.databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html))
- Inside a pandas_udf you can write ordinary Python (pandas) code, just as you would outside Spark.




In [24]:
!pip install PySpark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
time: 2.83 s (started: 2023-02-16 20:34:56 +00:00)


In [25]:
!pip install ipython-autotime
%load_ext autotime

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 3.14 s (started: 2023-02-16 20:34:59 +00:00)


In [26]:
import pandas as pd
import numpy as np
from functools import wraps
import time

import statsmodels.api as sm
from sklearn.metrics import roc_auc_score

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import pandas_udf, spark_partition_id
import pyspark.sql.functions as f

# Get the active SparkSession, or create one, under the app name "test_regression".
spark = SparkSession.builder.appName("test_regression").getOrCreate()

time: 7.3 ms (started: 2023-02-16 20:35:02 +00:00)


In [27]:
# Create dataset
# Synthetic binary-classification data: one label column and three features.
# NOTE(review): no NumPy seed is set in this cell, so the generated values
# (and the AUCs printed further down) presumably change between runs.
observations = 500_000

# Label: Bernoulli draws with P(target = 1) = 0.2.
target = np.random.binomial(n=1, p=0.2, size=(observations, 1))
# y: the label plus small Gaussian noise (mean 0, std 0.1).
y = target + np.random.normal(0, 0.1, size=(observations, 1))
# z: the label plus large Gaussian noise (mean 20, std 10).
z = target + np.random.normal(20, 10.0, size=(observations, 1))
# w: Gaussian noise only (mean 3, std 1); it does not depend on the label.
w = np.random.normal(3, 1.0, size=(observations, 1))
df = pd.DataFrame(np.hstack([target, y, z, w]), 
                  columns=['target', 'y', 'z', 'w'])

# Create train test datasets
# 80% of the rows are sampled (fixed random_state) for training; the rows
# whose index was not sampled form the test set.
train = df.sample(frac=0.8, random_state=1)
test = df.drop(train.index)

# Convert to pyspark dataset
spark_train = spark.createDataFrame(train)
spark_test = spark.createDataFrame(test)

time: 19.5 s (started: 2023-02-16 20:35:02 +00:00)


In [28]:
# A dict to store the 3 fitted models: it is filled below with one entry per
# feature name ('y', 'z', 'w'), each holding the result returned by
# logit_statmodels.
models = {}

time: 519 µs (started: 2023-02-16 20:35:21 +00:00)


In [29]:
# Create functions for timing, pyspark regression and statsmodels regression
def timeit(func):
    """Decorate ``func`` so that every call prints how long it took.

    Args:
        func: Any callable; its ``__name__`` is used in the printed message.

    Returns:
        A wrapper carrying the metadata of ``func`` (via ``functools.wraps``)
        that forwards all positional and keyword arguments, prints
        ``"<name> took <seconds> seconds to run."`` and returns the result of
        ``func`` unchanged. If ``func`` raises, the exception propagates and
        nothing is printed.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution, so the measured
        # interval cannot be skewed by system clock adjustments the way an
        # interval taken with time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(f"{func.__name__} took {elapsed_time:.2f} seconds to run.")
        return result
    return wrapper

@timeit
def logit_pyspark(var, train_data, test_data):
    """Fit a Spark ML logistic regression on one feature and print its test AUC.

    Args:
        var: Name of the column used as the single independent variable.
        train_data: Spark DataFrame holding ``var`` and the ``target`` label.
        test_data: Spark DataFrame with the same columns, used for scoring.

    Nothing is returned; the AUC is only printed.
    """
    # Spark ML estimators expect the predictors packed into one vector column.
    to_features = VectorAssembler(inputCols=[var], outputCol='features')
    train_vec, test_vec = (
        to_features.transform(frame) for frame in (train_data, test_data)
    )

    # Fit on the training split.
    estimator = LogisticRegression(featuresCol='features', labelCol='target')
    fitted = estimator.fit(train_vec)

    # Score the held-out split and report the evaluator's metric.
    scored = fitted.transform(test_vec)
    auc = BinaryClassificationEvaluator(labelCol='target').evaluate(scored)
    print(f"Model with {var} as the independent variable has AUC of {auc:.2f}")

@timeit
def logit_statmodels(train_df, test_df, var):
    """Fit a statsmodels logit on one feature, print its test AUC, return the fit.

    Args:
        train_df: pandas DataFrame with a ``target`` column and a ``var`` column.
        test_df: pandas DataFrame with the same columns, used for scoring.
            It is not modified.
        var: Name of the column used as the single independent variable.

    Returns:
        The fitted result object returned by ``Logit.fit``.
    """
    # Create model
    # NOTE(review): no constant column is added to the regressors, so the fit
    # presumably has no intercept -- confirm this is intended.
    logit_model = sm.Logit(train_df[['target']], train_df[[var]])
    result = logit_model.fit(disp=0)

    # Create AUC
    # Keep the predictions in a local variable and score against the frame
    # that was passed in, so the function neither depends on nor mutates any
    # module-level DataFrame.
    y_pred = result.predict(test_df[[var]])
    auc = roc_auc_score(test_df['target'], y_pred)
    print(f"Model with {var} as the independent variable has AUC of {auc:.2f}")
    return result

time: 5.37 ms (started: 2023-02-16 20:35:21 +00:00)


# Stats model

In [30]:
# Test statsmodels: fit one logit per feature and keep each fitted model.
# The list is called `feature_names` (not `vars`) so the built-in vars()
# is not shadowed.
feature_names = ['y', 'z', 'w']

# Run regression
for var in feature_names:
    models[var] = logit_statmodels(train, test, var)

Model with y as the independent variable has AUC of 1.00
logit_statmodels took 0.37 seconds to run.
Model with z as the independent variable has AUC of 0.47
logit_statmodels took 0.19 seconds to run.
Model with w as the independent variable has AUC of 0.51
logit_statmodels took 0.31 seconds to run.
time: 870 ms (started: 2023-02-16 20:35:21 +00:00)


# Pyspark test 1

In [31]:
# Test PySpark: fit and evaluate one Spark ML logit per feature.
# The list is called `feature_names` (not `vars`) so the built-in vars()
# is not shadowed.
feature_names = ['w', 'z', 'y']

# Run regression
for var in feature_names:
    logit_pyspark(var, spark_train, spark_test)

Model with w as the independent variable has AUC of 0.49
logit_pyspark took 5.27 seconds to run.
Model with z as the independent variable has AUC of 0.53
logit_pyspark took 8.10 seconds to run.
Model with y as the independent variable has AUC of 1.00
logit_pyspark took 7.52 seconds to run.
time: 20.9 s (started: 2023-02-16 20:35:22 +00:00)


# Pyspark test 2

In [32]:
# Test PySpark again, this time with each DataFrame in a single partition.
# The list is called `feature_names` (not `vars`) so the built-in vars()
# is not shadowed.
feature_names = ['y', 'z', 'w']

# coalesce(1) reduces each DataFrame to one partition.
spark_train_part_1 = spark_train.coalesce(1)
spark_test_part_1 = spark_test.coalesce(1)

# Run regression
for var in feature_names:
    logit_pyspark(var, spark_train_part_1, spark_test_part_1)

Model with y as the independent variable has AUC of 1.00
logit_pyspark took 7.32 seconds to run.
Model with z as the independent variable has AUC of 0.53
logit_pyspark took 3.80 seconds to run.
Model with w as the independent variable has AUC of 0.49
logit_pyspark took 4.76 seconds to run.
time: 15.9 s (started: 2023-02-16 20:35:43 +00:00)


# Final version
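1.   pred(df): a plain Python function that works on a pandas DataFrame; it is later run on Spark through `applyInPandas` (a grouped pandas UDF)
2.   spark_test.withColumn("id", f.lit(1)).groupBy('id'): since all rows have to be fed into the model together, `id` is just a dummy constant column that puts every row into a single group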


In [33]:

def pred(df):
    """Compute the AUC of each stored model on one group of rows.

    Written to be passed to ``groupBy(...).applyInPandas``.

    Args:
        df: pandas DataFrame with columns ``target``, ``y``, ``z`` and ``w``.

    Returns:
        A one-row pandas DataFrame with columns ``auc_y``, ``auc_z`` and
        ``auc_w`` (in that order), each holding the ROC AUC of the matching
        entry in the module-level ``models`` dict.
    """
    # Build the single result row directly instead of broadcasting every
    # scalar AUC over all input rows and slicing one row back out; this also
    # leaves the input frame unmodified.
    aucs = {}
    for name in ('y', 'z', 'w'):
        scores = models[name].predict(df[name])
        aucs[f'auc_{name}'] = [roc_auc_score(df['target'], scores)]
    return pd.DataFrame(aucs)

time: 1.02 ms (started: 2023-02-16 20:35:59 +00:00)


In [34]:
# Give every row the same dummy key so the whole test set forms one group,
# run `pred` on that group, and print the resulting one-row table.
(
    spark_test
    .withColumn("id", f.lit(1))
    .groupBy('id')
    .applyInPandas(pred, schema='auc_y double, auc_z double, auc_w double')
    .show()
)

+-----+-------------------+------------------+
|auc_y|              auc_z|             auc_w|
+-----+-------------------+------------------+
|  1.0|0.47158886270789624|0.5066707479655908|
+-----+-------------------+------------------+

time: 1.47 s (started: 2023-02-16 20:35:59 +00:00)
