In [None]:
 !pip install pyspark
 !pip install findspark



In [None]:
from pyspark.sql import SparkSession
# Spark session in local mode for this notebook; the Spark UI is served on
# port 4050 instead of the default.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession object.
spark

In [None]:
import matplotlib.pyplot as plt 
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.functions import col,sum
import seaborn as sns
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf

from pyspark.sql.types import IntegerType
import warnings
# NOTE(review): this silences *every* Python warning for the rest of the
# kernel session (for example numpy RuntimeWarnings such as divide-by-zero);
# consider filtering only the specific categories that are actually noisy.
warnings.filterwarnings("ignore")

# Handle on the SparkContext behind the `spark` session (used below for broadcasts).
sc = spark.sparkContext

In [None]:
# Load the cleaned Framingham CSV. The first line holds column names; with no
# schema supplied, every column is read as a string.
data = spark.read.option('header', True).csv('clean_framingham.csv')

In [None]:
# Preview the first five rows.
data.show(5)

+---+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|_c0|male|age|education|currentSmoker|cigsPerDay|BPMeds|prevalentStroke|prevalentHyp|diabetes|totChol|sysBP|diaBP|  BMI|heartRate|glucose|TenYearCHD|
+---+----+---+---------+-------------+----------+------+---------------+------------+--------+-------+-----+-----+-----+---------+-------+----------+
|  1|   1| 39|        4|            0|         0|     0|              0|           0|       0|    195|  106|   70|26.97|       80|     77|         0|
|  2|   0| 46|        2|            0|         0|     0|              0|           0|       0|    250|  121|   81|28.73|       95|     76|         0|
|  3|   1| 48|        1|            1|        20|     0|              0|           0|       0|    245|127.5|   80|25.34|       75|     70|         0|
|  4|   0| 61|        3|            1|        30|     0|              0|           1|       0|    22

In [None]:
# Inspect column types: without a schema or inferSchema every column comes
# back as a string, so the numeric columns need an explicit cast later on.
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- male: string (nullable = true)
 |-- age: string (nullable = true)
 |-- education: string (nullable = true)
 |-- currentSmoker: string (nullable = true)
 |-- cigsPerDay: string (nullable = true)
 |-- BPMeds: string (nullable = true)
 |-- prevalentStroke: string (nullable = true)
 |-- prevalentHyp: string (nullable = true)
 |-- diabetes: string (nullable = true)
 |-- totChol: string (nullable = true)
 |-- sysBP: string (nullable = true)
 |-- diaBP: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- heartRate: string (nullable = true)
 |-- glucose: string (nullable = true)
 |-- TenYearCHD: string (nullable = true)



In [None]:
# The unnamed leading `_c0` column appears to be a running row counter
# (1, 2, 3, ... in the preview) and carries no predictive signal, so drop it.
index_column = '_c0'
data = data.drop(index_column)
names = data.columns



In [None]:
# Numerical and categorical feature columns.

num_cols = ["age", "BMI", "cigsPerDay", "diaBP", "heartRate", "glucose",
            "totChol", "sysBP"]
cat_cols = ["male", "education", "BPMeds", "currentSmoker", "prevalentStroke",
            "prevalentHyp", "diabetes"]

# Cast the numeric columns (read as strings) to DoubleType rather than
# IntegerType: values such as BMI 26.97 or sysBP 127.5 would otherwise lose
# their fractional part (and may fail to parse at all under ANSI casting).
for column in num_cols:
    data = data.withColumn(column, f.col(column).cast(DoubleType()))

In [None]:
# Feature-engineering stages: each categorical column is string-indexed into
# "<col>Index" and then one-hot encoded into a "<col>nums" vector.
pipes = []

for categoricalCol in cat_cols:
    sIndex = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    enc = OneHotEncoder(inputCols=[sIndex.getOutputCol()], outputCols=[categoricalCol + "nums"])
    pipes += [sIndex, enc]

# Label indexer. Alphabetical ordering maps "0" -> 0.0 and "1" -> 1.0
# regardless of class balance. The default ordering (by descending frequency)
# only produces that mapping because "0" happens to be the majority class; the
# evaluation below assumes label 1 means TenYearCHD == "1".
target_ids = StringIndexer(inputCol='TenYearCHD', outputCol='target',
                           stringOrderType='alphabetAsc')
pipes += [target_ids]

# Assemble the one-hot vectors followed by the numeric columns into "features".
asINP = [a + "nums" for a in cat_cols] + num_cols
combine = VectorAssembler(inputCols=asINP, outputCol="features")
pipes += [combine]

In [None]:
# Fit every stage (indexers, encoders, label indexer, assembler) on the data,
# then keep only the label and the assembled feature vector.
stag = Pipeline(stages=pipes)
comb = ['target', 'features']
model = stag.fit(data)
data = model.transform(data).select(comb)

In [None]:
# StringIndexer emits the label as a double; cast it to a 64-bit integer
# (bigint) directly in Spark. Casting in place avoids collecting the whole
# dataset onto the driver with toPandas() and re-distributing it.
data = data.withColumn('target', f.col('target').cast(LongType()))

In [None]:
import time
from sys import argv
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType
import matplotlib.pyplot as plt
from numpy import sum, sqrt
from pyspark import SparkContext

In [None]:
# Reproducible 75% / 25% train-test split.
train, test = data.randomSplit(weights=[0.75, 0.25], seed=2022)

In [None]:
from pyspark.ml.feature import PCA
import numpy as np



# Project the assembled features onto 3 principal components. The projection
# is fitted on the TRAINING split only and then applied to both splits, so no
# information from the held-out test rows leaks into the model's inputs.
# NOTE(review): features are not standardised first, so large-valued raw
# columns (e.g. totChol, sysBP) likely dominate the components; consider a
# StandardScaler stage before PCA.
pca = PCA(k=3, inputCol='features', outputCol='pca')
model = pca.fit(train)
train_data = model.transform(train).select('target', 'pca')
test_data = model.transform(test).select('target', 'pca')





In [None]:
# Collect the projected training set to the driver and broadcast the feature
# matrix and labels so the kNN UDF can read them on the executors.
train_rows = train_data.rdd.collect()
train_label = sc.broadcast(np.array([row.target for row in train_rows]))
train_matrix = sc.broadcast(np.array([row.pca for row in train_rows]))

In [None]:
%%time
from numpy import sum, sqrt

K = 11  # number of nearest neighbours that vote in knnPred

def knnPred(line, train_X=None, train_y=None, k=None):
    """Predict a class label by majority vote of the k nearest training samples.

    Parameters
    ----------
    line : array-like
        Feature vector of the sample to classify (the ``pca`` column value
        when called through the Spark UDF).
    train_X : array-like of shape (n_train, n_features), optional
        Training feature matrix. Defaults to the broadcast ``train_matrix.value``.
    train_y : array-like of shape (n_train,), optional
        Non-negative integer training labels aligned with ``train_X``.
        Defaults to the broadcast ``train_label.value``.
    k : int, optional
        Number of neighbours that vote. Defaults to the module-level ``K``.

    Returns
    -------
    int
        The most frequent label among the ``k`` nearest training samples
        (Euclidean distance). Ties are broken toward the smallest label.

    Raises
    ------
    ValueError
        If ``k`` < 1 or the training set is empty.
    """
    if train_X is None:
        train_X = train_matrix.value
    if train_y is None:
        train_y = train_label.value
    if k is None:
        k = K
    train_y = np.asarray(train_y)
    if k < 1 or len(train_y) == 0:
        raise ValueError("knnPred needs k >= 1 and a non-empty training set")
    train_X = np.asarray(train_X, dtype=float)
    query = np.asarray(line, dtype=float)
    # Broadcasting subtracts the query from every training row; no need to
    # materialise a tiled copy of the query.
    distances = np.sqrt(np.sum((train_X - query) ** 2, axis=1))
    # Slice the sorted indices to k before gathering labels, so only k labels
    # are looked up (slicing past the end simply yields all samples).
    nearest = np.argsort(distances)[:k]
    # bincount needs non-negative ints; argmax returns the first (smallest)
    # label on a tie.
    return int(np.bincount(train_y[nearest]).argmax())

# Wrap knnPred as a Spark UDF that returns an integer class label.
knn_udf = udf(knnPred, returnType=IntegerType())


In [None]:
def confusion_matrix(test_true, test_pred, class_list):
    """Build a confusion matrix from paired true and predicted labels.

    Parameters
    ----------
    test_true : sequence
        Ground-truth labels.
    test_pred : sequence
        Predicted labels, aligned element-wise with ``test_true``.
    class_list : list
        Every label that may occur; its order fixes the row/column order.

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(len(class_list), len(class_list))``. Entry
        ``[i, j]`` counts samples whose true label is ``class_list[i]`` and
        whose predicted label is ``class_list[j]``.

    Raises
    ------
    ValueError
        If the two label sequences differ in length, or a label is not in
        ``class_list``.
    """
    if len(test_true) != len(test_pred):
        raise ValueError(
            "test_true and test_pred differ in length: "
            f"{len(test_true)} != {len(test_pred)}")
    # Map each class to its row/column once instead of a list.index() scan per
    # sample; setdefault keeps the first position if class_list has duplicates.
    position = {}
    for idx, label in enumerate(class_list):
        position.setdefault(label, idx)
    n_class = len(class_list)
    matrix = np.zeros((n_class, n_class))
    for true_label, pred_label in zip(test_true, test_pred):
        try:
            matrix[position[true_label], position[pred_label]] += 1
        except KeyError as exc:
            raise ValueError(f"label {exc.args[0]!r} is not in class_list") from exc
    return matrix

In [None]:
def precision(label, confusion_matrix):
    """Precision of one class: true positives / all predictions of that class.

    ``confusion_matrix`` is a square numpy array whose rows are true classes
    and whose columns are predicted classes; ``label`` is used directly as the
    row/column index. Returns 0.0 when the class was never predicted (0/0),
    instead of a nan that would poison macro averages.
    """
    predicted_total = confusion_matrix[:, label].sum()
    if predicted_total == 0:
        return 0.0
    return float(confusion_matrix[label, label] / predicted_total)
def recall(label, confusion_matrix):
    """Recall of one class: true positives / all samples truly in that class.

    ``confusion_matrix`` is a square numpy array whose rows are true classes
    and whose columns are predicted classes; ``label`` is used directly as the
    row/column index. Returns 0.0 when the class has no true samples (0/0),
    instead of a nan that would poison macro averages.
    """
    actual_total = confusion_matrix[label, :].sum()
    if actual_total == 0:
        return 0.0
    return float(confusion_matrix[label, label] / actual_total)
def f1_score(label, confusion_matrix):
    """F1 score of one class: 2*TP / (row total + column total).

    ``confusion_matrix`` is a square numpy array whose rows are true classes
    and whose columns are predicted classes; ``label`` is used directly as the
    row/column index. The formula equals 2*TP / (2*TP + FP + FN). Returns 0.0
    when the class neither occurs nor is predicted (0/0), instead of nan.
    """
    denominator = confusion_matrix[label, :].sum() + confusion_matrix[:, label].sum()
    if denominator == 0:
        return 0.0
    return float(confusion_matrix[label, label] * 2 / denominator)

def accuracy(confusion_matrix):
    """Fraction of samples on the diagonal (correctly classified) of a square numpy confusion matrix."""
    return confusion_matrix.trace() / confusion_matrix.sum()

In [None]:
# Classify every test row with the kNN UDF, then collect predictions and
# ground truth to the driver as two aligned lists.
pred = test_data.withColumn("pred_label", knn_udf(test_data.pca))
label_list = pred.select('pred_label', 'target').collect()
pred_label = [row.pred_label for row in label_list]
real_label = [row.target for row in label_list]

In [None]:
# Show the prediction DataFrame's repr (its column names and types).
pred

DataFrame[target: bigint, pca: vector, pred_label: int]

In [None]:
# Evaluate the kNN predictions: confusion matrix, per-class and macro-averaged
# precision / recall / F1, and overall accuracy.
full_class_list = [0, 1]
cm = np.array(confusion_matrix(real_label, pred_label, full_class_list)).astype(int)
cm_float = cm.astype(float)  # metric helpers divide, so hand them floats once
print('\n')
print("Confusion Matrix")
print(cm)
print('\n')
print("label precision recall f1-score")
pc = []
rc = []
f1 = []
for label in full_class_list:
    pc.append(precision(label, cm_float))
    rc.append(recall(label, cm_float))
    f1.append(f1_score(label, cm_float))
    # One row per class, aligned with the "average" row below.
    print("{:<8} {:6.3f} {:6.3f} {:6.3f}".format(label, pc[-1], rc[-1], f1[-1]))

# Macro averages (unweighted mean over classes). np.mean avoids depending on
# which `sum` (builtin, pyspark or numpy) was most recently imported.
print("average  {:6.3f} {:6.3f} {:6.3f}".format(np.mean(pc), np.mean(rc), np.mean(f1)))
print('\n')
acc = accuracy(cm_float)
print('Accuracy: {}%'.format(round(acc * 100, 3)))
print('\n')



Confusion Matrix
[[795   6]
 [133   0]]


label precision recall f1-score
average   0.428  0.496  0.460


Accuracy: 85.118%


