In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math

## PREPROCESSING

In [3]:
# Column groups of the heart-disease table.
# Continuous columns are mean-imputed and standardized by readdata().
continuous_features = [
    'age',
    'chol',
    'oldpeak',
    'thalch',
    'trestbps',
]
# Categorical columns are mode-imputed and one-hot encoded by readdata().
categorical_features = [
    'ca',
    'cp',
    'restecg',
    'slope',
    'thal',
    'sex',
    'fbs',
    'exang',
]
# Target column(s).
label = ['num']
# Filled by readdata() with the names of the columns left after one-hot encoding.
updated_categorical_features = []
# Read the raw table, impute missing values, standardize the continuous
# features with (X - mean) / std, binarize the label and one-hot encode
# the categorical features.
def readdata(path='heart_disease_uci.csv'):
    """Load and preprocess the heart-disease CSV.

    Steps, in order:
      1. Continuous features: NaN -> column mean, then (X - mean) / std.
      2. Categorical features: NaN -> column mode, then cast to ``category``.
      3. Label columns: NaN -> column mode; ``num`` becomes "positive" when
         the value is > 0 and "negative" otherwise.
      4. Categorical features are one-hot encoded with ``drop_first=True``.

    Parameters
    ----------
    path : str, optional
        Location of the CSV file. Defaults to the file name the notebook
        has always used, so existing ``readdata()`` calls are unaffected.

    Returns
    -------
    pandas.DataFrame
        The preprocessed table. Columns outside the feature/label groups
        (e.g. 'id', 'dataset') are passed through untouched.

    Side effects
    ------------
    The module-level list ``updated_categorical_features`` is overwritten in
    place with the names of every column that is not a continuous feature,
    'num', 'id' or 'dataset'. It is reset on each call, so re-running the
    cell does not accumulate duplicates.

    NOTE(review): the mean/std/mode statistics are computed on the whole
    table, i.e. before dataDivision() splits it, so validation and test rows
    contribute to the normalization. Confirm whether that is acceptable.
    """
    df = pd.read_csv(path)

    for col in continuous_features:
        # Replace NaN with the column mean (this leaves the mean unchanged) ...
        df[col] = df[col].fillna(df[col].mean())
        # ... then standardize. The std is taken after imputation, as before.
        df[col] = (df[col] - df[col].mean()) / df[col].std()

    # Replace NaN in categorical features with the most frequent value.
    for col in categorical_features:
        mode_val = df[col].mode()[0]
        df.loc[df[col].isna(), col] = mode_val
    df[categorical_features] = df[categorical_features].astype("category")

    # Replace NaN in the label column(s) with the most frequent value.
    for col in label:
        mode_val = df[col].mode()[0]
        df.loc[df[col].isna(), col] = mode_val

    # Collapse the label to two classes: 0 -> negative, anything above -> positive.
    df['num'] = df['num'].apply(lambda x: "positive" if x > 0 else "negative")

    # One-hot encode categorical features.
    df = pd.get_dummies(df, columns=categorical_features, prefix=categorical_features, drop_first=True)

    # Slice-assign so the shared list object is reused but never grows across calls.
    excluded = set(continuous_features) | {'num', 'id', 'dataset'}
    updated_categorical_features[:] = [col for col in df.columns if col not in excluded]
    return df



## KNN CLASSIFIER


In [5]:
# Split the preprocessed table into training, validation and testing sets
def dataDivision():
    """Shuffle the preprocessed data and cut it into three partitions.

    The table returned by readdata() is shuffled with a fixed seed (42) and
    re-indexed from 0. The first int(0.6 * n) rows form the training set, the
    next int(0.2 * n) rows the validation set, and whatever remains the test
    set. The 'id' and 'dataset' columns are removed from every partition.

    Returns
    -------
    tuple of pandas.DataFrame
        (train, validation, test), in that order.
    """
    shuffled = readdata().sample(frac=1, random_state=42).reset_index(drop=True)
    total = len(shuffled)

    train_end = int(0.6 * total)
    validation_end = train_end + int(0.2 * total)
    boundaries = ((0, train_end), (train_end, validation_end), (validation_end, total))

    unused_columns = ['id', 'dataset']
    partitions = []
    for start, stop in boundaries:
        partitions.append(shuffled.iloc[start:stop].drop(columns=unused_columns))
    return tuple(partitions)
# Build the three partitions once; the tuning and evaluation cells below read these module-level frames.
train_df, validation_df, test_df = dataDivision()

In [6]:
# L2 (Euclidean) distance between two feature vectors
def distanceCalculator(x1, x2):
    """Return the Euclidean distance between x1 and x2.

    The inputs are subtracted element-wise, so they must support ``x1 - x2``
    (the notebook passes numpy float arrays).
    """
    delta = x1 - x2
    squared_total = np.sum(delta ** 2)
    return np.sqrt(squared_total)

# get the K closest neighbors
def getClosestNeighbours(train_df, validation_row, k):
    """Return the labels of the k training rows nearest to validation_row.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Training rows; must contain the label column 'num'. Every other
        column is treated as a numeric feature.
    validation_row : pandas.Series
        The query row, indexed by column name. Its features are picked by
        the training frame's feature names, so their order in the row does
        not matter.
    k : int
        Number of neighbours to return. If k exceeds the number of training
        rows, all of them are returned.

    Returns
    -------
    list
        'num' labels of the nearest rows, closest first. Rows at equal
        distance keep their order of appearance in train_df.
    """
    feature_cols = train_df.columns.drop('num')
    x_train = train_df[feature_cols].values.astype(float)
    # Plain array => purely positional access. Indexing the Series itself with
    # an integer is a *label* lookup and breaks for any index other than 0..n-1.
    y_train = train_df['num'].to_numpy()
    x_query = validation_row[feature_cols].values.astype(float)

    # Euclidean distance from the query to every training row in one shot.
    distances = np.sqrt(np.sum((x_train - x_query) ** 2, axis=1))
    # A stable sort keeps ties in training order, like the list sort it replaces.
    nearest = np.argsort(distances, kind='stable')[:k]
    return y_train[nearest].tolist()
        
# knn classifier
def knnClassify(train_df, validation_df, k):
    """Classify one row by majority vote among its k nearest training rows.

    ``validation_df`` is passed straight through to getClosestNeighbours as
    the query row (the callers in this notebook pass a single row).

    Returns "negative" only when negative votes strictly outnumber positive
    ones; ties, including the no-votes case, resolve to "positive".
    """
    votes = getClosestNeighbours(train_df, validation_df, k)
    negative_votes = votes.count("negative")
    positive_votes = votes.count("positive")
    return "negative" if negative_votes > positive_votes else "positive"
    

In [7]:
def printConfusionMatrix(tp, fp, tn, fn):
    """Print a 2x2 confusion matrix as an ASCII table.

    Columns are the actual class (1, 0) and rows the predicted class (1, 0);
    the first character of each table line spells "Pred." down the left edge.
    The table is preceded and followed by a blank line.
    """
    rule = "{}{:>6} +--------+--------+"
    cells = "{}{:>6} | {:<6} | {:<6} |"
    lines = [
        "\n{:>15}Actual".format(""),
        "{:>6} {:>7} {:>7}".format("", "1", "0"),
        rule.format("P", ""),
        cells.format("r", "1", "TP=" + str(tp), "FP=" + str(fp)),
        rule.format("e", ""),
        cells.format("d", "0", "FN=" + str(fn), "TN=" + str(tn)),
        rule.format(".", "") + "\n",
    ]
    print("\n".join(lines))

def getConfusionMatrix(y_true, y_pred):
    """Count confusion-matrix cells for "positive"/"negative" labels.

    Parameters
    ----------
    y_true, y_pred : array-like of str
        Actual and predicted labels, compared element by element.

    Returns
    -------
    tuple of int
        (tp, fp, tn, fn). Entries whose label is neither "positive" nor
        "negative" are not counted in any cell.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)

    actual_pos, actual_neg = actual == "positive", actual == "negative"
    predicted_pos, predicted_neg = predicted == "positive", predicted == "negative"

    def tally(mask):
        return int(np.sum(mask))

    return (
        tally(actual_pos & predicted_pos),
        tally(actual_neg & predicted_pos),
        tally(actual_neg & predicted_neg),
        tally(actual_pos & predicted_neg),
    )

def getAccuracy(tp, fp, tn, fn):
    """Return (tp + tn) / (tp + tn + fp + fn), or 0.0 when that total is not positive."""
    total = tp + tn + fp + fn
    return (tp + tn) / total if total > 0 else 0.0

def getPrecision(tp, fp, tn, fn):
    """Return tp / (tp + fp), or 0.0 when nothing was predicted positive.

    tn and fn are accepted only so all metric helpers share one signature.
    """
    predicted_positive = tp + fp
    return tp / predicted_positive if predicted_positive > 0 else 0.0

def getRecall(tp, fp, tn, fn):
    """Return tp / (tp + fn), or 0.0 when there are no actual positives.

    fp and tn are accepted only so all metric helpers share one signature.
    """
    actual_positive = tp + fn
    return tp / actual_positive if actual_positive > 0 else 0.0

def getFScore(tp, fp, tn, fn):
    """Return the F1 score 2 * P * R / (P + R), or 0.0 when P + R is not positive.

    P and R come from getPrecision and getRecall on the same four counts.
    """
    precision = getPrecision(tp, fp, tn, fn)
    recall = getRecall(tp, fp, tn, fn)
    denominator = precision + recall
    return 2 * precision * recall / denominator if denominator > 0 else 0.0


In [8]:
# fine-tuning k value on validation set
def bestKfinding(train_df, validation_df, k_values=range(1, 6)):
    """Pick the k with the highest validation F1 score.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Training rows handed to knnClassify.
    validation_df : pandas.DataFrame
        Rows to classify; must contain the true label in column 'num'.
    k_values : iterable of int, optional
        Candidate neighbour counts, tried in the given order. Defaults to
        1..5, the range this notebook originally hard-coded.

    Returns
    -------
    int
        The first candidate that achieves the best F1. If no candidate
        scores above 0, the first candidate is returned.

    Raises
    ------
    ValueError
        If k_values is empty.
    """
    candidates = list(k_values)
    if not candidates:
        raise ValueError("k_values must contain at least one candidate k")

    # The true labels do not depend on k, so read them once.
    y_true = validation_df['num'].values

    best_k = candidates[0]
    best_f1 = 0
    for k in candidates:
        y_pred = [knnClassify(train_df, row, k) for _, row in validation_df.iterrows()]
        tp, fp, tn, fn = getConfusionMatrix(y_true, y_pred)
        f1 = getFScore(tp, fp, tn, fn)
        # Strict comparison: on equal F1 the earlier candidate is kept.
        if f1 > best_f1:
            best_f1 = f1
            best_k = k
    return best_k


In [9]:
# Choose k on the validation split; the evaluation cell below reuses best_k on the test split.
best_k = bestKfinding(train_df, validation_df)

In [10]:
# testing using the best k: classify every test row against the training set
y_pred_best = [knnClassify(train_df, row, best_k) for _, row in test_df.iterrows()]

# report final performance
tp, fp, tn, fn = getConfusionMatrix(test_df['num'].values, y_pred_best)
printConfusionMatrix(tp, fp, tn, fn)

print('Accuracy:  %8.5f' % getAccuracy(tp, fp, tn, fn))
print('Precision: %8.5f' % getPrecision(tp, fp, tn, fn))
print('Recall:    %8.5f' % getRecall(tp, fp, tn, fn))
print('F-Measure: %8.5f' % getFScore(tp, fp, tn, fn))
# best_k is a neighbour count, so print it as an integer rather than a float
print('Best k:    %8d' % best_k)


               Actual
             1       0
P       +--------+--------+
r     1 | TP=84  | FP=22  |
e       +--------+--------+
d     0 | FN=14  | TN=64  |
.       +--------+--------+

Accuracy:   0.80435
Precison:   0.79245
Recall:     0.85714
F-Measure:  0.82353
Best k:     5.00000
