## Prediction on test set

In [1]:
import csv
import os
import cv2
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import json
from sklearn.metrics import accuracy_score, confusion_matrix
from tensorflow.keras.applications.resnet import preprocess_input as resnet_preprocess_input
from tensorflow.keras.applications.xception import preprocess_input as xception_preprocess_input
from tensorflow.keras import layers, metrics, regularizers
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.applications import ResNet152
from tensorflow.keras.applications import Xception
from tensorflow.keras.models import load_model


In [2]:
# Environment flag is set for this process (and anything it spawns).
# NOTE(review): presumably a workaround for a duplicate OpenMP runtime
# error when loading the ML libraries - confirm it is still needed.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Project root: the parent of the directory the notebook is started from.
HOME = os.path.dirname(os.getcwd())
print(HOME)

e:\Code\CowId


In [3]:
def classify_images(encoder, cows_list1, cows_list2, threshold=0.5):
    """Label each image pair as same (0) or different (1) by embedding distance.

    Both batches are passed through ``encoder.predict``. The squared
    Euclidean distance between corresponding embeddings (summed over the
    last axis) is then compared with ``threshold``.

    Args:
        encoder: Object exposing ``predict(batch)`` and returning embeddings.
        cows_list1: First batch, handed to ``encoder.predict`` unchanged.
        cows_list2: Second batch, aligned element-wise with ``cows_list1``.
        threshold: Largest squared distance that is still labelled 0.

    Returns:
        Array holding 0 where the distance is <= ``threshold`` and 1
        elsewhere. The array is also printed.
    """
    embeddings_first = encoder.predict(cows_list1)
    embeddings_second = encoder.predict(cows_list2)

    delta = embeddings_first - embeddings_second
    squared_distance = np.sum(np.square(delta), axis=-1)

    prediction = np.where(squared_distance <= threshold, 0, 1)
    print(prediction)
    return prediction

In [4]:
def read_image(path, target_size=(128, 128)):
    """Load an image file as an RGB array resized to ``target_size``.

    Args:
        path: Location of the image file.
        target_size: Size handed to ``cv2.resize`` as its ``dsize`` argument.

    Returns:
        The resized image with channels converted from BGR to RGB.

    Raises:
        OSError: If ``cv2.imread`` could not load the file (it signals this
            by returning ``None`` rather than raising).
    """
    image = cv2.imread(path)
    if image is None:
        # Fail here with the offending path instead of letting cvtColor
        # raise an opaque cv2.error about an empty source image.
        raise OSError(f"Could not read image: {path!r}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return cv2.resize(image, target_size)

In [5]:
def generate_pairs(output_loc, test_dataset_path):
    """Build anchor/positive and anchor/negative path pairs and save them.

    Every sub-directory of ``test_dataset_path`` whose *name* contains
    ``"cow"`` is treated as one identity with an ``anchor.jpg`` inside it
    (the anchor's existence is not checked here). For each such directory:

    * positives pair the anchor with every other file in the same directory;
    * negatives pair the anchor with every file of every other
      sub-directory (including other cows' anchors and directories whose
      name does not contain ``"cow"``).

    The pairs are written to ``pos_pairs.csv`` and ``neg_pairs.csv`` inside
    ``output_loc``, one ``anchor,other`` row per pair. Each visited
    directory path and the final pair counts are printed.

    Args:
        output_loc: Existing directory that receives the two CSV files.
        test_dataset_path: Directory containing one sub-directory per class.
    """
    # List every class directory exactly once (os.listdir order is kept).
    # Plain files next to the class directories are ignored.
    files_by_dir = {}
    for name in os.listdir(test_dataset_path):
        dir_path = os.path.join(test_dataset_path, name)
        if not os.path.isdir(dir_path):
            continue
        files_by_dir[name] = [
            os.path.join(dir_path, file_name) for file_name in os.listdir(dir_path)
        ]

    pos_pairs, neg_pairs = [], []
    for name, own_files in files_by_dir.items():
        path_to_cow = os.path.join(test_dataset_path, name)
        print(path_to_cow)

        # Test the directory name, not the full path: a parent folder that
        # happens to contain "cow" must not turn every directory into a cow.
        if "cow" not in name:
            continue

        anchor_image = os.path.join(path_to_cow, 'anchor.jpg')
        pos_pairs.extend((anchor_image, x) for x in own_files if x != anchor_image)
        for other_name, other_files in files_by_dir.items():
            if other_name != name:
                neg_pairs.extend((anchor_image, x) for x in other_files)

    print(len(pos_pairs), len(neg_pairs))

    outputs = (('pos_pairs.csv', pos_pairs), ('neg_pairs.csv', neg_pairs))
    for csv_name, rows in outputs:
        with open(os.path.join(output_loc, csv_name), 'w', newline='') as file:
            csv.writer(file).writerows(rows)


In [6]:
def save_params(params, filename):
    """Write ``params`` to ``filename`` as JSON indented by four spaces.

    Args:
        params: JSON-serialisable object to store.
        filename: Path of the file to create or overwrite.
    """
    with open(filename, mode='w') as handle:
        json.dump(obj=params, fp=handle, indent=4)

In [7]:
def create_dir(base_folder, iteration):
    """Return ``HOME/data/<base_folder>/<iteration>``, creating it if absent.

    Args:
        base_folder: Folder name below ``HOME/data``.
        iteration: Run identifier; converted with ``str`` for the path.

    Returns:
        The path of the (possibly pre-existing) directory.
    """
    target = os.path.join(HOME, 'data', base_folder, str(iteration))
    if os.path.exists(target):
        return target
    os.makedirs(target)
    return target

In [8]:
def get_next_iteration(base_folder):
    """Return the next free run number below ``HOME/data/<base_folder>``.

    Sub-directories whose names parse as integers are treated as existing
    runs; the result is the largest such number plus one.

    Args:
        base_folder: Folder name below ``HOME/data`` that holds the runs.

    Returns:
        0 when the folder is missing or holds no numbered sub-directory,
        otherwise ``max(existing) + 1``.
    """
    base_path = os.path.join(HOME, 'data', base_folder)

    # This runs before create_dir on a first run, so the folder may not
    # exist yet; that simply means no runs have been stored so far.
    if not os.path.isdir(base_path):
        return 0

    iterations = []
    for name in os.listdir(base_path):
        if not os.path.isdir(os.path.join(base_path, name)):
            continue
        try:
            iterations.append(int(name))
        except ValueError:
            # Not a run directory (e.g. a manually created folder); skip it.
            continue

    return max(iterations, default=-1) + 1

In [9]:
def read_dataset(file_path):
    """Load every row of the CSV file at ``file_path``.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list with one list of string fields per CSV row.
    """
    rows = []
    with open(file_path, newline='') as file:
        for record in csv.reader(file):
            rows.append(record)
    return rows

In [10]:
def get_batch(tuples_list, batch_size=64, preprocess=True, encoder_architecture='resnet'):
    """Yield batches of loaded (anchor, comparison) image arrays.

    The pairs are consumed in order in chunks of ``batch_size``; the final
    chunk may be shorter. No batch is produced for an empty remainder, so
    an input whose length is a multiple of ``batch_size`` (or an empty
    input) never yields an empty array.

    Args:
        tuples_list: Sequence of ``(anchor_path, comparison_path)`` pairs.
        batch_size: Maximum number of pairs per yielded batch.
        preprocess: When true, apply the preprocessing function matching
            ``encoder_architecture`` to both arrays.
        encoder_architecture: ``'resnet'`` or ``'xception'``. Any other
            value leaves the images unpreprocessed.

    Yields:
        A two-element list ``[anchor, comparison]`` of arrays built from the
        images returned by ``read_image``.
    """
    for start in range(0, len(tuples_list), batch_size):
        anchor = []
        comparison = []
        for anchor_path, comparison_path in tuples_list[start:start + batch_size]:
            anchor.append(read_image(anchor_path))
            comparison.append(read_image(comparison_path))

        anchor = np.array(anchor)
        comparison = np.array(comparison)

        if preprocess:
            if encoder_architecture == 'resnet':
                anchor = resnet_preprocess_input(anchor)
                comparison = resnet_preprocess_input(comparison)
            elif encoder_architecture == 'xception':
                anchor = xception_preprocess_input(anchor)
                comparison = xception_preprocess_input(comparison)

        yield [anchor, comparison]

In [11]:
def classify_data(dataset, model, batch_size, threshold=0.5, encoder_architecture='resnet'):
    """Classify every image pair of ``dataset`` with the encoder ``model``.

    Pairs are loaded batch-wise through ``get_batch`` (with preprocessing
    enabled) and labelled by ``classify_images``.

    Args:
        dataset: Sequence of ``(anchor_path, comparison_path)`` pairs.
        model: Encoder forwarded to ``classify_images``.
        batch_size: Number of pairs loaded per batch.
        threshold: Distance threshold forwarded to ``classify_images``.
        encoder_architecture: Forwarded to ``get_batch`` to pick the
            preprocessing function.

    Returns:
        One flat float64 array containing the labels of all batches in
        order (empty when no batch was produced).
    """
    # Seed with an empty float array so the result keeps the float64 dtype
    # callers already receive, then join once instead of re-copying the
    # whole result with np.append for every batch.
    chunks = [np.array([])]
    for a, c in get_batch(dataset, batch_size=batch_size, preprocess=True, encoder_architecture=encoder_architecture):
        chunks.append(np.ravel(classify_images(model, a, c, threshold)))
    return np.concatenate(chunks)

In [12]:
def plot_confusion_matrix(true, pred, dir):
    """Print the accuracy and save an annotated 2x2 confusion-matrix plot.

    Label 0 is shown as "Similar" and label 1 as "Different". Rows of the
    heatmap are the actual labels and columns the predicted labels. The
    figure is written to ``confusion_matrix.png`` inside ``dir`` and the
    current matplotlib figure is cleared before and after plotting.

    Args:
        true: Ground-truth labels (0 or 1).
        pred: Predicted labels (0 or 1), aligned with ``true``.
        dir: Existing directory that receives ``confusion_matrix.png``.
    """
    # Compute and print the accuracy
    print(f"\nAccuracy of model: {accuracy_score(true, pred)}\n")

    # Fix the label set so the matrix is always 2x2, even when only one
    # class occurs in the data; otherwise the reshape below would fail.
    cf_matrix = confusion_matrix(true, pred, labels=[0, 1])

    categories = ['Similar', 'Different']
    # Row-major cell order is (actual, predicted):
    # (Similar, Similar), (Similar, Different),
    # (Different, Similar), (Different, Different).
    # A wrong "Different" prediction is therefore the second cell.
    names = ['True Similar', 'False Different', 'False Similar', 'True Different']
    percentages = ['{0:.2%}'.format(value) for value in cf_matrix.flatten() / np.sum(cf_matrix)]

    labels = [f'{v1}\n{v2}' for v1, v2 in zip(names, percentages)]
    labels = np.asarray(labels).reshape(2, 2)
    plt.clf()
    sns.heatmap(cf_matrix, annot=labels, cmap='Blues', fmt='',
                xticklabels=categories, yticklabels=categories)

    plt.xlabel("Predicted", fontdict={'size': 14}, labelpad=10)
    plt.ylabel("Actual", fontdict={'size': 14}, labelpad=10)
    plt.title("Confusion Matrix", fontdict={'size': 12}, pad=20)

    plt.tight_layout()  # Adjust layout

    plt.savefig(os.path.join(dir, 'confusion_matrix.png'))
    plt.clf()

In [31]:
# Evaluation settings:
#   STORE_FOLDER      - folder below HOME/data that receives one numbered
#                       sub-directory per evaluation run
#   TEST_DATASET_PATH - directory holding the test images
#   SIAMESE_MODEL_LOC - directory of the training run to evaluate
# create_test_metrics() later adds its result metrics to this dict.
params = dict(
    STORE_FOLDER="optuna_siamese_runs_tests",
    TEST_DATASET_PATH=os.path.join(HOME, "data", "siamese_v8_augmented", "test"),
    SIAMESE_MODEL_LOC=os.path.join(HOME, "data", "optuna_siamese_runs", "148"),
)

def create_test_metrics(best_threshold=16):
    """Evaluate the stored encoder on the test set and save the results.

    Steps:
      1. Create the next numbered run directory below
         ``params["STORE_FOLDER"]``.
      2. Generate positive/negative pair CSVs from
         ``params["TEST_DATASET_PATH"]`` and read them back.
      3. Load ``encoder.h5`` and the ``ARCHITECTURE`` entry of ``params.json``
         from ``params["SIAMESE_MODEL_LOC"]``.
      4. Classify all pairs, print the overall / positive / negative
         accuracy, store the metrics in the module-level ``params`` dict and
         save that dict as ``params.json`` in the run directory.
      5. Save the confusion-matrix plot in the run directory.

    Args:
        best_threshold: Distance threshold used for classification. Defaults
            to 16, the value that used to be hard-coded here. Pass ``None``
            to read ``'best_threshold'`` from the training run's
            ``metrics.json`` instead.
            NOTE(review): that key name comes from the lookup that was
            previously commented out - confirm it exists in metrics.json.
    """
    run_iteration = get_next_iteration(params["STORE_FOLDER"])
    siamese_dir = create_dir(params["STORE_FOLDER"], run_iteration)
    generate_pairs(siamese_dir, params["TEST_DATASET_PATH"])

    pos_dataset = read_dataset(os.path.join(siamese_dir, "pos_pairs.csv"))
    neg_dataset = read_dataset(os.path.join(siamese_dir, "neg_pairs.csv"))

    model_dir = params["SIAMESE_MODEL_LOC"]

    # Only touch metrics.json when its threshold is actually requested.
    if best_threshold is None:
        with open(os.path.join(model_dir, 'metrics.json'), 'r') as f:
            best_threshold = json.load(f)['best_threshold']

    with open(os.path.join(model_dir, 'params.json'), 'r') as f:
        encoder_architecture = json.load(f)['ARCHITECTURE']

    encoder = load_model(os.path.join(model_dir, "encoder.h5"))

    pos_list = classify_data(pos_dataset, encoder, batch_size=256, threshold=best_threshold, encoder_architecture=encoder_architecture)
    neg_list = classify_data(neg_dataset, encoder, batch_size=256, threshold=best_threshold, encoder_architecture=encoder_architecture)

    # Ground truth: positive pairs are label 0, negative pairs are label 1.
    true_pos = np.array([0] * len(pos_list))
    true_neg = np.array([1] * len(neg_list))
    true_overall = np.array([0] * len(pos_list) + [1] * len(neg_list))
    pred_overall = np.append(pos_list, neg_list)

    # Compute each accuracy once and reuse it for printing and saving.
    overall_accuracy = accuracy_score(true_overall, pred_overall)
    positive_accuracy = accuracy_score(true_pos, pos_list)
    negative_accuracy = accuracy_score(true_neg, neg_list)

    print(f"Accuracy of model overall: {overall_accuracy}")
    print(f"Accuracy of model on positives: {positive_accuracy}")
    print(f"Accuracy of model on negatives: {negative_accuracy}")

    # Add metrics to params (this mutates the module-level dict).
    params["OVERALL_ACCURACY"] = overall_accuracy
    params["POSITIVE_ACCURACY"] = positive_accuracy
    params["NEGATIVE_ACCURACY"] = negative_accuracy
    # Same key spelling as the training params.json read above.
    params["ARCHITECTURE"] = encoder_architecture
    params["BEST_THRESHOLD"] = best_threshold

    save_params(params, os.path.join(siamese_dir, "params.json"))

    # Plot confusion matrix
    plot_confusion_matrix(true_overall, pred_overall, siamese_dir)

In [32]:
# Run the evaluation: creates a new run directory, writes the pair CSVs,
# params.json and confusion_matrix.png into it, and prints the accuracies.
create_test_metrics()

e:\Code\CowId\data\siamese_v8_augmented\test\cow_22
e:\Code\CowId\data\siamese_v8_augmented\test\cow_24
e:\Code\CowId\data\siamese_v8_augmented\test\cow_3
e:\Code\CowId\data\siamese_v8_augmented\test\cow_6
e:\Code\CowId\data\siamese_v8_augmented\test\negatives
171 1817
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 0 1 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 1 1 1 1 1 1 0 0 1 0 0 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 

<Figure size 640x480 with 0 Axes>