<a href="https://colab.research.google.com/github/ankesh86/PySparkNotebooks/blob/main/ModelEvaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=f75d2bb4fb844140bb65466ca667669307c506809efeb1f897e8440849661f29
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Load file

In [3]:
filename = "sample_data/bank-full.csv"
target_variable_name = "y"

# Start Session

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
df.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

In [5]:
from pyspark.sql import functions as F

df = df.withColumn('label', F.when(F.col("y") == 'yes', 1).otherwise(0))
df = df.drop('y')

# Simple Random sampling

In [6]:
train, test = df.randomSplit([0.7, 0.3], seed = 12345)

In [7]:
for k, v in df.dtypes:
    if v not in ['string']:
        print(k)

age
balance
day
duration
campaign
pdays
previous
label


In [8]:
df = df.select(['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'label'])

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

#assemble individual columns to one column - 'features'
def assemble_vectors(df, features_list, target_variable_name):
    stages = []
    #assemble vectors
    assembler = VectorAssembler(inputCols=features_list, outputCol='features')
    stages = [assembler]
    #select all the columns + target + newly created 'features' column
    selectedCols = [target_variable_name, 'features']
    #use pipeline to process sequentially
    pipeline = Pipeline(stages=stages)
    #assembler model
    assembleModel = pipeline.fit(df)
    #apply assembler model on data
    df = assembleModel.transform(df).select(selectedCols)

    return df

In [10]:
#exclude target variable and select all other feature vectors
features_list = df.columns
#features_list = char_vars #this option is used only for ChiSqselector
features_list.remove('label')
# apply the function on our dataframe
assembled_df = assemble_vectors(train, features_list, 'label')

## using ml tuning library

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

#model initialization
lr = LogisticRegression(maxIter=10, featuresCol='features', labelCol='label')

#model parameters to try
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1,0.01]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()

# 70% of data will be used for taining, 30% for validation
train_valid_clf = TrainValidationSplit(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), trainRatio=0.7)

#assembled_df is the output of the vector assembler
model = train_valid_clf.fit(assembled_df)

# Straightified Sampling

In [12]:
# split data for 0s and 1s
zero_df = df.filter(df["label"]==0)
one_df = df.filter(df["label"]==1)

# split data into train and test
train_zero, test_zero = zero_df.randomSplit([0.7,0.3], seed=12345)
train_one, test_one = one_df.randomSplit([0.7,0.3], seed=12345)

# union datasets
train = train_zero.union(train_one)
test = test_zero.union(test_one)

# k-Fold cross validation
using ML tuning library

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#model initialization
lr = LogisticRegression(maxIter=10, featuresCol='features', labelCol='label')

#model parameters to try
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0.0,0.5,1.0]).build()

#number of folds = 3
crossval_clf = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=3)

#assembled_df is the output of the vector assembler
model = crossval_clf.fit(assembled_df)

# **Holdout**

In [14]:
train, test, holdout = df.randomSplit([0.7, 0.2, 0.1], seed=12345)
train.count(), test.count(), holdout.count()

(31527, 9051, 4633)

# **Leave-one Group-out**

In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import countDistinct
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F

import numpy as np

filename = 'sample_data/bank-full.csv'
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(filename, header=True, inferSchema=True, sep=';')
df = df.withColumn('label', F.when(F.col("y") == 'yes', 1).otherwise(0))
df.drop('y')

df = df.select(['education','age','balance','day','duration','campaign','pdays','previous','label'])
features_list = ['age','balance','day','duration','campaign','pdays','previous']

#assemble individual column to one columns
def assemble_vectors(df,features_list, target_variable_name, group_variable_name):
  stages = []
  #assemble vectors
  assembler = VectorAssembler(inputCols=features_list, outputCol='features')
  stages=[assembler]

  selectedCols = [group_variable_name, target_variable_name, 'features']

  #use pipeline to process sequentially
  pipeline = Pipeline(stages=stages)

  #assemble Model
  assembleModel = pipeline.fit(df)

  # apply assemble model on data
  df = assembleModel.transform(df).select(selectedCols)

  return df

# apply the functon on our dataframe
joined_df = assemble_vectors(df, features_list, 'label', 'education')

#find the groups to apply cross validation
groups = list(joined_df.select('education').toPandas()['education'].unique())

# leave-one-group-out validation
def leave_one_group_out_validator(df, var_name, groups):
  train_metric_score =  []
  test_metric_score =  []

  for i in groups:
    train = df.filter(df[var_name] != i)
    test = df.filter(df[var_name]==i)

    #model initialization
    lr = LogisticRegression(maxIter = 10, featuresCol='features', labelCol='label')

    evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

    #fit model
    lrModel = lr.fit(train)

    #make predictions
    predict_train = lrModel.transform(train)
    predict_test = lrModel.transform(test)
    train_metric_score.append(evaluator.evaluate(predict_train))
    test_metric_score.append(evaluator.evaluate(predict_test))
    print(str(i) + "Group Evaluation")
    print(" Train AUC - ", train_metric_score[-1])
    print(" Test AUC - ", test_metric_score[-1])

  print('Final evaluation for model')
  print('Train ROC', np.mean(train_metric_score))
  print('Test ROC', np.mean(test_metric_score))

In [16]:
leave_one_group_out_validator(joined_df, 'education', groups)

tertiaryGroup Evaluation
 Train AUC -  0.8405036511556044
 Test AUC -  0.8117014214186873
secondaryGroup Evaluation
 Train AUC -  0.8249617583391624
 Test AUC -  0.8352564395071835
unknownGroup Evaluation
 Train AUC -  0.8314701472433579
 Test AUC -  0.8111358354349009
primaryGroup Evaluation
 Train AUC -  0.8256814880628227
 Test AUC -  0.8695884216387456
Final evaluation for model
Train ROC 0.8306542612002368
Test ROC 0.8319205294998793
