In [203]:
import os
import pickle
import threading
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from random import shuffle
from sklearn import tree
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
from PIL import Image
from numpy import ndarray
from scipy import linalg, stats
from scipy.spatial.distance import cityblock
from typing import List, Tuple

combined_directory = "./combined/"
directory_n_1 = "./chars/"
directory_n_2 = "./chars 2/"

In [204]:
#Takes a directory path and returns all the names of the subdirectories and their paths
def open_directory(directory_path):
    subdirectory_names = os.listdir(directory_path)
    subdirectory_paths = [directory_path + subdirectory_name + "/" for subdirectory_name in subdirectory_names]
    return subdirectory_paths, subdirectory_names

In [205]:
#Takes a list of image paths and returns a list of image
def open_images(image_paths):
    image_arrays = [np.array(Image.open(image_path)) for image_path in image_paths]
    for image_path in image_paths:
        image_arrays.append(np.array(Image.open(image_path)))
    return image_arrays

In [206]:

#Takes a list of image np.arrays and turns them into a large feature vector array where rows correnspond to images and columns correspond to features (pixels) of the image
def images_to_feature_vectors(image_arrays):
    images = open_images(image_arrays)
    h, w = images[0].shape
    n_features = h * w
    fvectors = np.empty((len(images), n_features))
    for i, image in enumerate(images):
        fvectors[i, :] = image.reshape(1, n_features)
    return fvectors

In [207]:
#Splits an image list into training and testing data
def split_two(image_list, ratio=[0.7, 0.3]):
    train_ratio = ratio[0]
    indices_for_splittin = [int(len(image_list) * train_ratio)]
    train, test = np.split(image_list, indices_for_splittin)
    return train, test

#Splits an image list into training, validation and testing data
def split_three(image_list, ratio=[0.8, 0.1, 0.1]):
    train_r, val_r, test_r = ratio
    assert(np.sum(ratio) == 1.0)
    indicies_for_splitting = [int(len(image_list) * train_r), int(len(image_list) * (train_r+val_r))]
    train, val, test = np.split(image_list, indicies_for_splitting)
    return train, val, test

In [208]:
#Takes the path of a directory where every image is placed into a directory with the name of the label
#and returns a dictionary with the feature vectors and their corresponding labels
def label_data(directory):
    data_labelled = {}
    data_fvectors = []
    data_labels = []
    subdirectory_paths, subdirectory_names = open_directory(directory)
    for i in range(len(subdirectory_names)):
        images = os.listdir(subdirectory_paths[i])
        images = [subdirectory_paths[i] + "/" + image for image in images]
        data_fv = images_to_feature_vectors(images)
        for fv in data_fv:
            data_fvectors.append(fv)
            data_labels.append(subdirectory_names[i])
    data_labelled["fvectors"] = data_fvectors
    data_labelled["labels"] = data_labels

    return data_labelled

In [209]:
#Does the same as label_data but it also splits the images into training and testing and returns seperate dictionaries
def split_train_test(directory):
    train_model = {}
    train_fvectors = []
    train_labels = []
    test_model = {}
    test_fvectors = []
    test_labels = []
    subdirectory_paths, subdirectory_names = open_directory(directory)
    for i in range(len(subdirectory_names)):
        images = os.listdir(subdirectory_paths[i])
        shuffle(images)
        images = [subdirectory_paths[i] + "/" + image for image in images]
        train, test = split_two(images)
        train_fv = images_to_feature_vectors(train)
        for fv in train_fv:
            train_fvectors.append(fv)
            train_labels.append(subdirectory_names[i])
        test_fv = images_to_feature_vectors(test)
        for fv in test_fv:
            test_fvectors.append(fv)
            test_labels.append(subdirectory_names[i])
    train_model["fvectors"] = train_fvectors
    train_model["labels"] = train_labels
    test_model["fvectors"] = test_fvectors
    test_model["labels"] = test_labels

    return train_model, test_model

In [210]:
#Save the training model to a pickle file
def save_pickle(data: dict) -> None:
    a_file = open("data.pkl", "wb")
    pickle.dump(data, a_file)
    a_file.close()

#Loads the training model from the pickle file
def load_pickle() -> dict:
    a_file = open("data.pkl", "rb")
    model = pickle.load(a_file)
    return model

In [211]:
#Computes the dot distance between arrays of training and testing data and returns the distance where rows correspond to test images and columns correspond to train images, very quick
def dot_distance(training, testing):
    tdott = np.dot(testing, training.transpose())
    modtrain = np.sqrt(np.sum(training*training, axis=1))
    modtest = np.sqrt(np.sum(testing*testing, axis=1))
    dist = -tdott / np.outer(modtest, modtrain.transpose())
    return dist

In [212]:
#computes the euclidean distance between arrays of training and testing data, pretty slow
def euclidean_distance(training, testing):
    dist = np.full([len(testing), len(training)], 0)
    for testrow in range(0, testing.shape[0]):
        for trainrow in range(0, training.shape[0]):
            dist[testrow][trainrow] = np.linalg.norm(testing[testrow]-training[trainrow])
    return dist

In [213]:
#Computes the manhattan distance between arrays of training and testing data, extremely slow
def manhattan_distance(training, testing):
    dist = np.full([len(testing), len(training)], 0)
    for testrow in range(0, testing.shape[0]):
        for trainrow in range(0, training.shape[0]):
            dist[testrow][trainrow] = cityblock(testing[testrow], training[trainrow])

In [214]:
KNNC = 5 #K in K nearest neighbours

#Takes a training dict and a testing dict, computes the distance between the feature vectors, finds the k nearest images, extracts their labels, 
#finds the most common label, and classifies it as such. Returns labels in the same order as the test feature vectors
def classify(train_model:dict, test_data:dict, k, distance) -> List[str]:
    train = np.array(train_model["fvectors"])
    train_labels = train_model["labels"]
    test = np.array(test_data["fvectors"])

    if distance == 0:
        dist = dot_distance(train, test)
    else:
        dist = euclidean_distance(train, test)
    
    knearest = np.argsort(dist, axis=1)[:, 0:k]
    klabels = []
    for i in range(len(knearest)):
        individual_labels = []
        for j in range(len(knearest[0])):
            individual_labels.append(train_labels[knearest[i][j]])
        klabels.append(individual_labels)
    klabels = pd.DataFrame(klabels)
    labels = klabels.mode(axis='columns')
    label = np.array(labels[0].tolist())
    return label

In [215]:
#Computes the accuracy of the model by running classify and checking the percentage of true labels
def evaluate(train_model: dict, test:dict, k, distance) -> Tuple[float, float]:

    true_labels = test["labels"]
    output_labels = classify(train_model, test, k, distance)
    n_of_correct_labels = 0
    #wrong_predictions = []
    #print(len(true_labels))
    for i in range(len(true_labels)):
        if output_labels[i] == true_labels[i]:
            n_of_correct_labels += 1
    #     else:
    #         wrong_labels = []
    #         wrong_labels.append(output_labels[i])
    #         wrong_labels.append(true_labels[i])
    #         wrong_predictions.append(wrong_labels)
    # print(len(wrong_predictions))
    # print(wrong_predictions)
    score= 100.0 * n_of_correct_labels / len(true_labels)
    return score


In [216]:
#Creates a random test/train split and runs the classifier once
def test_one(directory_of_images, k, distance):
    #train = label_data(directory_of_test_images)
    #test = label_data(directory_of_images)
    train, test = split_train_test(directory_of_images)
    #save_pickle(train)
    return evaluate(test, train, k, distance)


In [217]:
#Runs the classifier n times, each time creating a new training/testing split, then prints the average accuracy of the n runs
def test_n_times(directory_of_images, k, n, distance):
    accuracy = []
    for i in range(n):
        accuracy.append(test_one(combined_directory, k, distance))
    average=sum(accuracy)/n
    final_score = "Average score for k=" + str(k) + " = " + str(round(average, 2))
    print(final_score)


In [218]:
t1 = threading.Thread(target=test_n_times, args=(combined_directory, 3, 100, 0))
t2 = threading.Thread(target=test_n_times, args=(combined_directory, 4, 100, 0))
t3 = threading.Thread(target=test_n_times, args=(combined_directory, 5, 100, 0))
t4 = threading.Thread(target=test_n_times, args=(combined_directory, 6, 100, 0))
t5 = threading.Thread(target=test_n_times, args=(combined_directory, 7, 100, 0))
t6 = threading.Thread(target=test_n_times, args=(combined_directory, 8, 100, 0))

t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()

t1.join()
t2.join()
t3.join()
t4.join()
t5.join()
t6.join()

print("Done")

Average score for k=3 = 95.2
Average score for k=8 = 93.82
Average score for k=5 = 94.44
Average score for k=4 = 93.88
Average score for k=6 = 94.2Average score for k=7 = 94.21

Done


In [219]:
t7 = threading.Thread(target=test_n_times, args=(combined_directory, 3, 100, 1))
t8 = threading.Thread(target=test_n_times, args=(combined_directory, 4, 100, 1))
t9 = threading.Thread(target=test_n_times, args=(combined_directory, 5, 100, 1))
t10 = threading.Thread(target=test_n_times, args=(combined_directory, 6, 100, 1))
t11 = threading.Thread(target=test_n_times, args=(combined_directory, 7, 100, 1))
t12 = threading.Thread(target=test_n_times, args=(combined_directory, 8, 100, 1))

t7.start()
t8.start()
t9.start()
t10.start()
t11.start()
t12.start()

t7.join()
t8.join()
t9.join()
t10.join()
t11.join()
t12.join()

print("Done")