In [1]:
import matplotlib.pyplot as plt
from IPython.display import display
import numpy as np
import pandas as pd
import math
import csv
import heapq
import random

In [2]:
def find_attributes(d):
    """Return the header row of the CSV file named by ``d.filename``.

    Only the first record of the file is read. The result is the list of
    strings that ``csv.reader`` yields for that record.
    """
    with open(d.filename, 'r', newline='') as csv_file:
        header_row = next(csv.reader(csv_file))
    return header_row

In [3]:
def get_raw_data(d):
    """Read every data row of the CSV file named by ``d.filename``.

    The first record (the column labels) is discarded. Each remaining record
    is returned as a list of strings, in file order.
    """
    with open(d.filename, 'r', newline='') as csv_file:
        records = csv.reader(csv_file)
        next(records)  # drop the label row
        return list(records)

In [4]:
def attribute_values(d):
    """Return the dataset column by column, with numeric fields parsed.

    ``d.raw`` (a sequence of rows) is transposed with ``np.transpose`` so that
    each entry of the result is one attribute's values across all rows.
    Every value that ``float()`` accepts is stored as a float; anything else
    is kept exactly as it came out of the transposed array.

    Returns:
        A list of lists, one inner list per column of ``d.raw``.
    """
    def _to_number(value):
        # Only genuine conversion failures mean "not a number". A bare
        # ``except`` here would also hide KeyboardInterrupt and real bugs.
        try:
            return float(value)
        except (TypeError, ValueError):
            return value

    columns = np.transpose(d.raw)
    return [[_to_number(value) for value in column] for column in columns]
    

In [5]:
# Sorts a 2-D array (a sequence of rows) by one column using merge sort, smallest to largest.
def merge_sort_2d(array, index):
    """Return the rows of ``array`` ordered by the value in column ``index``.

    Intended for numeric columns only: values are compared with ``<``, so a
    column of strings would be ordered lexicographically, not numerically.

    The sort is stable (rows with equal keys keep their input order) and the
    input is not modified. Empty and single-row inputs are returned as a new
    list without recursing.

    Args:
        array: Sliceable sequence of indexable rows.
        index: Position within each row of the value to sort by.

    Returns:
        A new list containing the same row objects in ascending key order.
    """
    # ``<= 1`` (not ``== 1``) so an empty input terminates instead of
    # splitting into two empty halves forever.
    if len(array) <= 1:
        return list(array)

    middle = len(array) // 2
    left = merge_sort_2d(array[:middle], index)
    right = merge_sort_2d(array[middle:], index)

    merged = []
    i = 0
    j = 0
    while i < len(left) and j < len(right):
        # Take from the right half only when it is strictly smaller; on a tie
        # the left (earlier) row goes first, which keeps the sort stable.
        if right[j][index] < left[i][index]:
            merged.append(right[j])
            j += 1
        else:
            merged.append(left[i])
            i += 1

    # At most one of these still has rows left.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged

In [6]:
def normalize_list(data):
    """Min-max scale ``data`` into the range [0, 1].

    Each value ``x`` becomes ``(x - min) / (max - min)``.

    Edge cases:
        * Empty input returns an empty list.
        * If every value is equal (zero range), every output is ``0.0``
          rather than dividing by zero.

    Raises:
        TypeError: if the values do not support subtraction (e.g. strings).
            Callers use this to detect non-numeric columns, so the range is
            computed before the zero-range check.
    """
    if len(data) == 0:
        return []

    min_val = min(data)
    max_val = max(data)
    span = max_val - min_val  # raises TypeError for non-numeric data

    if span == 0:
        return [0.0 for _ in data]
    return [(x - min_val) / span for x in data]

In [39]:
class Dataset():
    """A CSV file loaded into memory as rows (``raw``) and columns (``attribute_values``).

    Attributes:
        filename: ``"<filename>.csv"``, the path that is read.
        raw: Data rows (header excluded) as produced by ``get_raw_data``.
        attributes: Column labels from the header row.
        number_of_attributes: ``len(attributes)``.
        attribute_values: Column-wise view produced by ``attribute_values``.
    """

    def __init__(self, filename):
        # filename is given without the ".csv" extension
        self.filename = f"{filename}.csv"
        self.raw = get_raw_data(self)
        self.attributes = find_attributes(self)
        self.number_of_attributes = len(self.attributes)
        self.attribute_values = attribute_values(self)

    def get_attributes(self):
        """Return the list of column labels."""
        return self.attributes

    def sort_by_raw(self, index):
        """Return the raw rows ordered by column ``index`` (``self.raw`` is untouched).

        Rows read from CSV hold strings, and comparing those directly would
        order them lexicographically ("10" before "9"). When every value in
        the chosen column parses as a float, the rows are ordered by those
        floats; otherwise the values are compared as they are.

        Returns:
            A list of the same row objects in ascending order; ``[]`` for an
            empty dataset.
        """
        rows = list(self.raw)
        if not rows:
            return []
        try:
            keys = [float(row[index]) for row in rows]
        except (TypeError, ValueError):
            keys = [row[index] for row in rows]
        # Sort (key, row) pairs on position 0 so only the keys are compared.
        decorated = merge_sort_2d(list(zip(keys, rows)), 0)
        return [row for _, row in decorated]

    def update_attribute_values(self, new_d):
        """Replace the column-wise view, e.g. with columns built from sorted rows."""
        self.attribute_values = new_d

    def restore_raw(self):
        """Reload ``raw`` from the CSV file, discarding any in-memory changes to it."""
        self.raw = get_raw_data(self)

    def pre_normalize_raw(self):
        """Overwrite ``raw`` with a min-max normalised copy of the data.

        Columns for which ``normalize_list`` raises ``TypeError`` (text
        columns) are carried over unchanged. The result is transposed back to
        row-major form with ``np.transpose``, so ``raw`` becomes a NumPy array.
        """
        columns = attribute_values(self)  # column-wise, numeric fields as floats
        normalized = []
        for column in columns:
            try:
                normalized.append(normalize_list(column))
            except TypeError:
                normalized.append(column)  # non-numeric column
        self.raw = np.transpose(normalized)

    def yoink_data_point(self):
        """Remove one random row from the column-wise view and return it.

        The data is normalised first, so both the returned point and the
        remaining ``attribute_values`` are on the normalised scale. ``raw`` is
        reloaded from the file afterwards, so repeated calls each start from
        the full file rather than accumulating removals.

        Returns:
            The removed row as a list, with numeric fields converted to float
            and other fields left as they were.

        Raises:
            ValueError: if the dataset has no rows.
        """
        row_count = len(self.raw)
        if row_count == 0:
            raise ValueError("cannot yoink a data point from an empty dataset")
        # Use the actual row count instead of a hard-coded 0..149 range.
        chosen = random.randrange(row_count)

        self.pre_normalize_raw()
        point = self.raw[chosen]

        self.raw = np.delete(self.raw, chosen, axis=0)
        self.attribute_values = attribute_values(self)  # columns without the chosen row
        self.restore_raw()

        converted_point = []
        for field in point:
            try:
                converted_point.append(float(field))
            except ValueError:
                converted_point.append(field)
        return converted_point

In [40]:
class ScatterPlot:
    """A titled, labelled scatter chart of two equal-length value sequences.

    Note: ``figsize`` is stored on the instance as ``size``, but ``display``
    always opens a 14 x 6 figure and does not consult it.
    """

    def __init__(self, x_vals, y_vals, figsize=(8, 6), title="KNN model visual", xlabel="X", ylabel="Y", grid=True):
        # Coordinates are coerced to float so string inputs can be plotted.
        self.x_vals = list(map(float, x_vals))
        self.y_vals = list(map(float, y_vals))
        self.size = figsize
        self.title, self.xlabel, self.ylabel = title, xlabel, ylabel
        self.grid = grid
        # The inputs exactly as passed in (not the float copies).
        self.data_points = [x_vals, y_vals]

    def display(self):
        """Draw the scatter chart with pyplot and show it."""
        plt.figure(figsize=(14, 6))
        plt.scatter(self.x_vals, self.y_vals)
        # Decorate the current axes, then render.
        plt.title(self.title)
        plt.xlabel(self.xlabel)
        plt.ylabel(self.ylabel)
        plt.grid(self.grid)
        plt.show()

In [44]:
def euclidian_distance(d, p):
    """Euclidean distance from point ``p`` to every row of dataset ``d``.

    ``d.attribute_values`` is column-wise: entry ``c`` holds attribute ``c``
    for every row, and ``p[c]`` is the point's value for that attribute.
    For each row the squared differences are summed over the attributes and
    the square root is taken.

    A value/point pair that cannot be subtracted (``TypeError``, e.g. text
    labels) contributes nothing to that row's distance. Each contribution is
    added at the value's own row position, so a single non-numeric value does
    not shift the rest of its column onto the wrong rows.

    Returns:
        A list of floats, one per entry of the first column; ``[]`` when the
        dataset has no columns.
    """
    if len(d.attribute_values) == 0:
        return []

    squared_sums = [0] * len(d.attribute_values[0])
    for column_index, column in enumerate(d.attribute_values):
        target = p[column_index]
        for row_index, val in enumerate(column):
            try:
                squared_sums[row_index] += (val - target) ** 2
            except TypeError:
                # non-numeric field: leave this row's sum as it is
                continue

    return [math.sqrt(total) for total in squared_sums]

In [45]:
class KNN_Object():
    """K-nearest-neighbours classifier over a dataset object.

    The dataset ``d`` must expose ``attribute_values`` (column-wise data) and
    ``attributes`` (column labels). It is held by reference, so later changes
    to the dataset are seen by ``categorize``.
    """

    def __init__(self, d):
        self.d = d  # the dataset object
        # The graph only shows the first two attributes; it also illustrates
        # how higher-dimensional data is hard to judge from a 2-D projection.
        # Keyword arguments are used so the title and axis labels land in the
        # intended parameters rather than in ``figsize``/``title``/``xlabel``.
        self.graph = ScatterPlot(d.attribute_values[0],
                                 d.attribute_values[1],
                                 title="KNN model(2d)",
                                 xlabel=d.attributes[0],
                                 ylabel=d.attributes[1])

    def display(self):
        """Show the 2-D scatter plot built at construction time."""
        self.graph.display()

    def categorize(self, p, k=7, label_index=4):
        """Classify point ``p`` by majority vote among its nearest neighbours.

        Args:
            p: The point, indexed the same way as the dataset's columns.
            k: How many nearest neighbours vote (default 7).
            label_index: Column of ``d.attribute_values`` holding the class
                labels (default 4).

        Returns:
            The winning label. The neighbour indices and the decision are
            also printed. Any label value is supported. On a tie, the tied
            class whose closest member is nearest to ``p`` wins.

        Raises:
            ValueError: if there are no neighbours to vote.
        """
        neighbor_distances = euclidian_distance(self.d, p)
        # nsmallest returns (row index, distance) pairs in ascending distance.
        closest = heapq.nsmallest(k, enumerate(neighbor_distances), key=lambda pair: pair[1])
        indicies = [idx for idx, _ in closest]
        if not indicies:
            raise ValueError("cannot categorize: the dataset has no neighbours to vote")

        print(f"The closest neighbors identified have indicies {indicies}")

        labels = self.d.attribute_values[label_index]
        votes = {}
        for idx in indicies:
            label = labels[idx]
            votes[label] = votes.get(label, 0) + 1

        # dicts keep insertion order and max() returns the first maximal key,
        # so ties resolve in favour of the nearest tied class.
        winner = max(votes, key=votes.get)
        print(f"The KNN-Algorithm has classified this point as {winner}")
        return winner


In [62]:
# Demo cell: pull one random row out of the iris data and classify it with KNN.
# Dataset("iris") reads "iris.csv" and exposes the column labels, the raw rows
# and a column-wise view with numeric fields parsed.
iris_dataset = Dataset("iris")#this returns an object with attributes and their values 
# Order the raw rows by column 0, then transpose to columns and store them.
# The raw rows hold strings as read from the CSV, so this comparison is textual.
# yoink_data_point() below rebuilds attribute_values, so this ordering does not
# reach categorize(); it only affects what the model's scatter plot was built from.
sorted_by_sepal_length = iris_dataset.sort_by_raw(0)#kinda unnecessary
sorted_by_sepal_length = np.transpose(sorted_by_sepal_length)
iris_dataset.update_attribute_values(sorted_by_sepal_length)

# The model keeps a reference to iris_dataset, so it sees the dataset as it is
# after the point has been removed below.
model = KNN_Object(iris_dataset)
#model.display()


# Remove a random row from the dataset's column view and get it back as a list;
# field 4 of that row is printed as the point's true type.
p = iris_dataset.yoink_data_point()
print(f"The datapoint randomly pulled from the set had type {p[4]}")
model.categorize(p)


The datapoint randomly pulled from the set had type Setosa
The closest neighbors identified have indicies [8, 13, 3, 41, 46, 2, 44]
The KNN-Algorithm has classified this point as Setosa
