## PyTorch Computer Vision

* `torchvision` - the base PyTorch library for computer vision
* `torchvision.datasets` - get datasets and data loading functions here
* `torchvision.models` - get pretrained models that you can leverage for your own problems
* `torchvision.transforms` - functions for manipulating your vision data so that it is suitable for use with PyTorch models
* `torch.utils.data.Dataset` - base dataset class in PyTorch
* `torch.utils.data.DataLoader` - creates an iterable over a dataset

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

In [None]:
# Select the compute device: CUDA when torch reports it as available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

## Dataset and Dataloader

In [None]:
# Training split of FashionMNIST, stored under ./data (fetched on demand because
# download=True). ToTensor() is applied to each image; labels are left as-is.
train_data = torchvision.datasets.FashionMNIST(
    root="data",
    download=True,
    train=True,
    transform=ToTensor(),
    target_transform=None,
)

# Test split, configured identically apart from train=False.
test_data = torchvision.datasets.FashionMNIST(
    root="data",
    download=True,
    train=False,
    transform=ToTensor(),
    target_transform=None,
)

In [None]:
# Label names and the name -> index mapping exposed by the dataset object.
classes, class_to_idx = train_data.classes, train_data.class_to_idx
classes, class_to_idx

In [None]:
from torch.utils.data import DataLoader

# Training batches are shuffled so the model does not see a fixed sample order.
trainDataloader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)
# Evaluation does not depend on sample order, so the default (unshuffled) order is kept.
testDataLoader = DataLoader(dataset=test_data, batch_size=32)

In [None]:
# Pull a single batch from the training loader so its contents can be inspected.
train_features_batch, train_labels_batch = next(iter(trainDataloader))
# Notebook display: shapes of the feature tensor and the label tensor.
train_features_batch.shape, train_labels_batch.shape

## Exploring the Data

In [None]:
# Pick a random sample from the batch currently held in memory. The upper bound
# comes from the batch itself rather than a hard-coded 32, so changing the
# loader's batch_size (or inspecting a shorter batch) cannot index out of range.
random_idx = np.random.randint(0, len(train_features_batch))
# squeeze() drops size-1 dimensions before plotting
# (assumes each sample is 1 x H x W -- TODO confirm).
img = train_features_batch[random_idx].squeeze()
label = train_labels_batch[random_idx]
plt.imshow(img, cmap='gray')
plt.show()
# `label` is an integer tensor element; it is used directly as the list index.
print(classes[label])

#### Baseline Model - Flatten

In [None]:
# Stand-alone nn.Flatten layer, used here only to see what flattening does to one sample.
flatten_model = nn.Flatten()
# First sample of the batch inspected above (indexing removes the batch dimension).
x = train_features_batch[0]
# Notebook display: shape of the single sample before flattening.
x.shape

In [None]:
# Flatten the sample, then squeeze away any size-1 dimensions that remain.
output = flatten_model(x).squeeze()
# Notebook display: shape after flattening.
output.shape

In [None]:
class FashnionMNISTV0(nn.Module):
    """Baseline classifier: flatten, one hidden linear layer with ReLU, linear output.

    Args:
        input_shape: Number of features per sample after flattening.
        hidden_units: Width of the hidden linear layer.
        output_shape: Number of output values per sample.
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        layers = [
            nn.Flatten(),
            nn.Linear(in_features=input_shape, out_features=hidden_units),
            nn.ReLU(),
            nn.Linear(in_features=hidden_units, out_features=output_shape),
        ]
        self.layer_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return its output."""
        result = self.layer_stack(x)
        return result

#### Dummy Run

In [None]:
# Baseline model: 28*28 input features, 128 hidden units, 10 outputs, kept on the CPU.
model_0 = FashnionMNISTV0(
    input_shape=28 * 28,
    hidden_units=128,
    output_shape=10,
).to("cpu")

In [None]:
# Smoke test: push the inspected batch through model_0 before any training has run.
dummy_x = model_0(train_features_batch)

In [None]:
# Notebook display: shape of the dummy forward-pass output.
dummy_x.shape

In [None]:
# Notebook display: model output for the first sample of the batch.
dummy_x[0]

In [None]:
# Cross-entropy loss for the classification task.
loss_fn = nn.CrossEntropyLoss()
# Adam over model_0's parameters only; a different model needs its own optimizer.
optimizer = torch.optim.Adam(params=model_0.parameters(), lr=0.001)

#### Establish Accuracy function

In [None]:
def accuracy_fn(y_true, y_pred):
    """Return the percentage of entries in ``y_pred`` that equal ``y_true``.

    The tensors are compared element-wise with ``torch.eq``; the number of
    matches is divided by ``len(y_pred)`` and scaled to a percentage.
    """
    matches = torch.eq(y_true, y_pred)
    num_correct = matches.sum().item()
    return (num_correct / len(y_pred)) * 100

#### Create a function to time our experiments

In [None]:
from timeit import default_timer as timer

def print_train_time(start: float, end: float, device: torch.device = None):
    """Print the time elapsed between two timer readings.

    Args:
        start: Timer reading taken before the work began.
        end: Timer reading taken after the work finished.
        device: Device label interpolated into the message; printed as-is.
    """
    total_time = end - start
    message = f"Train time on {device}: {total_time:.2f} seconds"
    print(message)

### Training and Testing Loop

* Loop through the epochs
* Loop through the batches in each epoch
* Perform a training step on each training batch and accumulate the loss per batch
* Evaluate the model on each test batch

In [None]:
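# Original inline train/test loop, kept commented out for reference; the
# training_loop() and testing_loop() functions defined below handle training
# and testing separately.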

# torch.manual_seed(42)

# EPOCHS = 3

# start_time = timer()

# for epoch in tqdm(range(EPOCHS)):
#     print(f"Epoch {epoch+1}\n-------------------------------")

#     ### Training

#     training_loss = 0
#     training_accuracy = 0

#     for batch, (X, y) in enumerate(trainDataloader, start = 1):
#         model_0.train()

#         y_pred = model_0(X)
#         loss = loss_fn(y_pred, y)
#         training_loss += loss.item()

#         training_accuracy += accuracy_fn(y, y_pred.argmax(1))

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         if batch % 400 == 0:
#             acc = accuracy_fn(y, y_pred.argmax(1))
#             print(f"Batch {batch} Loss {loss.item()} Accuracy {acc}")

#     training_loss /= len(trainDataloader)
#     training_accuracy /= len(trainDataloader)

#     ### Testing

#     testing_loss = 0
#     testing_accuracy = 0

#     for X, y in testDataLoader:
#         model_0.eval()

#         with torch.inference_mode():
#             y_pred = model_0(X)
#             loss = loss_fn(y_pred, y)
#             testing_loss += loss.item()

#             testing_accuracy += accuracy_fn(y, y_pred.argmax(1))

#     testing_loss /= len(testDataLoader)
#     testing_accuracy /= len(testDataLoader)

#     print(f"Epoch {epoch+1} Training Loss: {training_loss:.4f} | Training Accuracy: {training_accuracy:.2f} |  Testing Loss: {testing_loss:.4f} | Testing Accuracy: {testing_accuracy:.2f}%")



# end_time = timer()
# print("----------------------------------")
# print_train_time(start_time, end_time, next(model_0.parameters()).device)

In [None]:
def training_loop(epochs=5,
                  model=model_0,
                  loss_fn=loss_fn,
                  accuracy_fn=accuracy_fn,
                  optimizer=optimizer,
                  trainDataLoader=trainDataloader,
                  device=device):
    """Train ``model`` for ``epochs`` passes over ``trainDataLoader``.

    Args:
        epochs: Number of full passes over the loader.
        model: Module to train; it is moved to ``device`` before training.
        loss_fn: Callable ``(predictions, targets)`` returning a scalar loss tensor.
        accuracy_fn: Callable ``(y_true, y_pred_labels)`` returning a number per batch.
        optimizer: Optimizer that steps the model. It must have been built from
            ``model.parameters()``. The default is bound when this function is
            defined, so pass a matching optimizer whenever ``model`` is overridden.
        trainDataLoader: Sized iterable yielding ``(X, y)`` batches.
        device: Device the model and every batch are moved to.

    Returns:
        ``(train_loss, train_accuracy)``: two lists with one entry per epoch, each
        the unweighted mean of the per-batch values.
    """
    # Function-scope import so the notebook's import cell does not need to change.
    import warnings

    train_loss = []
    train_accuracy = []

    model.to(device)  # Ensure the model is on the correct device

    # An optimizer built for a different model would run without error while
    # leaving this model's weights untouched, so make that mistake visible.
    model_param_ids = {id(param) for param in model.parameters()}
    optimized_ids = {
        id(param)
        for group in optimizer.param_groups
        for param in group["params"]
    }
    if model_param_ids and model_param_ids.isdisjoint(optimized_ids):
        warnings.warn(
            "The optimizer does not hold any parameter of the model being "
            "trained, so optimizer.step() will not update it. Pass an "
            "optimizer built from model.parameters().",
            RuntimeWarning,
            stacklevel=2,
        )

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}\n-------------------------------")

        model.train()  # Set the model to training mode
        training_loss = 0
        training_accuracy = 0

        for batch, (X, y) in enumerate(trainDataLoader, start=1):
            # Move data and target to device
            X, y = X.to(device), y.to(device)

            # Zero the gradients left over from the previous step
            optimizer.zero_grad()

            # Forward pass
            y_pred = model(X)

            # Compute loss and add its scalar value to the running total
            loss = loss_fn(y_pred, y)
            training_loss += loss.item()

            # Accuracy is computed on the predicted class index (argmax over dim 1)
            training_accuracy += accuracy_fn(y, y_pred.argmax(dim=1))

            # Backward pass and parameter update
            loss.backward()
            optimizer.step()

            if batch % 400 == 0:
                print(f"Done with {batch} batches")

        # Average the per-batch values over the number of batches in the epoch
        training_loss /= len(trainDataLoader)
        training_accuracy /= len(trainDataLoader)

        # Store loss and accuracy
        train_loss.append(training_loss)
        train_accuracy.append(training_accuracy)

        print(f"Epoch {epoch+1} Training Loss: {training_loss:.4f} | Training Accuracy: {training_accuracy:.2f}")

    return train_loss, train_accuracy


In [None]:
# training_loop()

In [None]:
def testing_loop(model=model_0,
                 testDataLoader=testDataLoader,
                 loss_fn=loss_fn,
                 accuracy_fn=accuracy_fn,
                 device=device):
    """Evaluate ``model`` on ``testDataLoader`` and report mean loss and accuracy.

    Args:
        model: Module to evaluate; it is moved to ``device`` and put in eval mode.
        testDataLoader: Sized iterable yielding ``(X, y)`` batches.
        loss_fn: Callable ``(predictions, targets)`` returning a scalar loss tensor.
        accuracy_fn: Callable ``(y_true, y_pred_labels)`` returning a number per batch.
        device: Device the model and every batch are moved to. Exposed as a
            parameter (appended last, so existing calls are unaffected) to
            match ``training_loop``.

    Returns:
        ``(testing_loss, testing_accuracy)``: the unweighted means of the
        per-batch values, so a shorter final batch counts as much as a full one.
        The same values are printed.
    """
    model.to(device)
    model.eval()

    testing_loss = 0
    testing_accuracy = 0

    # inference_mode disables gradient tracking for the forward passes.
    with torch.inference_mode():
        for X, y in testDataLoader:
            X, y = X.to(device), y.to(device)

            y_pred = model(X)
            loss = loss_fn(y_pred, y)
            testing_loss += loss.item()

            # Accuracy is computed on the predicted class index (argmax over dim 1)
            testing_accuracy += accuracy_fn(y, y_pred.argmax(1))

    # Average the per-batch values over the number of batches
    testing_loss /= len(testDataLoader)
    testing_accuracy /= len(testDataLoader)

    print(f"Testing Loss: {testing_loss:.4f} | Testing Accuracy: {testing_accuracy:.2f}%")

    return testing_loss, testing_accuracy

In [None]:
# testing_loop()

## Creating a CNN

In [None]:
class FashionMNISTV1(nn.Module):
    """Convolutional classifier: two conv blocks followed by a linear head.

    The model returns raw logits. No softmax is applied, because
    ``nn.CrossEntropyLoss`` (the loss used in this notebook) expects
    unnormalized scores and normalizes them itself; a Softmax layer in front
    of it would normalize twice. Use ``logits.softmax(dim=1)`` when
    probabilities are needed; ``argmax`` predictions are the same either way.

    Args:
        input_shape: Accepted for signature parity with the baseline model but
            not used; the first convolution is fixed to 1 input channel.
        hidden_units: Width of the hidden linear layer in the classifier.
        output_shape: Number of output logits per sample.

    Note:
        The classifier's ``7*7*256`` input size assumes 28x28 inputs: the 3x3
        convolutions with padding 1 keep the spatial size, and each of the two
        2x2 max-pools halves it (28 -> 14 -> 7).
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
        super().__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # The head ends in a plain Linear layer so that it emits logits.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(7*7*256, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_shape)
        )

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        return self.classifier(x)

In [None]:
# CNN model instance, moved to the selected device.
model_1 = FashionMNISTV1(
    input_shape=28 * 28,
    hidden_units=128,
    output_shape=10,
).to(device)

In [None]:
# The module-level `optimizer` was built from model_0.parameters(), so relying on
# training_loop's default would step model_0's weights and leave model_1
# untrained. Give model_1 its own Adam optimizer with the same learning rate.
optimizer_1 = torch.optim.Adam(model_1.parameters(), lr=1e-3)
training_loop(model=model_1, optimizer=optimizer_1)