In [1]:
import numpy as np
import pandas as pd
import pprint as pp
from numpy import *

In [125]:
# pull in data
X = pd.read_csv('heart_disease_uci.csv')
# drop irrelevant columns 
X = X.drop("id", axis=1)
X = X.drop("dataset", axis=1)
# drop rows with NaN in relevant feature columns
feature_names = ["age", "sex", "cp", "trestbps", "chol", "fbs", "restecg", "thalch", "exang", "oldpeak", "slope", "ca", "thal"]
X = X.dropna(subset=feature_names)
# get feature indices 
features = [X.columns.get_loc(col) for col in feature_names]
display(X)
# convert to numpy matrix for calculations
X = X.to_numpy()

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalch,exang,oldpeak,slope,ca,thal,num
0,63,Male,typical angina,145.0,233.0,True,lv hypertrophy,150.0,False,2.3,downsloping,0.0,fixed defect,0
1,67,Male,asymptomatic,160.0,286.0,False,lv hypertrophy,108.0,True,1.5,flat,3.0,normal,2
2,67,Male,asymptomatic,120.0,229.0,False,lv hypertrophy,129.0,True,2.6,flat,2.0,reversable defect,1
3,37,Male,non-anginal,130.0,250.0,False,normal,187.0,False,3.5,downsloping,0.0,normal,0
4,41,Female,atypical angina,130.0,204.0,False,lv hypertrophy,172.0,False,1.4,upsloping,0.0,normal,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
299,68,Male,asymptomatic,144.0,193.0,True,normal,141.0,False,3.4,flat,2.0,reversable defect,2
300,57,Male,asymptomatic,130.0,131.0,False,normal,115.0,True,1.2,flat,1.0,reversable defect,3
301,57,Female,atypical angina,130.0,236.0,False,lv hypertrophy,174.0,False,0.0,flat,1.0,normal,1
508,47,Male,asymptomatic,150.0,226.0,False,normal,98.0,True,1.5,flat,0.0,reversable defect,1


In [126]:
# Normalize Data 
X = normalize(X)
# one-hot encode data
X = onehot_encode(X)

In [127]:
# split data into training:validation:test := 6:2:2
s1 = int(len(X) * .6)
s2 = int(len(X) * .2)

X_t = mat(X[0:s1, :])
X_v = mat(X[s1:s1+s2, :])
X_test = mat(X[s1 + s2:, :])

In [131]:
# test model on validation data with K = 1, 2,...
accuracy_K = []
for k in range(1, 50):
    res = validate(k)
    accuracy_K.append(calc_accuracy(res))

max_accuracy = max(accuracy_K)
max_K = accuracy_K.index(max_accuracy) + 1
print(max_K)
# K = 4 gives the highest accuracy

4
[0.864406779661017, 0.9152542372881356, 0.9152542372881356, 0.9322033898305084, 0.9152542372881356, 0.9152542372881356, 0.9152542372881356, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8983050847457628, 0.9152542372881356, 0.9152542372881356, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8813559322033898, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8983050847457628, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.8813559322033898, 0.864406779661017, 0.8983050847457628, 0.8813559322033898]


In [132]:
# test model on test data with K value choosen from validation step
K=4
test_result = test(K)
# construct confusion matrix
conf_matrix = get_confusion_matrix(test_result)
TrueP = conf_matrix[0][0]
FalsN = conf_matrix[1][0]
TrueN = conf_matrix[1][1]
FalsP = conf_matrix[0][1]
# calculate performance metrics from confusion matrix
accuracy = (TrueP + TrueN) / (TrueP + FalsN + FalsP + TrueN)
precision = TrueP / (TrueP + FalsP)
recall = TrueP / (TrueP + FalsN)
f_score = 2 * ((precision * recall) / (precision + recall))
print(f"accuracy: {accuracy}")
print(f"precision: {precision}")
print(f"recall: {recall}")
print(f"F-score: {f_score}")
print(conf_matrix)



accuracy: 0.8852459016393442
precision: 0.9642857142857143
recall: 0.8181818181818182
F-score: 0.8852459016393442
[[27, 1], [6, 27]]


In [114]:
def get_confusion_matrix(R):
    # in: training results => row 0 := true value; row 1 := test value
    # out: confusion matrix
    confusion = [[0, 0], [0, 0]]
    for i in zip(R[0], R[1]):
        if i[0] == 1 and i[1] == 1:
            confusion[0][0] += 1
    for i in zip(R[0], R[1]):
        if i[0] == 1 and i[1] == 0:
            confusion[1][0] += 1
    for i in zip(R[0], R[1]):
        if i[0] == 0 and i[1] == 0:
            confusion[1][1] += 1
    for i in zip(R[0], R[1]):
        if i[0] == 0 and i[1] == 1:
            confusion[0][1] += 1
    
    return confusion

In [115]:
def calc_accuracy(R):
    # in: results from validation data
    # out: accuracy
    correct = 0
    incorrect = 0
    for i in range(len(R[0])):
        if R[0][i] == R[1][i]:
            correct += 1
        else:
            incorrect += 1
    
    return correct / (correct + incorrect)

In [116]:
def test(K):
    # in: K value
    # out: results = row 0 := true value; row 1 := test value
    results = [[], []]
    for i in range(X_test.shape[0]):
        results[0].append(0 if X_test[i, -1] == 0 else 1)
        results[1].append(classifyPoint(X_test[i, :], X_t, K))

    return(results)

In [117]:
def validate(K):
    # in: K value
    # out: results = row 0 := true value; row 1 := prediction value
    results = [[], []]
    for i in range(X_v.shape[0]):

        results[0].append(0 if X_v[i, -1] == 0 else 1)
        results[1].append(classifyPoint(X_v[i, :], X_t, K))

    return(results)

In [118]:
def test_training(K):
    # in: K value
    # out: results = row 0 := true value; row 1 := test value
    results = [[], []]
    for i in range(X_v.shape[0]):
        results[0].append(0 if X_v[i, -1] == 0 else 1)
        results[1].append(classifyPoint(X_t[i, :], X_t, K))

    return(results)

In [119]:
def classifyPoint(X, A, K):
    # in: test point, training data, K value
    # out: classification result
    k_nearest = get_k_nearest(K, X, A)
    k_count_0 = 0
    k_count_1 = 0
    for i in range(k_nearest.shape[0]):
        # class label is 2nd to last column
        if k_nearest[i, -2] == 0:
            k_count_0 += 1
        else:
            k_count_1 += 1
    
    return (0 if k_count_0 > k_count_1 else 1)


In [120]:
def get_k_nearest(K, X, A):
    # in: K value, test point, training data
    # out: K nearest points in test set
    dist = []
    for i in range(A.shape[0]):
        curr_dist = calc_eucl_dist(asarray(X)[0], asarray(A[i, :])[0])
        dist.append(curr_dist)

    dist = mat(dist)
    dist = dist.T
    # append distances to training data
    A_dist = hstack((A, dist))
    # sort by distance column
    sorted_indices = argsort(reshape(A_dist[:, -1], -1))
    A_dist = A_dist[asarray(sorted_indices)[0]]

    return(A_dist[0:K + 1, :])
    

In [121]:
def calc_eucl_dist(A, B):
    # in: two observations/rows
    # out: the euclidean distance between them
    radicand = 0
    
    for i in range(len(A)):
        radicand += (A[i] - B[i]) ** 2
        
    return sqrt(radicand)

In [122]:
def onehot_encode(A):
    # in: training data
    # out: training data one-hot encoded

    B = copy(A)
    labels = mat(B[:, -1]).T
    to_remove = [B.shape[1] - 1]
    for j in range(A.shape[1] - 1):

        if type(A[0, j]) == str or type(A[0, j]) == bool:

            curr = onehot_column(A[:, j])
            to_remove.append(j)
            B = hstack((B, curr))
    
    B = delete(B, to_remove, axis=1)
    B = hstack((B, labels))
    return B

In [123]:
def onehot_column(C):
    # in: categorical column
    # out: matrix one-hot encoding of column
            
    categories = unique(C)
    curr_mat = zeros((C.shape[0], categories.shape[0]))

    for i in range(C.shape[0]):
        for j in range(len(categories)):
             if C[i] == categories[j]:
                  curr_mat[i, j] = 1

    return curr_mat

In [124]:
def normalize(A):
    # in: training data
    # out: training data with normalized continuous columns 
    for j in range(A.shape[1] - 1):

        if type(A[0, j]) == float or type(A[0, j]) == int:
      
            mn = mean(A[:, j])
            stdDev = std(A[:, j].astype(float))
            A[:, j] = (A[:, j] - mn) * (stdDev ** -1)
        

    return(A)  