#Importing Libraries

In [1]:
!pip install pyspark
from pyspark.sql import SparkSession
import numpy as np
import pyspark.sql.functions as f
from pyspark.sql.window import Window
import sys
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
import random

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 50.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=bda81e355db4c47db05788d1d2781a0f5af278ae35525b712a7fdb83a3666360
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Seed Python's built-in `random` module so that the random.random() draws
# used further down to initialise `beta` are repeatable from run to run.
random.seed(10)

#Initialising Spark session

In [3]:
# Configure a builder for an application called "MySpark", then reuse the
# active session if there is one or start a new one otherwise.
session_builder = SparkSession.builder.appName("MySpark")
spark = session_builder.getOrCreate()

In [4]:
# SparkContext behind the session; needed later for sc.broadcast(...).
sc = spark.sparkContext

#Reading and Displaying the Data

In [5]:
# Load the CSV with a header row and inferred column types; cells that
# contain a literal "?" are read as null.
df = spark.read.csv(
    "diabetic_data_valid.csv",
    header=True,
    inferSchema=True,
    nullValue="?",
)

AnalysisException: ignored

In [None]:
# Print a preview of the loaded rows.
df.show()

#Counting distinct values in each column

In [None]:
# One countDistinct aggregate per column, aliased back to the column's own
# name so the single result row lines up with the original schema.
distinct_exprs = [f.countDistinct(f.col(name)).alias(name) for name in df.columns]
df.select(*distinct_exprs).show()

#Changing the datatypes of required variables

In [None]:
# The three *_id columns are cast to strings so they can be handled as
# categories rather than as numbers.
df = (
    df
    .withColumn("admission_type_id", f.col("admission_type_id").cast("String"))
    .withColumn("discharge_disposition_id", f.col("discharge_disposition_id").cast("String"))
    .withColumn("admission_source_id", f.col("admission_source_id").cast("String"))
)

#Separating numeric, categorical and target variables

In [None]:
# Column roles are assigned purely by position in df.columns:
#   positions 0-5                    -> numeric features
#   positions 6-11 and 15 .. (-3)    -> categorical features
#   last position                    -> target
# Positions 12-14 and the second-to-last column end up in neither feature list.
# NOTE(review): this is positional slicing - confirm it against the CSV's
# actual column order.
column_names = df.columns
num_variables = column_names[:6]
cat_variables = column_names[6:12] + column_names[15:-2]
target = column_names[-1]

#Outlier Treatment on Numeric Variables

In [None]:
# Cap every numeric column at the fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
#
# The quartiles of all numeric columns are fetched with ONE aggregation job
# instead of one job per column. Capping a column never touches the values of
# another column and no rows are dropped, so these are the same quartiles the
# per-column computation would return.
if num_variables:
    quartiles_by_column = df.select(
        [
            f.percentile_approx(name, [0.25, 0.5, 0.75], 1000000).alias(name)
            for name in num_variables
        ]
    ).first()

for variable in num_variables:
    # q2 (the median) is returned by the aggregate but not used for the fences.
    q1, q2, q3 = quartiles_by_column[variable]
    IQR = q3 - q1
    lower_value = q1 - 1.5 * IQR
    upper_value = q3 + 1.5 * IQR
    # Values below/above the fences are replaced by the fence itself;
    # everything else is kept unchanged.
    df = df.withColumn(
        variable,
        f.when(f.col(variable) < lower_value, lower_value)
         .when(f.col(variable) > upper_value, upper_value)
         .otherwise(f.col(variable)),
    )

In [None]:
# Quartiles of the last numeric column, grouped by the first numeric column.
# The column is named explicitly instead of relying on `variable` left over
# from the outlier loop, so the cell no longer depends on which loop ran last.
# Only the lazy DataFrame is built here; no Spark action is triggered.
# NOTE(review): the same aggregate appears three times - presumably three
# different columns were intended; confirm.
quartile_column = num_variables[-1]
quartile_expr = f.percentile_approx(quartile_column, [0.25, 0.5, 0.75], 1000000)
df.groupby(num_variables[0]).agg(quartile_expr, quartile_expr, quartile_expr)

#Defining stages for model data preparation

In [None]:
stages = []

# Every categorical column contributes two consecutive stages: a StringIndexer
# writing "<column>_index" and a OneHotEncoder turning that index into
# "<column>_OHE_vec".
for variable in cat_variables:
    index_col = variable + "_index"
    indexer = StringIndexer(inputCol=variable, outputCol=index_col)
    encoder = OneHotEncoder(inputCols=[index_col], outputCols=[variable + "_OHE_vec"])
    stages.extend([indexer, encoder])

# Numeric columns are packed into one vector and passed through StandardScaler.
assembler_num = VectorAssembler(inputCols=num_variables, outputCol="num_variables")
scaler_num = StandardScaler(inputCol="num_variables", outputCol="scaled_num_variables")
stages.extend([assembler_num, scaler_num])

# Concatenate all one-hot vectors into a single categorical feature vector.
assembler_cat = [name + "_OHE_vec" for name in cat_variables]
assembler = VectorAssembler(inputCols=assembler_cat, outputCol="assembler_cat")
stages.append(assembler)

# Final feature vector: categorical block first, scaled numeric block second.
assembler_final = VectorAssembler(
    inputCols=["assembler_cat", "scaled_num_variables"],
    outputCol="variables",
)
stages.append(assembler_final)

# Index the label column into a numeric "target" column.
target_str_index = StringIndexer(inputCol=target, outputCol="target")
stages.append(target_str_index)

In [None]:
# Display the assembled list of pipeline stages as a sanity check.
stages

[StringIndexer_ab5e61a0e715,
 OneHotEncoder_0ca7aacb145e,
 StringIndexer_7d000eff4cda,
 OneHotEncoder_ecf15cb82ea4,
 StringIndexer_d407683c6781,
 OneHotEncoder_91be010ec58a,
 StringIndexer_426c93eae931,
 OneHotEncoder_6d2083c61c76,
 StringIndexer_03765c53badc,
 OneHotEncoder_f66501af670d,
 StringIndexer_a1f9877205aa,
 OneHotEncoder_814e51d4efc6,
 StringIndexer_0444206e96e7,
 OneHotEncoder_f81f3af21934,
 StringIndexer_213e5dcf2641,
 OneHotEncoder_858960a08f1b,
 StringIndexer_7e68f829c174,
 OneHotEncoder_cc895f59db81,
 StringIndexer_020365cb0ba4,
 OneHotEncoder_e7f89f9d2633,
 StringIndexer_05575c8ef7ed,
 OneHotEncoder_ae05f74a34c7,
 StringIndexer_47690a933557,
 OneHotEncoder_e3c17b709c10,
 StringIndexer_b9bd614b1e79,
 OneHotEncoder_855065bab6da,
 StringIndexer_cd01b87b8e91,
 OneHotEncoder_bf2d6cbd04ce,
 StringIndexer_b6a1d29c56f8,
 OneHotEncoder_4f0478836b6a,
 StringIndexer_4b8bf2b5f2c4,
 OneHotEncoder_c2f8efc57437,
 StringIndexer_dc0c710d7967,
 OneHotEncoder_de8e3d2176fd,
 StringIndexer

In [None]:
# Chain all prepared stages into a single Pipeline
pipeline = Pipeline(stages=stages)

# Fit every stage on the full DataFrame.
# NOTE(review): the fit happens before the train/test split below, so the
# fitted stages have also seen the rows that later become the test set.
pipeline_model = pipeline.fit(df)

# Apply the fitted stages; adds the "variables" and "target" columns used below
transf_df = pipeline_model.transform(df)

In [None]:
# Print a preview of the transformed DataFrame (original plus engineered columns).
transf_df.show()

+----------------+------------------+--------------+---------------+----------------+----------------+---------------+------+--------+-----------------+------------------------+-------------------+------+------+------+-------------+---------+---------+-----------+-----------+---------+---------+------------+-------------+-------+------+-----------+----------------------+----------+----------+-------------+------------+--------------+---------+-------------+-----------------------+-------------------------+------------------------------+--------------------------------+-------------------------+---------------------------+-------------------+---------------------+---------------+-----------------+---------------+-----------------+-----------------+-------------------+-----------------+-------------------+---------------+-----------------+---------------+-----------------+------------------+--------------------+-------------------+---------------------+-------------+---------------+-----

#Keeping only the columns with the prepared data

In [None]:
# From here on only the assembled feature vector and the indexed label are needed.
model_df = transf_df.select("variables", "target")

#Splitting data into train and test

In [None]:
# Random 70 % / 30 % train/test split; the fixed seed keeps it repeatable.
split_weights = [0.7, 0.3]
train_data, test_data = model_df.randomSplit(split_weights, seed=1234)

In [None]:
# The training loop runs two actions over this RDD on every iteration (loss and
# gradient). Without caching, each action would re-execute the whole upstream
# pipeline transform and split, so keep the materialised rows around.
train_rdd = train_data.rdd.cache()

In [None]:
# The test rows are scanned once per training iteration (test loss) and again
# for the final metrics; cache them so the upstream transform and split are
# not recomputed each time.
test_rdd = test_data.rdd.cache()
# Number of test rows; count() is also the action that first fills the cache.
n = test_rdd.count()

#Setting hyper-parameters

In [None]:
# Number of training rows; used to average the loss and the gradient.
m = train_rdd.count()

# One coefficient per entry of the "variables" vector plus one for the
# intercept that l_beta/grad prepend. Deriving the length from the data
# (instead of hard-coding it) keeps beta in sync when the set of encoded
# categories changes.
n_coefficients = train_rdd.first()["variables"].toArray().shape[0] + 1

# Random initial coefficients in [0, 1), broadcast to the executors.
beta = sc.broadcast(np.array([random.random() for _ in range(n_coefficients)]))

alpha = 0.2    # gradient-descent step size
lambda_ = 0.1  # L2 penalty strength (not applied to the intercept)

In [None]:
# Inspect the initial coefficient vector held by the broadcast variable.
beta.value

array([0.16853039, 0.09545323, 0.79057274, 0.93573575, 0.21987634,
       0.74491269, 0.95114372, 0.19172723, 0.57189445, 0.43931268,
       0.97486093, 0.86959191, 0.01604811, 0.20132727, 0.84798407,
       0.57424107, 0.52281932, 0.80981358, 0.76404171, 0.95474635,
       0.32669824, 0.04765403, 0.69228279, 0.05013805, 0.56517922,
       0.85973635, 0.39702824, 0.60115884, 0.17143016, 0.15773571,
       0.60605507, 0.86448317, 0.99983797, 0.03113873, 0.92098233,
       0.51111825, 0.34935567, 0.39741467, 0.57788946, 0.34769689,
       0.14550897, 0.8655634 , 0.70648944, 0.60985494, 0.72255903,
       0.9860259 , 0.1750929 , 0.82416877, 0.82230431, 0.34334782,
       0.55877659, 0.4583951 , 0.19433626, 0.43289757, 0.15255887,
       0.93220312, 0.18746338, 0.63388145, 0.57001251, 0.97465765,
       0.7935981 , 0.36019087, 0.37611624, 0.04989812, 0.05183773,
       0.31097344, 0.94983074, 0.04954204, 0.17674324, 0.63618611,
       0.36046181, 0.18557703, 0.94738485, 0.04276935, 0.59805

#Defining functions for Training the model

In [None]:
def l_beta(x):
    """Return the logistic (cross-entropy) loss of one row under the current beta.

    Parameters
    ----------
    x : row-like
        Must provide x["variables"] (an object with a toArray() method giving
        the feature values) and x["target"] (the 0/1 label).

    Returns
    -------
    float
        -[y*log(s) + (1 - y)*log(1 - s)] with s = sigmoid(beta . [1, features]).

    Notes
    -----
    Reads the coefficients from the module-level broadcast variable `beta`.
    The loss is evaluated in the algebraically identical form
    log(1 + exp(z)) - y*z. Taking log(s) and log(1 - s) directly produces
    inf/nan as soon as the sigmoid saturates to exactly 0.0 or 1.0 in floating
    point, which poisons the summed loss.
    """
    # Prepend the constant 1.0 that multiplies the intercept beta[0].
    x_ = np.insert(x["variables"].toArray(), 0, 1.0)
    z = np.dot(x_, beta.value)
    y = x["target"]
    # logaddexp(0, z) == log(exp(0) + exp(z)) == log(1 + exp(z)), overflow-safe.
    return np.logaddexp(0.0, z) - y * z

def grad(x):
    """Return one row's contribution to the gradient of the logistic loss.

    The row must provide x["variables"] (an object with toArray()) and
    x["target"]. The result is (sigmoid(beta . [1, features]) - target)
    multiplied by [1, features], i.e. an array with the same length as the
    module-level broadcast `beta`, whose value is read here.
    """
    features = x["variables"].toArray()
    # Leading 1.0 pairs with the intercept coefficient.
    augmented = np.insert(features, 0, 1.0)
    logit = np.dot(augmented, beta.value)
    probability = 1 / (1 + np.exp(-logit))
    residual = probability - x["target"]
    return augmented * residual

#Model Training 

In [None]:
# Batch gradient descent: 50 full passes over the training rows.
# On every pass the mean train/test loss under the current beta is reported,
# then beta takes one step of size alpha along the averaged gradient.
for i in range(50):
    # Train loss = (sum of per-row losses + L2 penalty) / m.
    # beta[0], the intercept, is left out of the penalty.
    L = train_rdd.map(l_beta)
    train_loss = (L.reduce(lambda x,y: x+y) + lambda_ * np.dot(beta.value[1:],beta.value[1:])/2)/m

    # Test loss is the plain mean loss, without the penalty term.
    L_test = test_rdd.map(l_beta)
    test_loss = (L_test.reduce(lambda x,y: x+y))/n
    print("Iteration",i+1," Train_Loss ->",train_loss," Test_loss ->",test_loss)

    # Sum of the per-row gradients over the training set.
    del_beta =  train_rdd.map(grad)
    d_beta = del_beta.reduce(lambda x,y: x+y)

    # Gradient step. np.insert(..., 0, 0) puts a zero in the intercept slot so
    # the L2 term only shrinks the feature coefficients.
    previous_beta = beta
    beta = sc.broadcast(previous_beta.value - alpha *(d_beta + lambda_ * np.insert(previous_beta.value[1:],0,0)) / m)

    # Every iteration creates a new broadcast; drop the executor copies of the
    # superseded one so they do not pile up over the run.
    previous_beta.unpersist()

Iteration 1  Train_Loss -> 8.173973064417215  Test_loss -> 8.172129753602322
Iteration 2  Train_Loss -> 5.953400070673113  Test_loss -> 5.951811757101489
Iteration 3  Train_Loss -> 3.7349335190093313  Test_loss -> 3.733624956094102
Iteration 4  Train_Loss -> 1.6462044747757065  Test_loss -> 1.6461525603729203
Iteration 5  Train_Loss -> 1.0291180119896197  Test_loss -> 1.034156869889754


#Prediction and Test Results 

In [None]:
def metric_cal(x):
    """Score one row and return its 0/1 contributions to the metric counters.

    The row must provide x["variables"] (an object with toArray()) and
    x["target"]. The row is predicted as 1 when sigmoid(beta . [1, features])
    is at least `cuttoff.value`, otherwise as 0. Both `beta` and `cuttoff`
    are module-level broadcast variables read here.

    Returns a length-5 integer array:
        [correct prediction,
         predicted 1 and correct   (precision numerator),
         predicted 1               (precision denominator),
         actual 1 and correct      (recall numerator),
         actual 1                  (recall denominator)]
    """
    augmented = np.insert(x["variables"].toArray(), 0, 1.0)
    probability = 1 / (1 + np.exp(-np.dot(augmented, beta.value)))
    actual = x["target"]
    predicted = int(probability >= cuttoff.value)
    correct = actual == predicted
    return np.array([
        int(correct),
        int(correct and predicted == 1),
        int(predicted == 1),
        int(correct and actual == 1),
        int(actual == 1),
    ])

In [None]:
# Probability threshold at or above which a row is predicted as class 1.
cuttoff = sc.broadcast(0.5)

# Sum the per-row counters produced by metric_cal:
# [correct, precision numerator, precision denominator,
#  recall numerator, recall denominator].
metrics = test_rdd.map(metric_cal)
cal_metrcis = metrics.reduce(lambda x,y: x+y)

Acc = float(cal_metrcis[0]/n )
# With no predicted positives (or no actual positives) the ratio is undefined;
# report 0.0 rather than NaN so the summary stays printable.
Precision = float(cal_metrcis[1]/cal_metrcis[2]) if cal_metrcis[2] else 0.0
Recall = float(cal_metrcis[3]/cal_metrcis[4]) if cal_metrcis[4] else 0.0
# Precision + Recall is 0 when there is no true positive at all; guard it so
# the cell does not end in a ZeroDivisionError.
F1 = (2 * Precision * Recall)/(Precision + Recall) if (Precision + Recall) else 0.0
print("Cutoff -> {:.2f} Acc -> {:.2f} Precision -> {:.2f} Recall -> {:.2f} F1 -> {:.2f}".format(cuttoff.value,Acc, Precision, Recall,F1))