In [3]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

In [12]:
# Using GPU if available
device = torch.device("cuda:2" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=2)

In [116]:
# Defining Constants
BATCH_SIZE = 64
EPOCHS = 20
LEARNING_RATE = 0.001
INPUT_SIZE = 28 * 28
HIDDEN_SIZE = [32,16]
OUTPUT_SIZE = 10
CHECKPOINT_PATH = "./checkpoints"
LOG_DIR = "./logs"

In [47]:
# Creating Directories if not exists
os.makedirs(CHECKPOINT_PATH, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

In [30]:
# Tensorboard Writer
writer_train = SummaryWriter(log_dir=LOG_DIR+"/train")
writer_test = SummaryWriter(log_dir=LOG_DIR+"/test")

In [62]:
# Data Preparation
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=False)
test_dataset = datasets.MNIST(root="./data", train=False, transform=transform, download=False)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [117]:
# Model Definition
class MLP(nn.Module):

    def __init__(self, input_size, hidden_sizes, output_size):
        super().__init__()

        self.model = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.LeakyReLU(),
            nn.Dropout(0.4),
            nn.Linear(hidden_sizes[0], hidden_sizes[1]),
            nn.LeakyReLU(),
            nn.Dropout(0.4),
            nn.Linear(hidden_sizes[1], output_size),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        return self.model(x)

In [118]:
# Model Initialization
model = MLP(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE).to(device)

In [119]:
# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)

In [96]:
# Checkpoint Backup

def save_checkpoints(epoch, model, optimizer, path):
    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
    }
    torch.save(checkpoint, path)

def load_checkpoint(path):
    if os.path.exists(path):
        checkpoint = torch.load(path)
        model.load_state_dict(checkpoint["model_state_dict"])
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        start_epoch = checkpoint["epoch"] + 1
        print(f"Resuming from epoch {start_epoch}...")
        return start_epoch
    return 1 # Start from Epoch 1

In [120]:
def evaluate_model(model, test_loader, criterion):

    model.eval()
    correct = 0
    total = 0
    total_loss = 0

    with torch.no_grad():

        for data, target in test_loader:
            data, target = data.view(-1, INPUT_SIZE).to(device), target.to(device)
            output = model(data)

            total_loss += criterion(output, target).item()
            _, predicted = torch.max(output, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

        return 100 * correct / total, total_loss / len(test_loader)

In [122]:
# Training Loop
def train_model(model, criterion, optimizer, train_loader, test_loader, epochs, device, checkpoint_path):
    # model.train()
    
    start_epoch = load_checkpoint(os.path.join(checkpoint_path, "mlp_checkpoint.pth"))

    for epoch in range(start_epoch, epochs + 1):

        total_loss = 0
        total_train = 0
        correct_train = 0

        for batch_idx, (data, target) in enumerate(train_loader):

            model.train()
            
            data, target = data.reshape(-1, INPUT_SIZE).to(device), target.to(device)

            optimizer.zero_grad()

            output= model(data)
            
            loss = criterion(output, target)
            total_loss += loss.item()

            loss.backward()

            optimizer.step()

            _, predicted = torch.max(output, 1)
            total_train += target.size(0)
            correct_train += (predicted == target).sum().item()
        
        train_loss = total_loss / len(train_loader)
        train_accuracy = 100 * correct_train / total_train

        test_accuracy, test_loss = evaluate_model(model, test_loader, criterion)

        writer_train.add_scalar("Accuracy", train_accuracy, epoch)
        writer_train.add_scalar("Loss", train_loss, epoch)

        writer_test.add_scalar("Accuracy", test_accuracy, epoch)
        writer_test.add_scalar("Loss", test_loss, epoch)

        print(f"Epoch {epoch}/{epochs} : Train Loss {train_loss:.4f}, Train Accuracy {train_accuracy:.2f}%, Test Loss {test_loss:.4f}, Test Accuracy {test_accuracy:.2f}%")
        # print(f"Epoch {epoch}/{epochs} : Train Loss {train_loss:.4f}, Train Accuracy {train_accuracy:.2f}%")

        save_checkpoints(epoch, model,optimizer, os.path.join(checkpoint_path, "mlp_checkpoint.pth"))


In [123]:
train_model(model, criterion, optimizer, train_loader, test_loader, EPOCHS, device, CHECKPOINT_PATH)
writer_train.close()
writer_test.close()

Epoch 1/20 : Train Loss 1.9476, Train Accuracy 52.70%, Test Loss 1.6379, Test Accuracy 84.59%
Epoch 2/20 : Train Loss 1.7643, Train Accuracy 71.20%, Test Loss 1.5789, Test Accuracy 88.77%
Epoch 3/20 : Train Loss 1.7186, Train Accuracy 75.50%, Test Loss 1.5739, Test Accuracy 89.16%
Epoch 4/20 : Train Loss 1.6979, Train Accuracy 77.32%, Test Loss 1.5620, Test Accuracy 90.14%
Epoch 5/20 : Train Loss 1.6844, Train Accuracy 78.70%, Test Loss 1.5637, Test Accuracy 89.88%
Epoch 6/20 : Train Loss 1.6747, Train Accuracy 79.41%, Test Loss 1.5612, Test Accuracy 90.09%


KeyboardInterrupt: 