# Stage 2 Savytski

In [None]:
import os
import pandas as pd
from torchvision.io import read_image
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


class CustomImageDataset(Dataset):
    def __init__(self, train_path):
        self.train_data = pd.read_csv(train_path)



    def __len__(self):
        return len(self.train_data)

    def __getitem__(self, idx):
        image = torch.tensor(self.train_data.iloc[idx, 1:], dtype=torch.float)
        label = self.train_data.iloc[idx, 0]
        n = torch.linalg.vector_norm(image)
        # if n:
            # image = torch.div(image, n)
        return image, label




training_data = CustomImageDataset("../train.csv")
test_data = CustomImageDataset("../test.csv")


rand_indx = torch.randint(len(training_data), size=(1,)).item()
img, label = training_data[rand_indx]

In [None]:
from torch.utils.data import DataLoader 

train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)



# Display image and label.
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

ind = torch.randint(len(train_features.size()), size=(1,)).item()
img = torch.reshape(train_features[ind], (28, 28)) 
label = train_labels[ind]

plt.imshow(img, cmap="gray")
plt.show()
print(f"Label: {label}")

In [None]:
from torch import nn


class MLP(nn.Module):
    
    def __init__(self) -> None:
        super().__init__()
        # self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10),
        )
    
    def forward(self, x):
        # x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
        


        
        


In [None]:
from sklearn.metrics import precision_score, recall_score
model = MLP().to("cpu")

learning_rate = 1e-3
batch_size = 64
epochs = 5
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)




def train_loop(dataloader, model, loss_fn, optimizer, l):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        if l=="MSE":
            loss = loss_fn(pred.argmax(1).to(torch.float16), y.to(torch.float16))
        else:
            loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    





def test_loop(dataloader, model, loss_fn, loss):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    
    # ps_arr = np.empty([0])
    recall_arr = np.empty([0])
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            if loss=="MSE":
                test_loss += loss_fn(pred.argmax(1).to(torch.float16), y.to(torch.float16)).item()
            else:
                test_loss += loss_fn(pred, y).item()
            # ps_arr = np.append(ps_arr, precision_score(pred.argmax(1), y, average='micro'))
            
            recall_arr = np.append(recall_arr, recall_score(pred.argmax(1), y, average='micro'))
            correct += (pred.argmax(1)== y).type(torch.float).sum().item()            

    test_loss /= num_batches
    correct /= size
    recall = np.average(recall_arr)
    f1 = 2 / (1/recall + 1/correct)
    print(f"Error: \n Accuracy: {(100*correct):>0.2f}%, Avg loss: {test_loss:>.2f}, Recall: {(100*recall):>0.2f}%, F1: {(100*f1):>0.2f}%\n")
    return (correct, recall, f1)
    





In [None]:
from torchviz import make_dot



for X, y in test_dataloader:
    print(make_dot(model(X).mean(), params=dict(model.named_parameters())))
    break


In [None]:
import matplotlib.pyplot as plt
import numpy as np

def run(epochs, loss_fn, optimizer, loss="Cross Entropy"):
    # m = MLP().to("cpu")
    test_metrics, train_metrics = np.empty([0, 3]), np.empty([0, 3])
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train_loop(train_dataloader, model, loss_fn, optimizer, loss)
        print("Train:")
        unp = test_loop(train_dataloader, model, loss_fn, loss)
        train_metrics = np.append(train_metrics, np.array([unp]), axis=0)
        print("Test:")
        unp = test_loop(test_dataloader, model, loss_fn, loss)
        test_metrics = np.append(test_metrics, np.array([unp]), axis=0)
    print(f"Averages: ------------------ \n ")
    print(f"Training ------------------- \n Precision: {(np.average(train_metrics[:, 0])*100):>0.2f}, Recall: {(np.average(train_metrics[:, 1])*100):>0.2f} \
        f1: {(100*np.average(train_metrics[:, 2])):>0.2f}")
    print(f"Testing ------------------- \n Precision: {(np.average(test_metrics[:, 0])*100):>0.2f}, Recall: {(100*np.average(test_metrics[:, 1])):>0.2f} \
        f1: {(100*np.average(test_metrics[:, 2])):>0.2f}")
    
    plt.plot(range(epochs), train_metrics[:,0], color="orange")
    plt.plot(range(epochs), test_metrics[:, 0])
    plt.ylim(bottom=0, top=1.5)
    
    
# run(10, loss_fn, optimizer)
        
        

In [None]:

# loss_fn = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# run(10, loss_fn,optimizer, "M")

In [None]:
# optimizer = torch.optim.Adadelta(model.parameters(), lr=learning_rate)
# run(10, loss_fn,optimizer, "M")

In [None]:
optimizer = torch.optim.Rprop(model.parameters(), lr=learning_rate)
run(10, loss_fn,optimizer, "M")