<h3>Imports</h3>

In [None]:
import torch
from torch import nn
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torchvision.transforms.functional as tr
import torchvision.transforms.v2.functional as trv2
from torchvision.transforms import RandomRotation
from torchvision.transforms.functional import pil_to_tensor
import pandas as pd
import torch.nn.functional as F
from torchvision.transforms import InterpolationMode
from torchvision.io import read_image
import matplotlib.pyplot as plt
import math
import time
import os
import hickle
import albumentations as A
import cv2
import ast
from os import listdir, makedirs
from os.path import isfile, join

<h3>Global Variables</h3>

In [None]:
# CSV index files for the training and validation splits. CustomDataSet reads
# the "file_name" and "pin" columns from each of them.
# NOTE(review): these are absolute, machine-specific paths; the notebook will
# not run on another machine without editing them.
train_csv = "/home/tyler/Documents/School Material/Current Semester/Research Team/Code/PinDataCalibrateTrain/data.csv"
val_csv = "/home/tyler/Documents/School Material/Current Semester/Research Team/Code/PinDataCalibrateVal/data.csv"

In [None]:
# Checkpoint locations used by get_best_loss: the weights saved whenever a new
# lowest loss is recorded, the weights saved on every call, and the text file
# that stores the lowest loss value seen so far.
best_model_path = "./best_model/best_calibration_weights.pt"
last_epoch_model_path = "./best_model/last_calibration_weights.pt"
best_loss_path = "./best_model/calibration_loss.txt"
# Make sure the checkpoint directory exists before anything is written to it.
os.makedirs("./best_model/", exist_ok=True)

In [None]:
# Model hyper-parameters. num_of_outputs is the size of each NeuralNet output
# head.
# NOTE(review): in the cells reviewed here only num_of_outputs is referenced;
# the other values look like leftovers from an earlier model - confirm before
# removing them.
num_of_inputs = 5
num_of_outputs = 6
num_of_frames = 101

hidden_size = 256
num_of_rnn_layers = 1
embedding_size = 192

In [None]:
# Settings for the debug overlay drawn by add_grid_lines: the pixel step between
# grid lines in each direction, and the font scale / stroke thickness passed to
# cv2.putText for the cell labels.
SECTION_SIZE_HEIGHT = 80
SECTION_SIZE_WIDTH = 80
FONT_SIZE = 0.7
THICKNESS = 2

In [None]:
# Mini-batch size passed to every DataLoader built in this notebook.
batch_size = 16

In [None]:
# Seed PyTorch's random number generator.
# NOTE(review): only torch is seeded here; randomness coming from numpy or the
# albumentations transforms is presumably not covered - confirm if exact
# reproducibility is required.
torch.manual_seed(12)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU. The model
# and every tensor created by CustomDataSet are placed on this device.
device = ("cuda" if torch.cuda.is_available() else "cpu" )
print(f"Using {device} device")

In [None]:
# Keys of the `metrics` dictionary; each key maps to a per-epoch history list.
# They are also used verbatim as legend/title text by show_and_save_graph.
TRAIN_LOSS_KEY = "Training Loss"
TRAIN_ACCURACY_KEY = "Training Accuracy"

VAL_LOSS_KEY = "Validation Loss"
VAL_ACCURACY_KEY = "Validation Accuracy"

# NOTE(review): the testing keys are not referenced by the cells reviewed here.
TEST_LOSS_KEY = "Testing Loss"
TEST_ACCURACY_KEY = "Testing Accuracy"

<h3>Aux Functions</h3>

In [None]:
def get_best_loss(current_loss: float, model):
    """Checkpoint ``model`` and update the stored best loss.

    The best loss seen so far is kept as plain text in ``best_loss_path``. When
    ``current_loss`` is strictly lower than the stored value, the file is
    rewritten and the weights are saved to ``best_model_path``. The weights are
    always saved to ``last_epoch_model_path``.

    Args:
        current_loss: Loss value to compare against the stored best.
        model: Module whose ``state_dict()`` is written to disk.

    Returns:
        bool: True when ``current_loss`` became the new best loss.
    """
    try:
        with open(best_loss_path, "r") as file:
            best_loss = float(file.readline().strip())
    except FileNotFoundError:
        # First run: seed the file with the sentinel value "100" so that later
        # calls have something to compare against.
        best_loss = 100.0
        with open(best_loss_path, "w") as file:
            file.write("100")

    got_new_loss = current_loss < best_loss

    if got_new_loss:
        with open(best_loss_path, "w") as file:
            file.write(str(current_loss))

        torch.save(model.state_dict(), best_model_path)
        print(f"New best loss!")

    torch.save(model.state_dict(), last_epoch_model_path)

    return got_new_loss

In [None]:
def get_corrects_and_size(pred, y):
    """Count how many predicted class indices match the target class indices.

    The class index is taken with ``argmax`` over the LAST axis of both
    tensors, so every leading axis (batch, output head, ...) contributes one
    comparison each. For 1-D inputs this is a single comparison.

    Args:
        pred: Tensor of scores whose last axis enumerates the classes.
        y: Tensor of the same shape holding the targets (e.g. one-hot rows).

    Returns:
        tuple[int, int]: ``(number of matching indices, number of comparisons)``.
    """
    # Reduce over the class axis; reducing over axis 0 would pick the "best"
    # sample of the batch instead of the best class of each sample.
    pred = torch.argmax(pred, dim=-1)
    y = torch.argmax(y, dim=-1)
    corrects = torch.eq(pred, y).int()
    correct = corrects.sum().item()
    size = corrects.numel()

    return correct, size

In [None]:
def get_line(list: list):
    """Render the items of ``list`` as one comma-separated text line.

    Every item is converted with ``str``; the pieces are joined with commas, a
    newline is appended, and all ``[``, ``]`` and space characters are removed
    (so nested lists are flattened into the same line).
    """
    joined = ",".join(map(str, list)) + "\n"

    for unwanted in ("]", "[", " "):
        joined = joined.replace(unwanted, "")

    return joined

In [None]:
def get_dataloader(dataset):
    """Randomly split ``dataset`` 80/10/10 and wrap each part in a DataLoader.

    The training part gets ``int(0.8 * n)`` samples, the validation part
    ``int(0.1 * n)``, and the test part whatever remains. All loaders use the
    module-level ``batch_size`` and shuffle their samples.

    Returns:
        tuple: ``(train_dataloader, test_dataloader, val_dataloader)``.
    """
    total = len(dataset)
    n_train = int(0.8 * total)
    n_val = int(0.1 * total)
    n_test = total - (n_train + n_val)

    # The order of the lengths matches the order of the unpacked subsets.
    train_split, test_split, val_split = torch.utils.data.random_split(
        dataset, [n_train, n_test, n_val]
    )

    loaders = tuple(
        DataLoader(split, batch_size=batch_size, shuffle=True)
        for split in (train_split, test_split, val_split)
    )

    return loaders

In [None]:
def show_and_save_graph(metrics:dict, ylabel:str,  key_one:str, key_two:str=None):
    """Plot one or two metric histories against the epoch number.

    The figure is written to ``./Figures/`` (created on demand) under a name
    built from the plot title and the current Unix time, then shown.

    Args:
        metrics: Mapping from metric name to its per-epoch list of values.
        ylabel: Label for the y axis.
        key_one: Name of the first metric to plot.
        key_two: Optional name of a second metric drawn on the same axes.

    Raises:
        KeyError: If a requested key is missing from ``metrics``.
    """
    # Only plot the second series when one was actually requested; looking up
    # metrics[None] would raise KeyError for the single-series case.
    keys = [key_one] if key_two is None else [key_one, key_two]
    histories = [metrics[key] for key in keys]

    for history in histories:
        # Epochs are numbered from 1 on the x axis.
        plt.plot([i + 1 for i in range(len(history))], history)

    title = f"{' and '.join(keys)} vs Epoch"

    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend(keys)

    plt.grid()

    os.makedirs("./Figures/", exist_ok=True)
    plt.savefig(f"./Figures/{title}_{int(time.time())}")

    plt.show()

In [None]:
def init_metrics(*args):
    """Build a metrics dictionary with one fresh, empty history list per key.

    Args:
        *args: Metric names to use as dictionary keys.

    Returns:
        dict: ``{name: []}`` for every name given; each list is a distinct object.
    """
    return {key: [] for key in args}

In [None]:
def add_grid_lines(img):
    """Draw a labelled grid onto ``img`` in place.

    Horizontal lines are drawn every ``SECTION_SIZE_HEIGHT`` rows and vertical
    lines every ``SECTION_SIZE_WIDTH`` columns, using the pixel value
    ``(0, 0, 255)``. Each grid cell is labelled ``"row, col"`` near its
    top-left corner with ``cv2.putText``.

    Args:
        img: Image array indexed as ``img[row, column]`` with three channels;
            it is modified in place and nothing is returned.
    """
    height, width = img.shape[:2]

    # Slice assignment paints whole rows/columns at once instead of visiting
    # every pixel in Python.
    img[::SECTION_SIZE_HEIGHT, :] = (0, 0, 255)
    img[:, ::SECTION_SIZE_WIDTH] = (0, 0, 255)

    # cv2.putText takes its origin as (x, y): x walks across the width and y
    # down the height, which matters for non-square images.
    for col, x in enumerate(range(0, width, SECTION_SIZE_WIDTH)):
        for row, y in enumerate(range(0, height, SECTION_SIZE_HEIGHT)):
            text = f"{row}, {col}"
            cv2.putText(img, text, (x, y + 15), cv2.FONT_HERSHEY_SIMPLEX, FONT_SIZE, (255,0,0), thickness=THICKNESS)

In [None]:
def get_file_names(src_dir: str):
    """Return the names of the regular files located directly in ``src_dir``.

    Sub-directories are skipped; the names are returned without the directory
    prefix and in the order produced by ``listdir``.
    """
    names = []

    for entry in listdir(src_dir):
        if isfile(join(src_dir, entry)):
            names.append(entry)

    return names

In [None]:
# Augmentation pipeline applied by CustomDataSet.__getitem__ after resizing:
# Gaussian blur, colour jitter and Gaussian noise. No probabilities are passed,
# so each transform uses the library's default.
transform_augmentation = A.Compose([
    A.GaussianBlur((9,21)),
    A.ColorJitter(),
    A.GaussNoise()
])

In [None]:
# Pixel normalisation applied to every image before it is turned into a tensor,
# both in CustomDataSet and in the inference cell. A.Normalize is called without
# mean/std arguments, so the library defaults are used.
# NOTE(review): A.ToFloat runs after A.Normalize - confirm it is still needed on
# data that has already been normalised.
transform_normalize = A.Compose([
    A.Normalize(always_apply=True),
    A.ToFloat(always_apply=True),
])

In [None]:
# Resize step applied first to every image, with 192 as the maximum size of the
# longest side.
# NOTE(review): NeuralNet flattens its final feature map into a Linear layer
# with `base` inputs, so it presumably relies on the spatial size reaching 1x1
# after its pooling stages - confirm the resized frames satisfy that.
transform_resize= A.Compose([
    A.LongestMaxSize(192, always_apply=True)
])

<h3>Custom Dataset Init</h3>

In [None]:
class CustomDataSet(Dataset):
    """Image dataset driven by a CSV index file.

    The CSV must provide a ``file_name`` column (path of the image to load) and
    a ``pin`` column holding two comma-separated class indices, e.g. ``"3,4"``.

    Each sample is ``(image, target)`` where ``image`` is the resized, optionally
    augmented and normalised picture as a channels-first tensor, and ``target``
    is a ``(2, num_classes)`` float tensor with one one-hot row per index. Both
    tensors are created on the module-level ``device``.

    Args:
        csv: Path of the CSV index file.
        augment: Apply ``transform_augmentation`` to every sample. Defaults to
            True; pass False for validation/test data.
        num_classes: Length of each one-hot row. Defaults to 6.
    """

    def __init__(self, csv, augment=True, num_classes=6):
        df = pd.read_csv(csv, header=0)
        self.df = df
        self.predictors = df["file_name"].to_numpy()
        self.cls = df["pin"].to_numpy()
        self.augment = augment
        self.num_classes = num_classes

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # IndexError is what the sequence protocol expects for "past the end";
        # it also terminates plain `for sample in dataset` loops.
        if idx >= len(self):
            raise IndexError(f"index {idx} is out of range for dataset of size {len(self)}")

        file_name = self.predictors[idx]
        labels = [int(x) for x in self.cls[idx].split(",")]

        image = cv2.imread(file_name)

        # cv2.imread signals an unreadable/missing file by returning None.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {file_name}")

        image = transform_resize(image=image)["image"]

        if self.augment:
            image = transform_augmentation(image=image)["image"]

        # add_grid_lines(image)
        image = transform_normalize(image=image)["image"]

        # Height x width x channels -> channels x height x width.
        predictors = torch.tensor(image, device=device).permute([2,0,1])

        target_1 = torch.tensor([0] * self.num_classes, device=device).float()
        target_2 = torch.tensor([0] * self.num_classes, device=device).float()

        target_1[labels[0]] = 1
        target_2[labels[1]] = 1

        target = torch.vstack([target_1, target_2])

        return (predictors, target)

In [None]:
# Build the training and validation datasets from their CSV index files and wrap
# them in shuffling DataLoaders.
train_dataset = CustomDataSet(train_csv)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


# NOTE(review): CustomDataSet.__getitem__ runs transform_augmentation on every
# sample, so the validation images are randomly augmented too, and the
# validation loader is shuffled - confirm this is intended.
val_dataset = CustomDataSet(val_csv)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# for i, X in enumerate(custom_dataset):
#     print(f'i : {i} X: {X}')

In [None]:
# for i, (predictors, target) in enumerate(train_dataset):
#     print(f'i : {i} input_file: {predictors}{predictors.shape}')
#     print(f'i : {i} target: {target}{target.shape}')
#     break

<h3>Neural Net Classes</h3>

In [None]:
class ConvBlockDown(nn.Module):
    """Down-sampling block: 3x3 convolution, ELU, batch norm, then 2x2 max-pool.

    The convolution keeps the spatial size (padding 1); the pooling halves it.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ELU(),
            nn.BatchNorm2d(out_channels),
            nn.MaxPool2d(kernel_size=2),
        ]
        self.seq = nn.Sequential(*layers)

    def forward(self, x):
        return self.seq(x)

In [None]:
class ConvBlockUp(nn.Module):
    """Up-sampling block: 3x3 convolution, ELU, batch norm, then x2 upsampling.

    The convolution keeps the spatial size (padding 1); the final layer doubles
    height and width, mirroring the halving done by ``ConvBlockDown``.
    """

    def __init__(self, in_channels, out_channels):

        super().__init__()

        self.seq = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.ELU(),
            nn.BatchNorm2d(out_channels),
            # The first positional argument of nn.Upsample is `size`, which
            # would force every output to 2x2; the scale factor must be passed
            # by keyword.
            nn.Upsample(scale_factor=2)
        )

    def forward(self, x):
        x = self.seq(x)

        return x

In [None]:
class SkipConnection(nn.Module):
    """1x1 convolution that changes the channel count of a tensor.

    The spatial size is left untouched, so the result can be added to the output
    of a convolutional branch with ``out_channels`` channels.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.conv2d = nn.Conv2d(in_channels, out_channels, kernel_size=1, padding=0)

    def forward(self, x):
        return self.conv2d(x)

In [None]:
class ConvBlock(nn.Module):
    """Residual block: two 3x3 conv+ELU stages plus a 1x1 shortcut, then pooling.

    The shortcut (``SkipConnection``) projects the input to ``out_features``
    channels so it can be added to the convolutional branch. The sum goes
    through ELU and a 2x2 max-pool that halves the spatial size.
    """

    def __init__(self, in_features, out_features):
        super().__init__()

        # Sub-modules are created in the order cnn1, cnn2, skip1 so that seeded
        # weight initialisation is reproducible.
        self.cnn1 = nn.Sequential(
            nn.Conv2d(in_features, out_features, kernel_size=3, padding=1),
            nn.ELU(),
        )
        self.cnn2 = nn.Sequential(
            nn.Conv2d(out_features, out_features, kernel_size=3, padding=1),
            nn.ELU(),
        )

        self.skip1 = SkipConnection(in_features, out_features)
        self.pool = nn.MaxPool2d((2,2))

    def forward(self, x1):
        branch = self.cnn2(self.cnn1(x1))
        shortcut = self.skip1(x1)

        # In-place residual addition.
        branch += shortcut

        return self.pool(F.elu(branch))

In [None]:
class DenseLayer(nn.Module):
    """Fully connected layer followed by layer normalisation and ELU."""

    def __init__(self, in_features, out_features):
        super().__init__()

        stages = (
            nn.Linear(in_features, out_features),
            nn.LayerNorm(out_features),
            nn.ELU(),
        )
        self.dense = nn.Sequential(*stages)

    def forward(self, x1):
        out = self.dense(x1)
        return out

In [None]:
class NeuralNet(nn.Module):
    """Convolutional network with two classification heads.

    A 7x7 stem convolution with pooling is followed by six ``ConvBlock`` stages
    (each halves the spatial size) and a 1x1 convolution. The flattened features
    feed two independent heads, each a hidden layer of 128 units with ELU and an
    output layer of ``num_of_outputs`` values.

    ``forward`` returns a tensor of shape ``(batch, 2, num_of_outputs)``.
    The heads are ``Linear(base, ...)``, so the flattened feature vector must
    have exactly ``base`` elements per sample.
    """

    def __init__(self):
        super().__init__()

        width = 1024

        self.start = nn.Sequential(
            nn.Conv2d(3, width // 32, kernel_size=7, padding=3),
            nn.ELU(),
            nn.MaxPool2d((2,2)),
        )

        # Channel progression: 32 -> 64 -> 128 -> 256 -> 512 -> 512 -> 1024.
        self.block1 = ConvBlock(width // 32, width // 16)
        self.block2 = ConvBlock(width // 16, width // 8)
        self.block3 = ConvBlock(width // 8, width // 4)
        self.block4 = ConvBlock(width // 4, width // 2)
        self.block5 = ConvBlock(width // 2, width // 2)
        self.block6 = ConvBlock(width // 2, width)
        self.last = nn.Conv2d(width, width, kernel_size=1, padding=0)

        self.flatten = nn.Flatten()

        # First head.
        self.fc1_1 = nn.Linear(width, 128)
        self.out1 = nn.Linear(128, num_of_outputs)

        # Second head.
        self.fc1_2 = nn.Linear(width, 128)
        self.out2 = nn.Linear(128, num_of_outputs)

    def forward(self, x1):
        features = self.start(x1)

        stages = (
            self.block1, self.block2, self.block3,
            self.block4, self.block5, self.block6,
        )
        for stage in stages:
            features = stage(features)

        features = self.flatten(self.last(features))

        head_one = self.out1(F.elu(self.fc1_1(features)))
        head_two = self.out2(F.elu(self.fc1_2(features)))

        # Stack as (2, batch, outputs), then move the batch axis to the front.
        stacked = torch.vstack([head_one.unsqueeze(0), head_two.unsqueeze(0)])
        return stacked.permute([1,0,2])

In [None]:
# class NeuralNet(nn.Module):
#     def __init__(self):
#         super().__init__()

#         base = 1024

#         self.start = nn.Sequential(
#             nn.Conv2d(3, base // 32, 7, padding=3),
#             nn.ELU(),
#             nn.MaxPool2d((2,2))
#         )
        
#         self.block1 = ConvBlock(base // 32, base // 16)
#         self.block2 = ConvBlock(base // 16, base // 8)
#         self.block3 = ConvBlock(base // 8, base // 4)
#         self.block4 = ConvBlock(base // 4, base // 2)
#         self.block5 = ConvBlock(base // 2, base // 2)
#         self.block6 = ConvBlock(base // 2, base)
        
#         self.last = nn.Conv2d(base, base, 1, padding=0)
#         self.flatten = nn.Flatten()

        # self.out = nn.Sequential(
        #     DenseLayer(base, 128),
        #     DenseLayer(128, 64),
        #     nn.Linear(64, num_of_outputs)
        # )
        
#     def forward(self, x1):
        
#         x1 = self.start(x1)
#         x1 = self.block1(x1) 
#         x1 = self.block2(x1) 
#         x1 = self.block3(x1) 
#         x1 = self.block4(x1) 
#         x1 = self.block5(x1) 
#         x1 = self.block6(x1) 
    
#         x1 = self.last(x1)
#         x1 = self.flatten(x1)
    
#         x1 = self.out(x1)

#         return x1

<h3>Model Init</h3>

In [None]:
# Instantiate the network on the selected device and report how many of its
# parameters are trainable.
model = NeuralNet().to(device)
pytorch_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total Params: {pytorch_total_params:,}")

In [None]:
# Loss, optimiser and metric histories used by train_and_eval.
# The loss is a mean squared error between the raw network outputs (NeuralNet
# applies no softmax) and the one-hot targets produced by CustomDataSet.
criterion1 = nn.MSELoss()
optimizer = torch.optim.Adam(list(model.parameters()), lr=0.00001)
metrics = init_metrics(TRAIN_LOSS_KEY, TRAIN_ACCURACY_KEY, VAL_LOSS_KEY, VAL_ACCURACY_KEY)

<h3>Train and Test Init</h3>

In [None]:
def train_and_eval(dataloader, optimizer, is_training: bool, is_classification: bool):
    """Run one pass over ``dataloader`` and record its loss (and accuracy).

    Uses the module-level ``model``, ``criterion1`` and ``metrics``. The mean
    batch loss is appended to the training or validation loss history depending
    on ``is_training``; when ``is_classification`` is set, the accuracy in
    percent (from ``get_corrects_and_size``) is appended to the matching
    accuracy history. Both values are printed.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches.
        optimizer: Optimiser stepped after every batch when training.
        is_training: True to train (weights are updated), False to evaluate.
        is_classification: True to also compute and record accuracy.
    """
    if is_training:
        model.train()
        mode_str = "Train"
        loss_history = metrics[TRAIN_LOSS_KEY]
        accuracy_history = metrics[TRAIN_ACCURACY_KEY]

    else:
        model.eval()
        mode_str = "Val"
        loss_history = metrics[VAL_LOSS_KEY]
        accuracy_history = metrics[VAL_ACCURACY_KEY]

    num_batches = len(dataloader)

    loss_num, correct, size = 0, 0, 0

    # Autograd bookkeeping is only needed while training; switching it off for
    # evaluation avoids building a graph that is never back-propagated.
    with torch.set_grad_enabled(is_training):

        for X, y in dataloader:

            pred = model(X)

            loss = criterion1(pred, y)

            loss_num += loss.item()

            if is_training:
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            if is_classification:

                corrects, sizes = get_corrects_and_size(pred, y)

                correct += corrects
                size += sizes

    loss_num /= num_batches
    loss_history.append(loss_num)

    # Calculate Classification Metrics
    if is_classification:

        correct /= size
        accuracy = 100*correct
        accuracy_history.append(accuracy)
        print(f"{mode_str} Accuracy: {(accuracy):>0.3f}")

    print(f"{mode_str} Loss: {loss_num:>12f}")

In [None]:
def print_metrics(start, t, t_with_best_loss, best_loss):
    """Print the end-of-epoch summary and update the best-loss bookkeeping.

    From the second epoch on, the change in training and validation loss since
    the previous epoch and the current train/validation gap are printed (all
    scaled by 100). The latest validation loss is then handed to
    ``get_best_loss``, which also writes the checkpoints.

    Args:
        start: ``time.time()`` value taken when the epoch started.
        t: Current epoch number.
        t_with_best_loss: Epoch number at which the best loss was seen so far.
        best_loss: Best validation loss seen so far.

    Returns:
        tuple: The (possibly updated) ``(t_with_best_loss, best_loss)``.
    """
    train_hist = metrics[TRAIN_LOSS_KEY]
    val_hist = metrics[VAL_LOSS_KEY]

    if t > 1:
        train_delta = (train_hist[-2] - train_hist[-1]) * 100
        val_delta = (val_hist[-2] - val_hist[-1]) * 100
        gap = (train_hist[-1] - val_hist[-1]) * 100

        print()
        print(f"Train Loss Difference: {train_delta:>0.4f}\t\tVal Loss Difference: {val_delta:>0.4f}")
        print(f"Train Val Loss Difference: {gap:>0.4f}")
        print()

    if get_best_loss(val_hist[-1], model):
        t_with_best_loss, best_loss = t, val_hist[-1]

    epochs_since_best = t - t_with_best_loss

    print(f"Epoch with best loss: {t_with_best_loss}\t\tBest Loss: {best_loss:12f}")
    print(f"Epochs Since Best Loss: {epochs_since_best}")

    elapsed = round((time.time() - start), 2)
    print(f"Run Time: {elapsed}s")

    print()

    return t_with_best_loss, best_loss

<h3>Training and Validation</h3>

In [None]:
# t = 0
# t_with_best_loss = 0
# best_loss = 0

# # model.load_state_dict(torch.load(last_epoch_model_path))

# while True:
#     start = time.time()
#     t = t + 1

#     print(f"Epoch {t}\n-------------------------------")

#     train_and_eval(train_dataloader, optimizer, True, True)
#     train_and_eval(val_dataloader, optimizer, False, True)
#     t_with_best_loss, best_loss = print_metrics(start, t, t_with_best_loss, best_loss)

In [47]:
# Evaluate the best checkpoint on the videos in `src_dir`.
#
# Only files whose name starts with "Front_" are used; the second "_"-separated
# field of the name is read digit by digit as the expected sequence. Frames are
# sampled from each video, classified by the model, and turned into a sequence
# of numbers through the `number_locations` lookup table. Videos that yield
# fewer than four numbers are skipped. The final figure printed is the fraction
# of positions where the predicted number equals the expected digit.
src_dir = "./PinDataCalibrateBack/"

# NOTE: despite its name, `fps` is used below as the step, in milliseconds,
# between two sampled frames (it multiplies the frame counter passed to
# CAP_PROP_POS_MSEC).
fps = 100
# Minimum raw model output required for a frame to update max_x / max_y.
conf = 0.3

# Lookup table indexed as [x class, y class] giving the number shown at that
# grid position; every position not listed below maps to 0.
number_locations = torch.zeros((6,6))

number_locations[0,3] = 0

number_locations[3,4] = 1
number_locations[3,3] = 2
number_locations[3,2] = 3

number_locations[2,4] = 4
number_locations[2,3] = 5
number_locations[2,2] = 6

number_locations[1,4] = 7
number_locations[1,3] = 8
number_locations[1,2] = 9


model.load_state_dict(torch.load(best_model_path))
model.eval()
file_names = get_file_names(src_dir)

correct, size = 0, 0



with torch.no_grad():

    for src in file_names:
        file_name_split = src.split("_")
        count = 0

        if file_name_split[0] != "Front":
            continue

        # Expected sequence: one integer per character of the second field.
        target_pin = file_name_split[1]
        target_pin = [int(x) for x in target_pin]

        pred_pin_other = []
        pred_pin_idx = 0

        max_x = 0
        max_y = 0

        gathering_pins = False

        vidcap = cv2.VideoCapture(join(src_dir, src))
        success,image = vidcap.read()

        if not success or image is None:
            print("ERROR", src)
            # Release the capture before skipping so the handle is not leaked.
            vidcap.release()
            continue


        while success:
            # Seek to the next sampling position (count * fps milliseconds).
            vidcap.set(cv2.CAP_PROP_POS_MSEC,(count*fps))

            success,image = vidcap.read()

            if not success:
                break

            # Stop as soon as four numbers have been collected.
            if len(pred_pin_other) >= 4:
                break

            X = transform_resize(image=image)["image"]
            X = transform_normalize(image=X)["image"]
            X = torch.tensor(X, device=device).permute([2,0,1]).unsqueeze(0)

            pin_prob = model(X)

            # Class index of each of the two output heads.
            pred_pin = torch.argmax(pin_prob, 2).cpu().squeeze().tolist()

            pred_pin_x = pred_pin[0]
            pred_pin_y = pred_pin[1]

            pin_prob = pin_prob.cpu().tolist()
            pin_prob = pin_prob[0]

            # Raw model output of the winning class of each head.
            pin_x_prob = pin_prob[0][pred_pin_x]
            pin_y_prob = pin_prob[1][pred_pin_y]

            count += 1

            # NOTE(review): `pred_pin_x >= 0` is always true for an argmax
            # index, so this condition reduces to `pred_pin_y <= 1` - confirm
            # that this is the intended "boundary frame" test.
            if pred_pin_x >= 0 and pred_pin_y <= 1:

                # Leaving a gathering phase: emit the number for the largest
                # indices seen during it and reset them.
                if gathering_pins == True:
                    num = number_locations[max_x, max_y]
                    max_x = 0
                    max_y = 0
                    pred_pin_other.append(num)

                gathering_pins = not gathering_pins

            if pin_x_prob >= conf and pin_y_prob >= conf and gathering_pins:

                if pred_pin_x > max_x:
                    max_x = pred_pin_x

                if pred_pin_y > max_y:
                    max_y = pred_pin_y

        vidcap.release()


        target_pin = torch.tensor(target_pin)

        pred_pin_other = torch.tensor(pred_pin_other)

        if pred_pin_other.shape[0] < 4:
            # pred_pin_other = torch.vstack([pred_pin_other, torch.tensor([0])])
            continue

        # Position-wise comparison of expected digits and predicted numbers.
        matches = target_pin == pred_pin_other

        values, counts = torch.unique(target_pin[matches], return_counts=True)

        print(counts, target_pin, pred_pin_other)

        print("\n\n")

        print(f"Counts: {counts}")
        print(f"target_pin: {target_pin}")
        print(f"pred_pin_other: {pred_pin_other}")

        print("\n\n")


        # Score every position of the sequence. Comparing the argmax of the two
        # vectors would only check where their largest digit sits.
        correct += int(matches.sum().item())
        size += matches.numel()


if size:
    print(correct / size)
else:
    # Nothing was scored; avoid dividing by zero.
    print("No video produced a complete prediction; accuracy is undefined.")


tensor([], dtype=torch.int64) tensor([8, 1, 8, 5]) tensor([0., 0., 0., 8.])



Counts: tensor([], dtype=torch.int64)
target_pin: tensor([8, 1, 8, 5])
pred_pin_other: tensor([0., 0., 0., 8.])



tensor([], dtype=torch.int64) tensor([9, 5, 5, 2]) tensor([0., 0., 0., 0.])



Counts: tensor([], dtype=torch.int64)
target_pin: tensor([9, 5, 5, 2])
pred_pin_other: tensor([0., 0., 0., 0.])



tensor([2]) tensor([2, 7, 2, 8]) tensor([2., 4., 2., 5.])



Counts: tensor([2])
target_pin: tensor([2, 7, 2, 8])
pred_pin_other: tensor([2., 4., 2., 5.])



tensor([], dtype=torch.int64) tensor([3, 5, 2, 6]) tensor([0., 0., 0., 0.])



Counts: tensor([], dtype=torch.int64)
target_pin: tensor([3, 5, 2, 6])
pred_pin_other: tensor([0., 0., 0., 0.])



tensor([3]) tensor([8, 1, 1, 1]) tensor([5., 1., 1., 1.])



Counts: tensor([3])
target_pin: tensor([8, 1, 1, 1])
pred_pin_other: tensor([5., 1., 1., 1.])



tensor([1]) tensor([6, 3, 2, 7]) tensor([6., 0., 0., 0.])



Counts: tensor([1])
target_pin: tensor([6

In [None]:
# model.load_state_dict(torch.load(best_model_path))

# model.eval()
# conf = 0.3

# with torch.no_grad():

#     max_x = 0
#     max_y = 0
#     count = 0

#     vidcap = cv2.VideoCapture("/home/tyler/Documents/Data/PinData/PinDataTestv2/Front_8152_103fab40-ecac-4ef0-8884-bac83ffd9328.mp4")
#     os.makedirs("./tests/", exist_ok=True)
#     success,image = vidcap.read()

#     while success:
#         vidcap.set(cv2.CAP_PROP_POS_MSEC,(count*100))
#         success,image = vidcap.read()


        

#         if not success:
#             break
        
#         image = image[200:400, 200:400, :]

#         X = transform_resize(image=image)["image"]
#         X = transform_normalize(image=X)["image"]
#         X = torch.tensor(X, device=device).permute([2,0,1]).unsqueeze(0)

#         pred = model(X)

#         pred_index = torch.argmax(pred, 2).cpu().squeeze().tolist()

#         pred = pred.cpu().tolist()
#         pred = pred[0]

#         pred_1 = pred[0][pred_index[0]]
#         pred_2 = pred[1][pred_index[1]]
        
#         if pred_1 >= conf and pred_2 >= conf:
            
#             if pred_index[0] > max_x:
#                 max_x = pred_index[0]
            
#             if pred_index[1] > max_y:
#                 max_y = pred_index[1]

#             add_grid_lines(image)

#             cv2.imwrite(f"./tests/{count}{pred_index}.jpeg", image)



#         count += 1

       

        


#     print(max_x, "max_x", max_y, "max_y")

    
    

#     # if pred_1 >= 0.6 and pred_2 >= 0.6:
#     #     text = f"\n\nConfident\n\nFile: {file_name}\nPred: {pred}\ny: {y}\npred_idx: {pred_index} y_idx: {y_index}\n\n"
#     #     print(text)
#     #     count += 1

#     # elif pred_2 >= 0.7:
#     #     text = f"\n\nConfident\n\nFile: {file_name}\nPred: {pred}\ny: {y}\npred_idx: {pred_index} y_idx: {y_index}\n\n"
#     #     print(text)

#     # else:
#     #     text = f"\n\n!NOT! Confident\n\nFile: {file_name}\nPred: {pred}\ny: {y}\npred_idx: {pred_index} y_idx: {y_index}\n\n"
#     #     print(text)