In [3]:
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torchvision.datasets import Food101
import torch
import random
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import models
import os
from tqdm.auto import tqdm
from typing import Dict, List, Tuple
from pathlib import Path
from torch.optim import lr_scheduler
from torchmetrics import Precision, Recall, F1Score, ConfusionMatrix
from torchinfo import summary


In [None]:
root = 'data'  # Directory where the dataset will be stored
transform = transforms.Compose([
    transforms.Resize((384, 384)),  # Resize images to 224x224 pixels
    transforms.ToTensor(),# Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # Normalize using ImageNet stats
])

food101_train = Food101(root=root, split='train', transform=transform, download=True)
food101_test = Food101(root=root, split='test', transform=transform, download=True)

In [None]:
def Food101_dataloaders(
    train_dataset,
    test_dataset,
    batch_size:int,
    num_workers:int):

  class_names = train_dataset.classes
  class_to_idx=train_dataset.class_to_idx

  # Turn images into data loaders
  train_dataloader = DataLoader(
      train_dataset,
      batch_size=batch_size,
      shuffle=True,
      num_workers=num_workers,
      pin_memory=True,
  )


  test_dataloader = DataLoader(
      test_dataset,
      batch_size=batch_size,
      shuffle=False, # don't need to shuffle test data
      num_workers=num_workers,
      pin_memory=True,
  )

  return train_dataloader, test_dataloader, class_names,class_to_idx

In [None]:
def EfficientNet_V2(num_classes:int,
                    freeze_percentage:float=0.5,
                    tr_learning=False
                    ):
  weights=models.EfficientNet_V2_S_Weights.IMAGENET1K_V1
  #transform=weights.transforms
  EfficientNet_V2=models.efficientnet_v2_s(weights=weights)
  if tr_learning:
    for param in EfficientNet_V2.parameters():
      param.requires_grad=False

    EfficientNet_V2.classifier=nn.Sequential(
    nn.Dropout(p=0.2,inplace=True),
    nn.Linear(in_features=1280,out_features=num_classes,bias=True)
  )
  else:
    params = list(EfficientNet_V2.parameters())

    # Calculate the number of parameters to freeze
    num_params_to_freeze = int(len(params) * freeze_percentage)

    # Randomly select parameters to freeze
    params_to_freeze = random.sample(params, num_params_to_freeze)

    # Freeze selected parameters
    for param in params_to_freeze:
       param.requires_grad = False

    EfficientNet_V2.classifier=nn.Sequential(
       nn.Dropout(p=0.2,inplace=True),
       nn.Linear(in_features=1280,out_features=num_classes,bias=True)
  )
  return EfficientNet_V2

In [None]:
def train_step(model:nn.Module,
               dataloader,
               loss_fn,
               optimizer,
               device):
  """Trains a PyTorch model for a single epoch.

  Turns a target PyTorch model to training mode and then
  runs through all of the required training steps (forward
  pass, loss calculation, optimizer step).

  Args:
    model: A PyTorch model to be trained.
    dataloader: A DataLoader instance for the model to be trained on.
    loss_fn: A PyTorch loss function to minimize.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A tuple of training loss and training accuracy metrics.
    In the form (train_loss, train_accuracy). For example:

    (0.1112, 0.8743)
  """
  # Put model in train mode
  model.train()

  # Setup train loss and train accuracy values
  train_loss, train_acc = 0, 0

  # Loop through data loader data batches
  for batch, (X, y) in enumerate(dataloader):
      # Send data to target device
      X, y = X.to(device), y.to(device)

      # 1. Forward pass
      y_pred = model(X)

      # 2. Calculate  and accumulate loss
      loss = loss_fn(y_pred, y)
      train_loss += loss.item()

      # 3. Optimizer zero grad
      optimizer.zero_grad()

      # 4. Loss backward
      loss.backward()

      # 5. Optimizer step
      optimizer.step()

      #scheduler.step()

      # Calculate and accumulate accuracy metric across all batches
      y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
      train_acc += (y_pred_class == y).sum().item()/len(y_pred)

  # Adjust metrics to get average loss and accuracy per batch
  train_loss = train_loss / len(dataloader)
  train_acc = train_acc / len(dataloader)
  return train_loss, train_acc

In [None]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
  """Tests a PyTorch model for a single epoch.

  Turns a target PyTorch model to "eval" mode and then performs
  a forward pass on a testing dataset.

  Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
  """
  # Put model in eval mode
  model.eval()

  # Setup test loss and test accuracy values
  test_loss, test_acc = 0, 0


  # Turn on inference context manager
  with torch.inference_mode():
      # Loop through DataLoader batches
      for batch, (X, y) in enumerate(dataloader):
          # Send data to target device
          X, y = X.to(device), y.to(device)

          # 1. Forward pass
          test_pred_logits = model(X)

          # 2. Calculate and accumulate loss
          loss = loss_fn(test_pred_logits, y)
          test_loss += loss.item()

          # Calculate and accumulate accuracy
          test_pred_labels = test_pred_logits.argmax(dim=1)
          test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))



  # Adjust metrics to get average loss and accuracy per batch
  test_loss = test_loss / len(dataloader)
  test_acc = test_acc / len(dataloader)
  return test_loss, test_acc

In [None]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          #scheduler,
          epochs: int,
          device: torch.device) -> Dict[str, List]:
  """Trains and tests a PyTorch model.

  Passes a target PyTorch models through train_step() and test_step()
  functions for a number of epochs, training and testing the model
  in the same epoch loop.

  Calculates, prints and stores evaluation metrics throughout.

  Args:
    model: A PyTorch model to be trained and tested.
    train_dataloader: A DataLoader instance for the model to be trained on.
    test_dataloader: A DataLoader instance for the model to be tested on.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    loss_fn: A PyTorch loss function to calculate loss on both datasets.
    epochs: An integer indicating how many epochs to train for.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A dictionary of training and testing loss as well as training and
    testing accuracy metrics. Each metric has a value in a list for
    each epoch.
    In the form: {train_loss: [...],
                  train_acc: [...],
                  test_loss: [...],
                  test_acc: [...]}
    For example if training for epochs=2:
                 {train_loss: [2.0616, 1.0537],
                  train_acc: [0.3945, 0.3945],
                  test_loss: [1.2641, 1.5706],
                  test_acc: [0.3400, 0.2973]}
  """
  # Create empty results dictionary
  results = {"train_loss": [],
      "train_acc": [],
      "test_loss": [],
      "test_acc": []
  }

  # Loop through training and testing steps for a number of epochs
  for epoch in tqdm(range(epochs)):
      train_loss, train_acc = train_step(model=model,
                                          dataloader=train_dataloader,
                                          loss_fn=loss_fn,
                                          optimizer=optimizer,
                                          #scheduler=scheduler,
                                          device=device)
      test_loss, test_acc = test_step(model=model,
          dataloader=test_dataloader,
          loss_fn=loss_fn,
          device=device)

      if (epoch+1)%10==0:
        save_model(model=model,
                   target_dir="models",
                   model_name=f"{epoch+1}_Food101_EfficientNetV2_tr_learning.pth")

      # Print out what's happening
      print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
      )

      # Update results dictionary
      results["train_loss"].append(train_loss)
      results["train_acc"].append(train_acc)
      results["test_loss"].append(test_loss)
      results["test_acc"].append(test_acc)

  # Return the filled results at the end of the epochs
  return results

In [None]:
def eval_model(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
               num_classes:int,
              device: torch.device):
  model.eval()

  # Setup test loss and test accuracy values
  results={"test_loss":[],
           "test_acc":[],
           "test_precision":[],
           "test_recall":[],
           "test_f1_score":[],
           "test_confusion_matrix":[]}

  precision_metric = Precision(num_classes=num_classes)
  recall_metric = Recall(num_classes=num_classes)
  f1_score_metric = F1Score(num_classes=num_classes)
  confusion_matrix = ConfusionMatrix(num_classes=num_classes)


  # Turn on inference context manager
  with torch.inference_mode():
      # Loop through DataLoader batches
      for batch, (X, y) in enumerate(dataloader):
          # Send data to target device
          X, y = X.to(device), y.to(device)

          # 1. Forward pass
          test_pred_logits = model(X)

          # 2. Calculate and accumulate loss
          loss = loss_fn(test_pred_logits, y)
          test_loss += loss.item()

          # Calculate and accumulate accuracy
          test_pred_labels = test_pred_logits.argmax(dim=1)
          test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

          precision_metric(test_pred_labels, y)
          recall_metric(test_pred_labels, y)
          f1_score_metric(test_pred_labels, y)
          confusion_matrix(test_pred_labels, y)



  # Adjust metrics to get average loss and accuracy per batch
  test_loss = test_loss / len(dataloader)
  test_acc = test_acc / len(dataloader)

  final_precision = precision_metric.compute()
  final_recall = recall_metric.compute()
  final_f1 = f1_score_metric.compute()
  final_confusion_matrix = confusion_matrix.compute()

  results["test_loss"].append(test_loss)
  results["test_acc"].append(test_acc)
  results["test_precision"].append(final_precision)
  results["test_recall"].append(final_recall)
  results["test_f1_score"].append(final_f1)
  results["test_confusion_matrix"].append(final_confusion_matrix)


  return results

In [None]:
def save_model(model: torch.nn.Module,
               target_dir: str,
               model_name: str):
  """Saves a PyTorch model to a target directory.

  Args:
    model: A target PyTorch model to save.
    target_dir: A directory for saving the model to.
    model_name: A filename for the saved model. Should include
      either ".pth" or ".pt" as the file extension.

  Example usage:
    save_model(model=model_0,
               target_dir="models",
               model_name="05_going_modular_tingvgg_model.pth")
  """
  # Create target directory
  target_dir_path = Path(target_dir)
  target_dir_path.mkdir(parents=True,
                        exist_ok=True)

  # Create model save path
  assert model_name.endswith(".pth") or model_name.endswith(".pt"), "model_name should end with '.pt' or '.pth'"
  model_save_path = target_dir_path / model_name

  # Save the model state_dict()
  print(f"[INFO] Saving model to: {model_save_path}")
  torch.save(obj=model.state_dict(),
             f=model_save_path)

In [None]:
efficientnet_v2=EfficientNet_V2(num_classes=101,
                                freeze_percentage=0.7,
                                tr_learning=True)

In [None]:
Num_Workers=os.cpu_count()

Food101_train_dataloader,Food101_test_dataloader,class_names,class_to_idx=Food101_dataloaders(train_dataset=food101_train,
                                                                              
                                                                              test_dataset=food101_test,
                                                                              batch_size=40,
                                                                              num_workers=Num_Workers
                                                                              )

In [None]:
loss_fn=nn.CrossEntropyLoss()
optimizer=torch.optim.SGD(efficientnet_v2.parameters(), lr=0.01, momentum=0.9)
#scheduler = lr_scheduler.CyclicLR(optimizer, base_lr=0.0001, max_lr=0.01, step_size_up=2000, mode='triangular')

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
efficientnet_v2 = nn.DataParallel(efficientnet_v2, device_ids=[0, 1])
efficientnet_v2.to(device)