In [1]:
import warnings

warnings.filterwarnings("ignore")

In [2]:
!pip install torch torchvision torchmetrics

Collecting torchmetrics
  Downloading torchmetrics-1.4.1-py3-none-any.whl.metadata (20 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nv

In [3]:
# import torch
# import torchvision.models as models

# # Load the VGG16 model
# vgg16 = models.vgg16(pretrained=True)

# # Print the model architecture
# print(vgg16)


In [4]:
DATA_DIR = '/content/cifar100_data'

IMAGE_SIZE = 224
NUM_CLASSES = 100

NUM_EPOCHS = 100
NUM_WORKERS = 4
BATCH_SIZE = 32
LEARNING_RATE = 0.001

CONV_KERNEL = 3
CONV_STRIDE = 1
CONV_PADDING = 1
MP_KERNEL = 2
MP_STRIDE = 2
MP_PADDING = 0

checkpoint_path = '/content/trained_models/'

VGG16_archite = [64, 64, "M", 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M", 512, 512, 512, "M"]


In [5]:
# def get_args():
#     parser = argparse.ArgumentParser(description="Train NN model")
#     parser.add_argument("--data_path", "-d", type=str, default="data/animals", help="path to the dataset")
#     parser.add_argument("--batch_size", "-b", type=int, default=16)
#     parser.add_argument("--image_size", "-i", type=int, default=224)
#     parser.add_argument("--epochs", "-e", type=int, default=100)
#     parser.add_argument("--lr", "-l", type=float, default=1e-2)
#     parser.add_argument("--log_path", "-p", type=str, default="tensorboard/animals")
#     parser.add_argument("--checkpoint_path", "-c", type=str, default="trained_models/animals")
#     args = parser.parse_args()

#     return args

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split, SubsetRandomSampler
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import accuracy_score, confusion_matrix
import numpy as np
from torchmetrics.classification import MulticlassPrecision, MulticlassRecall, MulticlassAccuracy

from tqdm import tqdm
import os


In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [8]:
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=True,
                test=False):

    normalize = transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    )

    # define transforms
    transform = transforms.Compose([
            transforms.Resize((227,227)),
            transforms.ToTensor(),
            normalize,
    ])

    if test:
        dataset = datasets.CIFAR100(
          root=data_dir, train=False,
          download=True, transform=transform,
        )

        data_loader = torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )

        return data_loader

    # load the dataset
    train_dataset = datasets.CIFAR100(
        root=data_dir, train=True,
        download=True, transform=transform,
    )

    valid_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform,
    )

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)

    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)


# CIFAR100 dataset
train_loader, valid_loader = data_loader(data_dir='./data',
                                         batch_size=64)

test_loader = data_loader(data_dir='./data',
                              batch_size=64,
                              test=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to ./data/cifar-100-python.tar.gz


100%|██████████| 169001437/169001437 [00:05<00:00, 30242752.91it/s]


Extracting ./data/cifar-100-python.tar.gz to ./data
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:05<00:00, 29745197.46it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [9]:
total_step = len(train_loader)
print("Total step train:",total_step)

Total step train: 704


In [10]:
import math
def max_pooling_output_size_for_fc(H:int=IMAGE_SIZE, m:int=MP_KERNEL, p:int=MP_PADDING, s:int=MP_STRIDE, type_of_VGG:list=None) -> int:
    number_of_max_pooling = sum(1 for element in type_of_VGG if not isinstance(element, int))
    for _ in range(number_of_max_pooling):
        size = math.floor((H + 2*p - m) / s) + 1
    return size

In [11]:
class VGG_nn(nn.Module):
  def __init__(self, in_channels=3, num_classes=100):
    super(VGG_nn, self).__init__()
    self.in_channels = in_channels
    self.conv_layers = self.create_conv_layers(VGG16_archite)
    # self.image_size = max_pooling_output_size_for_fc(H=IMAGE_SIZE, n=len())
    self.fc = nn.Sequential(
        nn.Linear(512*7*7, 4096),
        nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096),
        nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, num_classes)
    )

  def forward(self,x):
    x = self.conv_layers(x)
    x = x.reshape(x.shape[0], -1)
    x = self.fc(x)
    return x

  def create_conv_layers(self, archite):
    layers = []
    in_channels = self.in_channels
    for x in archite:
      if type(x) == int:
        out_channels = x
        layers += [nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=(CONV_KERNEL,CONV_KERNEL), stride=(CONV_STRIDE,CONV_STRIDE), padding=(CONV_PADDING,CONV_PADDING)),
                   nn.BatchNorm2d(x),
                   nn.ReLU()]
        in_channels = x
      elif x == "M":
        layers += [nn.MaxPool2d(kernel_size=(MP_KERNEL,MP_KERNEL), stride=(MP_STRIDE,MP_STRIDE))]
    return nn.Sequential(*layers)


In [12]:
model = VGG_nn(in_channels=3, num_classes=100).to(device=device)
# print(model)

In [13]:
# defined loss and optimizer function
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [14]:
# def validate(device, model, val_loader):
#     model.eval()
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for images, labels in val_loader:
#             images = images.to(device)
#             labels = labels.to(device)
#             outputs = model(images)

#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()

#     accuracy = 100 * correct / total
#     print(f"Accuracy of the network on the {total} validation images: {accuracy:.2f} %")
#     return accuracy

In [15]:
def train(checkpoint_dir):
    best_acc = 0.0
    os.makedirs(checkpoint_dir, exist_ok=True)
    writer = SummaryWriter('/content/')

    precision_metric = MulticlassPrecision(num_classes=10, average='macro').to(device)
    recall_metric = MulticlassRecall(num_classes=10, average='macro').to(device)
    accuracy_metric = MulticlassAccuracy(num_classes=10).to(device)

    for epoch in range(NUM_EPOCHS):
        model.train()
        progress_bar = tqdm(train_loader, colour='green')
        for i, (images, labels) in enumerate(progress_bar):
            images = images.to(device)
            labels = labels.to(device)
            output = model(images)
            loss = criterion(output, labels)
            progress_bar.set_description(f"Epochs {epoch + 1} / {NUM_EPOCHS} loss: {loss :0.4f}")
            writer.add_scalar('Train/loss', loss, epoch * len(train_loader) + i)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        all_losses = []
        all_labels = []
        all_predictions = []
        with torch.no_grad():
            progress_bar = tqdm(valid_loader, colour='yellow')
            for i, (images, labels) in enumerate(progress_bar):
                images = images.to(device)
                labels = labels.to(device)
                output = model(images)

                prediction = torch.argmax(output, dim=1)
                loss = criterion(output, labels)
                progress_bar.set_description(f"Epochs {epoch + 1} / {NUM_EPOCHS} loss: {loss :0.4f}")
                all_losses.append(loss.item())
                all_labels.extend(labels.tolist())
                all_predictions.extend(prediction.tolist())

            average_loss = np.mean(all_losses)

            # Calculate accuracy, precision, and recall
            accuracy = accuracy_metric(torch.tensor(all_predictions).to(device), torch.tensor(all_labels).to(device))
            precision = precision_metric(torch.tensor(all_predictions).to(device), torch.tensor(all_labels).to(device))
            recall = recall_metric(torch.tensor(all_predictions).to(device), torch.tensor(all_labels).to(device))

            print(f"Precision: {precision.item()} Recall: {recall.item()} Loss: {average_loss} Accuracy: {accuracy.item()}")
            writer.add_scalar("Valid/loss", average_loss, epoch)
            writer.add_scalar("Valid/accuracy", accuracy.item(), epoch)
            writer.add_scalar("Valid/precision", precision.item(), epoch)
            writer.add_scalar("Valid/recall", recall.item(), epoch)

            # Save the model checkpoint every epoch
            torch.save(model.state_dict(), os.path.join(checkpoint_dir, 'last.pt'))

            # Save the best model
            if accuracy.item() > best_acc:
                torch.save(model.state_dict(), os.path.join(checkpoint_dir, 'best.pt'))
                best_acc = accuracy.item()

    writer.close()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [16]:
def testing(device,model,test_loader):
  with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    print('Accuracy of the network on the {} test images: {} %'.format(10000, 100 * correct / total))


In [17]:
def main():
  print("Train")
  train(checkpoint_path)

  print("Test")
  testing(device,model,test_loader)

In [19]:
main()

Train


  0%|[32m          [0m| 0/704 [00:00<?, ?it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 400.00 MiB. GPU 