In [19]:
import torch
import torchvision
print("PyTorch version: ", torch.__version__)
print("torchvision version: ", torchvision.__version__)
import torch
import numpy as np 

PyTorch version:  2.0.0+cu118
torchvision version:  0.15.1+cu118


In [20]:
tensor = torch.rand(3, 4)
if torch.cuda.is_available():
    tensor = tensor.to('cuda')

print(f"Device: {tensor.device}")

Device: cpu


In [21]:
#@title Define some hyper-parameters

data_path = 'data/fashion' #@param {type: 'string'}
batch_size = 1024     #@param {type: 'number'}

log_dir = "./runs/lab_2" #@param {type: 'string'}
device = 'cpu'  #@param ['cpu', 'cuda']

learning_rate = 0.001 #@param {type: 'number'}
epochs = 5 #@param {type: 'number'}

# Create training, validation and testing data

In [22]:
!mkdir -p data
!mkdir -p data/fashion

In [23]:
from data import mnist_reader
from sklearn.model_selection import train_test_split
X_train, y_train = mnist_reader.load_mnist(data_path, kind='train')
test_x, test_y = mnist_reader.load_mnist(data_path, kind='t10k')
train_x, val_x, train_y, val_y = train_test_split(X_train, y_train, test_size=0.3, random_state=1, shuffle=True, stratify=None) # training validation set split
        
# test_x = torch.from_numpy(test_x)
# test_y = torch.from_numpy(test_y)
# train_x = torch.from_numpy(train_x)
# train_y = torch.from_numpy(train_y)
# val_x = torch.from_numpy(val_x)
# val_y = torch.from_numpy(val_y)

# Define Model

In [24]:
import torch.nn as nn   
import torch.nn.functional as F

class MyModel(nn.Module):  # inherit from nn.Module class
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, stride=1, padding=2) # stride means how many pixels need to move
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1) # outchannel in conv1 is the inchannel in conv2
        self.relu = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(in_features=64*7*7, out_features=1024) 
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=1024, out_features=10)

        
    def forward(self, x):
        # x shape [batch_size, channel, height, width]  [B, 3, 32, 32]
        x = self.conv1(x) 
        x = F.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool2(x) #[1024, 64, 7, 7]
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x

model = MyModel() # instantiate a model
print(model)

MyModel(
  (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu): ReLU()
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=3136, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=10, bias=True)
)


In [25]:
import torch.nn as nn   
import torch.nn.functional as F

class MyModel2(nn.Module):  
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=5, stride=1, padding=2)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(in_features=128*7*7, out_features=1024)
        self.dropout1 = nn.Dropout(0.5)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=1024, out_features=512)
        self.dropout2 = nn.Dropout(0.5)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(in_features=512, out_features=10)

    def forward(self, x):
        x = self.conv1(x) 
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.pool2(x) 
        x = x.view(-1, 128 * 7 * 7)
        x = self.fc1(x)
        x = self.dropout1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        x = self.dropout2(x)
        x = self.relu4(x)
        x = self.fc3(x)
        return x

model = MyModel2() # instantiate a model

In [26]:
#@title Train Loop
def train_loop(dataloader, model, loss_fn, optimizer, device, epoch): 

    print("training data...")

    running_loss = 0.0
    total_loss = 0.0
    correct, total = 0, 0

    # Get a batch of training data from the DataLoader
    for batch, data in enumerate(dataloader):
        # Every data instance is an image + label pair
        img, label = data

        # Transfer data to target device
        img = img.to(device)
        label = label.to(device)

        # Zero your gradients for every batch
        optimizer.zero_grad()

        # Compute prediction for this batch
        logit = model(img)

        # compute the loss and its gradients
        loss = loss_fn(logit, label)

        # Calculate the index of maximum logit as the predicted label
        prob = F.softmax(logit, dim=1)
        pred = prob.argmax(dim=1)
        # record correct predictions
        correct += (pred == label).type(torch.float).sum().item()
        total += label.size(0)


        # Backpropagation
        loss.backward()

        # update the parameters according to gradients
        optimizer.step()

        # Gather data and report
        running_loss += loss.item()     # ! Don't forget to use .item() to retrieve loss value  
        total_loss += loss.item()
        # report every 100 iterations
        if batch % 100 == 99:
            print(' epoch {} loss: {:.4f}'.format(epoch+1, running_loss / 100))
            running_loss = 0.0

    train_loss = total_loss / (batch+1)
    accuracy = correct / total

    return train_loss, accuracy

In [27]:
#@title Evaluation Loop
def evaluate_loop(dataloader, model, loss_fn, device): 

    print("evaluating data...")
    # Get number of batches
    #num_batches = len(dataloader)

    test_loss, correct, total = 0, 0, 0
    
    # Context-manager that disabled gradient calculation.
    with torch.no_grad():
        for data in dataloader:
            # Every data instance is an image + label pair
            img, label = data
            # Transfer data to target device
            img = img.to(device)
            label = label.to(device)

            # Compute prediction for this batch
            logit = model(img)

            # compute the loss
            test_loss += loss_fn(logit, label).item()    # ! Don't forget .item() again!!!

            # Calculate the index of maximum logit as the predicted label
            prob = F.softmax(logit, dim=1)
            pred = prob.argmax(dim=1)
            # record correct predictions
            correct += (pred == label).type(torch.float).sum().item()
            total += label.size(0)
    # Gather data and report
    test_loss /= 64
    accuracy = correct / total
    print("Test Error: \n   Accuracy: {:.2f}, Avg loss: {:.4f} \n".format(100*accuracy, test_loss))
    
    return test_loss, accuracy

# dataset

In [28]:
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from PIL import Image
from sklearn.model_selection import train_test_split
from data import mnist_reader

labels_map = {
    0: "T-shirt/top",
    1: "Trouser",
    2: "Pullover",
    3: "Dress",
    4: "Coat",
    5: "Sandal",
    6: "Shirt",
    7: "Sneaker",
    8: "Bag",
    9: "Ankle boot",
}

class CustomDataset(Dataset):
    def __init__(self, image, label, transform):
        super().__init__()

        self.image = image
        self.label = label

        self.image = self.image.reshape(-1, 28, 28)
        self.transform = transform
    
    def __len__(self):
        return len(self.label)

    def __getitem__(self, idx):
        image = self.image[idx]
        label = self.label[idx]
        if self.transform:
            image = self.transform(Image.fromarray(image))
        label = torch.tensor(label).long()
        return image, label 

# Training

In [None]:
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

# log some hyper-parameters
print("Batch size: ", batch_size)
print("Learning rate: ", learning_rate)
print("Total epochs: ", epochs)
# ******************* Dataset *******************
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5), # normalize the data to the range (-1, 1)
    transforms.RandomRotation(20),
    transforms.RandomVerticalFlip(0.5),
    transforms.RandomHorizontalFlip(0.5), # the image will be flipped with 50% probability left and right
    transforms.Pad(padding=4, fill=0, padding_mode='constant'),  # zero padded 4 pixels  
    transforms.RandomCrop(size=28), #randomly crop 28x28 as input
    ])

# We should not do any data augmentation for evaluation.
evaluate_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5),
    ])

train_dataset = CustomDataset(train_x, train_y, transform=train_transform)
valid_dataset = CustomDataset(val_x, val_y, transform=evaluate_transform)
test_dataset = CustomDataset(test_x, test_y, transform=evaluate_transform)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    )
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=batch_size,
    shuffle=False,
    )
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    )
print("Set up dataset ...")
print("Train dataset contains: %d samples" % len(train_dataset))
print("Validation dataset contains: %d samples" % len(valid_dataset))
print("Test dataset contains: %d samples" % len(test_dataset))

# ******************* TensorBoard *******************
writer = SummaryWriter('runs/lab_2')


# ******************************************************************************** Model ******************************************************************************************************
model = MyModel2()
# ******************************************************************************** Model ******************************************************************************************************

# save model graph to tensorboard
imgs, labels = next(iter(train_loader)) # Get an example data batch
writer.add_graph(model, imgs)

# move model to target device
model.to(device)

# ******************* Loss & Optimizer *******************
loss_fn = nn.CrossEntropyLoss()
#optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# ******************* Optimization *******************
best_accuracy = 0.
for epoch in range(1, epochs+1):
    print("current epoch is: " , epoch)
    # train loop
    # set the module in training mode.
    model.train()
    train_loss, train_accuracy = train_loop(train_loader, model, loss_fn, optimizer, device, epoch)

    # save to tensorboard
    writer.add_scalar('train_loss vs epochs', train_loss, epoch)
    writer.add_scalar('train_accuracy vs epochs', train_loss, epoch)
    
    # save model weights
    save_path = 'ckpt_{:04d}.pth'.format(epoch+1)
    torch.save(model.state_dict(), save_path)

    # validation loop
    # set the module in evaluation mode.
    model.eval()
    valid_loss, valid_accuracy = evaluate_loop(valid_loader, model, loss_fn, device)
    # save to tensorboard
    writer.add_scalar('valid_loss vs epochs', valid_loss, epoch)
    writer.add_scalar('valid_accuracy vs epochs', valid_accuracy, epoch)
    if valid_accuracy > best_accuracy:    # save the model with best validation accuracy
        save_path = 'ckpt_best.pth'
        torch.save(model.state_dict(), save_path)
        best_accuracy = valid_accuracy

writer.close()
print("Finished Training.")

# load the parameters to the same device
state_dict = torch.load('ckpt_best.pth', map_location=device)
model.load_state_dict(state_dict)   # load parameters to model instance
model.eval()
test_loss, test_accuracy = evaluate_loop(test_loader, model, loss_fn, device)

%reload_ext tensorboard
%tensorboard --logdir ./runs/lab_2 --port 6006

Batch size:  1024
Learning rate:  0.001
Total epochs:  5
Set up dataset ...
Train dataset contains: 42000 samples
Validation dataset contains: 18000 samples
Test dataset contains: 10000 samples
current epoch is:  1
training data...
evaluating data...
Test Error: 
   Accuracy: 34.59, Avg loss: 0.4997 

current epoch is:  2
training data...
