<div style="text-align: center;">
    <h1><strong>PyTorch Fundamentals -- My Notes</strong></h1>
</div>

### Imports

In [None]:
# Import PyTorch
import torch

import torch.nn.functional as F
from torch import nn # Neural network module with loss functions, layers and container modules
from torch.utils.data import DataLoader # Load and batch data using this
from torch.optim.lr_scheduler import ReduceLROnPlateau # Scheduler

# Import torchvision
import torchvision

from torchvision.datasets import ImageFolder # to use custom datasets
from torchvision import transforms # get functions for manipulating your vision data here
from torchvision.transforms import ToTensor # convert a PIL image of numpy array to tensor using this

# Import matplotlib for visualization
import matplotlib.pyplot as plt

# Other imports
import os
import numpy as np
from pathlib import Path
from timeit import default_timer as timer
import csv
from sklearn.metrics import confusion_matrix

# Report which PyTorch / torchvision builds this notebook is running against
version_report = f"PyTorch version: {torch.__version__}\ntorchvision version: {torchvision.__version__}"
print(version_report)

# Check if CUDA is available (queried once and reused below)
cuda_available = torch.cuda.is_available()
print(f"\nCUDA Available: {cuda_available}")

# Set device: use the GPU when one is present, otherwise fall back to the CPU
device = "cuda" if cuda_available else "cpu"

# Only ask for the GPU name when CUDA is actually available:
# torch.cuda.get_device_name raises on machines without a usable CUDA device,
# which would make this cell fail even though the CPU fallback above exists.
gpu = torch.cuda.get_device_name(0) if cuda_available else "CPU (no CUDA device)"
print(f"GPU: {gpu}")


### Vast

In [None]:
# !pip install matplotlib
# !pip install scikit-learn

# import zipfile

# zip_path = r"EuroSATsplit.zip"
# extract_path = r"data/EuroSATsplit"

# with zipfile.ZipFile(zip_path, "r") as zip_ref:
#     zip_ref.extractall(extract_path)

# print(os.listdir(extract_path))

### Data transformation & Inspection

In [None]:
# Preprocessing applied to every image: resize to 64x64, convert to a tensor,
# then normalize each of the three channels with the mean/std values below
# (presumably the dataset's per-channel statistics -- TODO confirm)
transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.3444, 0.3803, 0.4078],
            std=[0.2027, 0.1369, 0.1155],
        ),
    ]
)

# Locations of the train / test splits on disk
TRAIN_PATH = r"data/EuroSATsplit/train"
TEST_PATH = r"data/EuroSATsplit/test"

# Build one ImageFolder dataset per split, both sharing the same preprocessing
train_data, test_data = (
    ImageFolder(root=split_path, transform=transform)
    for split_path in (TRAIN_PATH, TEST_PATH)
)

# Summarize the datasets: sizes of both splits and the class names found in the train split
class_names = train_data.classes
class_len = len(train_data.classes)
train_len, test_len = len(train_data), len(test_data)
print(f"Train data length: {train_len}\nTest data length: {test_len}\n\nClass names ({class_len}): {class_names}\n")

In [None]:
# Batch size shared by both loaders
BATCH_SIZE = 32

# Wrap the datasets in loaders; only the training data is reshuffled
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

# Report how many batches each loader yields
print(f"Length of train_dataloader: {len(train_dataloader)} batches of {BATCH_SIZE}")
print(f"Length of test_dataloader: {len(test_dataloader)} batches of {BATCH_SIZE}")

# Pull a single training batch and show the tensor shapes it contains
first_batch = next(iter(train_dataloader))
train_features_batch, train_labels_batch = first_batch
print(f"\nShape of train_features_batch: {train_features_batch.shape}\n-> [batch_size, color_channels, height, width]")
print(f"\nShape of train_labels_batch: {train_labels_batch.shape}\n-> [batch_size]")

### Show 5 samples

In [None]:
# Pick 5 random positions inside the batch that was fetched for inspection
rand_ids = torch.randint(0, len(train_features_batch), size=[5]).tolist()

# The batch was normalized by `transform`, so its values are no longer in [0, 1]
# and imshow would clip them. Look up the Normalize step (if any) so the
# normalization can be undone for display purposes only.
normalize_step = next(
    (t for t in transform.transforms if isinstance(t, transforms.Normalize)),
    None,
)
if normalize_step is not None:
    # Reshape to [channels, 1, 1] so the stats broadcast over height and width
    norm_mean = torch.tensor(normalize_step.mean).view(-1, 1, 1)
    norm_std = torch.tensor(normalize_step.std).view(-1, 1, 1)

fig, axes = plt.subplots(1, 5, figsize=(15, 5))

for i, idx in enumerate(rand_ids):
    img, label = train_features_batch[idx], train_labels_batch[idx]

    # Undo the normalization (x * std + mean) and clamp away rounding overshoot
    if normalize_step is not None:
        img = img * norm_std + norm_mean
    img = img.clamp(0, 1)

    # matplotlib expects [height, width, channels], tensors are [channels, height, width]
    axes[i].imshow(img.permute(1, 2, 0).numpy())
    axes[i].set_title(class_names[label])
    axes[i].axis("off")

plt.show()

### Model Initialization

#### Custom model

In [None]:
# # Create a baseline model
# class BreedIDModel(nn.Module):
#     def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
#         super().__init__()
#         self.layer_stack = nn.Sequential(nn.Flatten(),
#                                          nn.Linear(in_features=input_shape,
#                                                    out_features=hidden_units),
#                                          nn.Linear(in_features=hidden_units,
#                                                    out_features=output_shape))
#     def forward(self, x):
#         return self.layer_stack(x)

# # See what the model looks like
# h = 224
# w = 224
# c = 1
# model = BreedIDModel(input_shape=h*w*c,
#                          hidden_units=10,
#                          output_shape=len(class_names))
# model

In [None]:
# # Load EfficientNet model
# model = torchvision.models.efficientnet_b0(weights=torchvision.models.EfficientNet_B0_Weights.DEFAULT).to(device)

# # Parameter training setting
# for param in model.features.parameters():
#     param.requires_grad = True

# # Define classifier
# model.classifier = torch.nn.Sequential(
#     torch.nn.Linear(in_features=model.classifier[1].in_features,
#                     out_features=class_len,
#                     bias=True)
#                     ).to(device)

#### DenseNet

In [None]:
# Build DenseNet-169 with torchvision's DEFAULT weights and place it on the selected device
pretrained_weights = torchvision.models.DenseNet169_Weights.DEFAULT
model = torchvision.models.densenet169(weights=pretrained_weights)
model = model.to(device)

# Mark every parameter of the feature extractor as trainable.
# NOTE: the loop variable `param` is read again by later cells
# (color selection, training banner, checkpoint file name), so it must keep this name.
for param in model.features.parameters():
    param.requires_grad = True

# Replace the classifier head with a single linear layer that outputs one score per class
head_in_features = model.classifier.in_features
model.classifier = nn.Sequential(
    nn.Linear(in_features=head_in_features, out_features=class_len, bias=True)
).to(device)

#### Move model to device and inspect

In [None]:
# Move the model to the selected device.
# This is the last expression of the cell, so the notebook displays the value the call returns.
model.to(device)
#model

### Initialize Accuracy Function, Loss Function, Optimizer & Scheduler

In [None]:
# Accuracy function
def accuracy_fn(y_true, y_pred):
    """Return the percentage of positions where ``y_pred`` equals ``y_true``.

    Args:
        y_true: Tensor of reference labels.
        y_pred: Tensor of predicted labels, compared element-wise with ``y_true``.

    Returns:
        Accuracy as a float percentage (matching entries / ``len(y_pred)`` * 100).

    Raises:
        ZeroDivisionError: If ``y_pred`` is empty.
    """
    # Element-wise equality -> number of matching entries as a Python int
    hits = torch.eq(y_true, y_pred).sum().item()
    return hits / len(y_pred) * 100

# Training objects:
#   - cross-entropy loss on the model's class scores
#   - Adam over all model parameters
#   - a scheduler that halves the learning rate when the monitored value stops decreasing
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
scheduler = ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=2,
    threshold=0.01,
)

### Define colors

In [None]:
# ANSI color codes
RED = "\033[91m"
GREEN = "\033[92m"
BLUE = "\033[94m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
RESET = "\033[0m"  # Reset color

# Define color variations
# _get_name has to be *called*: comparing the bound method itself with a string
# is always False, which made every model print in BLUE.
MODEL_COLOR = CYAN if model._get_name() == "EfficientNet" else BLUE
# `param` is the loop variable left over from the cell that set requires_grad
# on the model's feature parameters.
PARAM_COLOR = GREEN if param.requires_grad else RED
BATCH_COLOR = RED if BATCH_SIZE >= 64 else YELLOW
GPU_COLOR = GREEN if device == 'cuda' else RED

### Define logging functions

In [None]:
def clean_csv(filenames):
    """Create or empty ``csv/<name>.csv`` for every name in ``filenames``.

    Args:
        filenames: Iterable of file stems (without the ``.csv`` extension).

    The ``csv`` directory is created if it does not exist yet, so the helper
    also works when it is called before the output folders have been set up.
    """
    os.makedirs("csv", exist_ok=True)
    for filename in filenames:
        # Opening in "w" mode truncates an existing file or creates an empty one
        with open(f"csv/{filename}.csv", "w"):
            pass

def log_csv(filename, data):
    """Append ``data`` as one row to ``csv/<filename>.csv``.

    Args:
        filename: File stem (without the ``.csv`` extension).
        data: Iterable of values written as a single CSV row.

    The ``csv`` directory is created if it does not exist yet, so logging
    does not fail when the output folders have not been set up beforehand.
    """
    os.makedirs("csv", exist_ok=True)
    # newline="" lets the csv module control line endings itself
    with open(f"csv/{filename}.csv", "a", newline="") as file:
        csv.writer(file).writerow(data)

### Training

In [None]:
# Create directories for outputs
dirs = ["csv", "txt", "confusion-matrices", "roc-curves", "models"]
# The loop variable is deliberately not called `dir`: that would shadow the
# builtin dir() for the rest of the kernel session.
for out_dir in dirs:
    os.makedirs(out_dir, exist_ok=True)

# Clean csv's so each run starts with empty log files
clean_csv(["accuracy", "train_loss", "test_loss"])

# Print training info: model name, whether `param` is trainable, batch size and device name
print(f"TRAINING {MODEL_COLOR}{model._get_name()}{RESET}\n"
      f"PARAMETER TRAINING {PARAM_COLOR}{param.requires_grad}{RESET}\n"
      f"BATCHES OF {BATCH_COLOR}{BATCH_SIZE}{RESET}\n"
      f"ON {GPU_COLOR}{gpu}{RESET}\n")

# Set num of epochs
epochs = 10

# Best test accuracy seen so far. It lives OUTSIDE the epoch loop so it persists
# across epochs; resetting it every epoch would save "best" weights each epoch.
best_acc = 0

# Training loop: one pass over the training data followed by an evaluation on the test data per epoch
for epoch in range(1, epochs+1):
    # Reset timer
    start_time = timer()

    # Reset the accumulated train loss and the number of samples processed this epoch
    train_loss, samples = 0, 0

    # Switch to train mode
    model.train()

    # X - image, y - label
    for batch, (X, y) in enumerate(train_dataloader, start=1):
        # Move data to device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calc loss per batch
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Zero grad the optimizer
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Count the samples actually seen; the last batch can be smaller than
        # BATCH_SIZE, so `batch * len(X)` would under-report at the end of the epoch
        samples += len(X)

        # Output progress every 10 batches and on the final batch
        if batch % 10 == 0 or batch == len(train_dataloader):
            end_time = timer()
            total_time = end_time - start_time
            total_samples = len(train_dataloader.dataset)
            seconds_per_batch = total_time / batch
            # Remaining batches times the average batch duration so far
            eta = max(0, (len(train_dataloader) - batch) * seconds_per_batch)
            eta_h = int(eta // 3600)
            eta_m = int((eta % 3600) // 60)
            eta_s = int(eta % 60)
            # end="\r" rewrites the same console line instead of adding new ones
            print(f"{CYAN}Epoch {epoch}:{RESET} looked at {GREEN}{samples}/{total_samples}{RESET} samples "
                    f"({YELLOW}{total_time:.2f}{RESET} seconds taken, {BLUE}{seconds_per_batch:.2f}{RESET} s/batch, "
                    f"ETA: {RED}{eta_h}h{RESET} {GREEN}{eta_m}m{RESET} {BLUE}{eta_s}s{RESET})", end="\r")

    # Divide total train loss by length of train dataloader (average loss per batch)
    train_loss /= len(train_dataloader)

    # Reset test loss and acc
    test_loss, total_samples, total_correct = 0, 0, 0
    predictions, labels = [], []
    roc_labels, roc_probabilities = [], []

    # Switch to evaluation mode
    model.eval()

    # Testing loop
    with torch.inference_mode():
        for X_test, y_test in test_dataloader:
            # Move data to device
            X_test, y_test = X_test.to(device), y_test.to(device)

            # 1. Forward pass
            test_pred = model(X_test)

            # Predicted class per sample, computed once and reused below
            batch_predictions = test_pred.argmax(dim=1)

            # Calculate ROC-curve data
            probabilities = torch.softmax(test_pred, dim=1)  # Shape: (batch_size, num_classes)
            roc_labels.extend(y_test.cpu().numpy())  # True labels
            roc_probabilities.extend(probabilities.cpu().numpy())  # Probabilities for all classes

            # 2. Calculate loss
            test_loss += loss_fn(test_pred, y_test).item()

            # 3. Count correct predictions
            correct_predictions = torch.eq(y_test, batch_predictions).sum().item()
            total_correct += correct_predictions
            total_samples += len(y_test)

            # 4. Store predictions and labels for confusion matrix
            predictions.extend(batch_predictions.cpu().numpy())
            labels.extend(y_test.cpu().numpy())

        # 5. Calc test loss avg per batch and the accuracy as a fraction in [0, 1]
        test_loss /= len(test_dataloader)
        test_acc = (total_correct / total_samples)

    # Save best weights (only when this epoch beats every previous epoch)
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), f"models/{model._get_name()}_{param.requires_grad}_{(test_acc*100):.2f}_weights.pt")

    # Update scheduler with the monitored value (test loss)
    scheduler.step(test_loss)

    # Print & Log results
    print(f"\nTrain loss = {train_loss:.2f}\nTest loss = {test_loss:.2f}\nTest accuracy = {GREEN}{(test_acc*100):.2f}%{RESET}\n")
    log_csv("accuracy", [epoch, round(test_acc, 4)])
    log_csv("train_loss", [epoch, round(train_loss, 2)])
    log_csv("test_loss", [epoch, round(test_loss, 2)])

    # Compute confusion matrix and save it
    cm = confusion_matrix(labels, predictions)
    np.save(f"confusion-matrices/cm{epoch}.npy", cm)

    # Convert to numpy arrays and save ROC-curve data
    roc_labels = np.array(roc_labels)
    roc_probabilities = np.array(roc_probabilities)
    np.save(f"roc-curves/roc_labels{epoch}.npy", roc_labels)
    np.save(f"roc-curves/roc_probabilities{epoch}.npy", roc_probabilities)