In [None]:
import os

import torch
from torch import nn, optim
# DataLoader is part of torch.utils.data; torchvision.utils has no `data` submodule.
from torch.utils.data import DataLoader
from torchvision import transforms

In [None]:
# Locations of the train/test splits under the dataset root.
# NOTE(review): `image_path` is not defined in the visible cells — presumably a
# pathlib.Path created in an earlier cell; confirm before running from a fresh kernel.
train_dir = image_path / "train"
test_dir = image_path / "test"
train_dir,test_dir

In [None]:
# Preprocessing pipeline: resize to 64x64, flip horizontally with probability 0.1,
# then convert the image to a tensor.
data_transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.RandomHorizontalFlip(0.1),
        transforms.ToTensor(),
    ]
)

In [None]:
from torchvision.datasets import ImageFolder

# Evaluation images must be preprocessed deterministically: reusing the training
# pipeline would randomly flip test images and make test metrics vary between runs.
eval_transform = transforms.Compose([
    transforms.Resize(size=(64,64)),
    transforms.ToTensor()
])

# ImageFolder builds each dataset from the files found under `root`.
train_data = ImageFolder(root=train_dir,
                         transform=data_transform)
test_data = ImageFolder(root=test_dir,
                        transform=eval_transform)

In [None]:
# Batch both splits; only the training split is shuffled.
train_dataloader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=32, shuffle=False)

train_dataloader, test_dataloader

In [None]:
# Report the number of samples per dataset and the number of batches per dataloader.
print(f"Length of train data: {len(train_data)} and test data: {len(test_data)}")
print(f"Length of train dataloader: {len(train_dataloader)} and test dataloader: {len(test_dataloader)}")

In [None]:
# Pull a single batch to sanity-check the tensor shapes coming out of the loader.
img,target = next(iter(train_dataloader))
print(f"Image Shape: {img.shape} -> [batch_size,colour_channel,height,width]")
print(f"Target Shape: {target.shape}")

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

## Base Model -> TinyVGG

In [None]:
# Baseline preprocessing without augmentation: resize to 64x64 and convert to a tensor.
simple_data_transform = transforms.Compose(
    [transforms.Resize(size=(64, 64)), transforms.ToTensor()]
)

# Both splits share the same deterministic pipeline.
train_data_simple = ImageFolder(train_dir, transform=simple_data_transform)
test_data_simple = ImageFolder(test_dir, transform=simple_data_transform)

In [None]:
import os
BATCH_SIZE = 32
# os.cpu_count() returns None when the CPU count cannot be determined, and
# DataLoader requires an integer; fall back to 0 (load in the main process).
NUM_WORKERS = os.cpu_count() or 0

# Training batches are reshuffled; test batches keep the dataset order.
train_data_simple_dataloader = DataLoader(dataset=train_data_simple,
                                          batch_size=BATCH_SIZE,
                                          num_workers=NUM_WORKERS,
                                          shuffle=True)
test_data_simple_dataloader = DataLoader(dataset=test_data_simple,
                                          batch_size=BATCH_SIZE,
                                          num_workers=NUM_WORKERS,
                                          shuffle=False)

In [None]:
from torch import nn

class TinyVGG(nn.Module):
    """Small VGG-style CNN: three convolutional blocks and a linear classifier.

    Every block is Conv2d(3x3) -> ReLU -> Conv2d(3x3) -> ReLU -> MaxPool2d(2),
    with stride 1 and no padding in the convolutions. For a 64x64 input the
    spatial size shrinks 64 -> 30 -> 13 -> 4 across the three blocks, which is
    why the classifier expects ``hidden_units * 4 * 4`` input features. Any
    other input resolution needs a different ``in_features``.

    Args:
        input_units: Number of channels in the input images.
        hidden_units: Number of channels produced by every convolution.
        output_units: Number of logits (one per class) the classifier emits.
    """

    def __init__(self, input_units, hidden_units, output_units):
        super().__init__()

        # Attribute names and creation order are significant: they determine the
        # state_dict keys and the order in which seeded initialisation draws
        # random numbers.
        self.conv_1 = self._conv_block(input_units, hidden_units)
        self.conv_2 = self._conv_block(hidden_units, hidden_units)
        self.conv_3 = self._conv_block(hidden_units, hidden_units)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            # 4*4 is the spatial size left after three blocks on a 64x64 input.
            nn.Linear(in_features=hidden_units*4*4,
                      out_features=output_units)
        )

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv-ReLU-Conv-ReLU-MaxPool block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                padding=0,
                stride=1
            ),
            nn.ReLU(),
            nn.Conv2d(
                in_channels=out_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=1,
                padding=0
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a batch of images.

        Args:
            x: Image batch of shape (batch, input_units, 64, 64).

        Returns:
            Tensor of shape (batch, output_units) holding unnormalised logits.
        """
        x = self.conv_1(x)
        x = self.conv_2(x)
        x = self.conv_3(x)
        return self.classifier(x)


In [None]:
# Seed the global RNG before building the model so its initial weights are reproducible.
# NOTE(review): `class_names` is not defined in the visible cells — presumably the
# dataset's list of class labels; confirm where it is created.
torch.manual_seed(42)
model_0 = TinyVGG(input_units=3,
                  hidden_units=64,
                  output_units=len(class_names)
                  ).to(device)
# print(model_0)

In [None]:
# Grab one batch from the baseline training loader and show its labels.
image_batch, target_batch = next(iter(train_data_simple_dataloader))
print(target_batch)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x2560 and 10x3)

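The mismatch shows up in the forward pass: each image flattens to 2560 = 10 × 256 features, but the final linear layer was built with only 10 input features, so its `in_features` must be multiplied by 256.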

In [None]:
# Disabled smoke test: a single forward pass of one batch through model_0.
# model_0(image_batch.to(device))

In [None]:
# Install torchinfo into the kernel's environment (needed for the model summary below).
# NOTE(review): the version is not pinned — consider pinning for reproducibility.
%pip install torchinfo

In [None]:
# Display torchinfo's summary of model_0.
from torchinfo import summary
summary(model_0)

In [None]:
from torch import optim
# Training hyperparameters for the baseline run.
EPOCHS = 20
LR = 0.001

# Cross-entropy loss on the logits, Adam over all of model_0's parameters.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_0.parameters(), lr=LR)

In [None]:
from tqdm.auto import tqdm

def train_step(
    model: torch.nn.Module,
    data_loader: torch.utils.data.DataLoader,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device
):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Network to train; it is put into training mode.
        data_loader: Yields ``(inputs, labels)`` batches.
        loss_fn: Loss applied to the model outputs and the labels.
        optimizer: Optimizer that updates the model's parameters.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(train_loss, train_acc)``: the mean of the per-batch losses and the
        fraction of correctly classified samples in the dataset.
    """
    model.train()
    running_loss = 0
    num_correct = 0

    for inputs, labels in data_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass and loss for this batch.
        logits = model(inputs)
        batch_loss = loss_fn(logits, labels)
        running_loss += batch_loss.item()

        # Clear stale gradients, backpropagate, update the weights.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Count predictions that match the labels (logits from before the update).
        num_correct += (logits.argmax(dim=1) == labels).sum().item()

    train_loss = running_loss / len(data_loader)
    train_acc = num_correct / len(data_loader.dataset) # type: ignore

    print(f"Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.3f}")
    return train_loss, train_acc


def test_step(
    model: torch.nn.Module,
    data_loader: torch.utils.data.DataLoader,
    loss_fn: torch.nn.Module,
    device: torch.device
):
    """Evaluate the model on the test dataset."""
    model.eval()
    test_loss, correct_preds = 0, 0

    with torch.inference_mode():
        for batch, (X_t, y_t) in enumerate(data_loader):
            X_t, y_t = X_t.to(device), y_t.to(device)

            test_logits = model(X_t)
            loss = loss_fn(test_logits, y_t)
            test_loss += loss.item()

            test_logits_class = torch.argmax(test_logits, dim=1)
            correct_preds += (test_logits_class == y_t).sum().item()

    test_loss /= len(data_loader)
    test_acc = correct_preds / len(data_loader.dataset) # type: ignore

    print(f"Test Loss: {test_loss:.3f} | Test Acc: {test_acc:.3f}")
    return test_loss,test_acc

In [None]:
from tqdm.auto import tqdm
import torch
from typing import Optional

def train(
    num_epochs: int,
    model: torch.nn.Module,
    train_loader: torch.utils.data.DataLoader,
    test_loader: torch.utils.data.DataLoader,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device,
):
    """Alternate one training pass and one evaluation pass for ``num_epochs`` epochs.

    Args:
        num_epochs: Number of train/test rounds to run.
        model: Network that is trained and evaluated.
        train_loader: Batches passed to ``train_step``.
        test_loader: Batches passed to ``test_step``.
        loss_fn: Loss used by both steps.
        optimizer: Optimizer used by ``train_step``.
        device: Device passed through to both steps.

    Returns:
        Dict with keys ``train_loss``, ``train_acc``, ``test_loss`` and
        ``test_acc``, each mapping to a list with one value per epoch.
    """
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    results = {name: [] for name in metric_names}

    for epoch in tqdm(range(num_epochs)):
        print(f"\nEpoch {epoch+1}/{num_epochs}")

        train_metrics = train_step(
            model=model,
            data_loader=train_loader,
            loss_fn=loss_fn,
            optimizer=optimizer,
            device=device,
        )
        test_metrics = test_step(
            model=model,
            data_loader=test_loader,
            loss_fn=loss_fn,
            device=device,
        )

        # Each step returns (loss, acc); record them in metric_names order.
        for name, value in zip(metric_names, (*train_metrics, *test_metrics)):
            results[name].append(value)

    return results


In [None]:
# Train the baseline model on the un-augmented loaders and keep its per-epoch metrics.
model_0_results = train(
    num_epochs=EPOCHS,
    model=model_0,
    train_loader=train_data_simple_dataloader,
    test_loader=test_data_simple_dataloader,
    loss_fn=loss_fn,
    optimizer=optimizer,
    device=device,
)

In [None]:
from typing import Dict,List
def plot_loss_curves(results:Dict[str,List[float]]):
  """Draw loss and accuracy curves side by side.

  Args:
    results: Dict with the keys "train_loss", "test_loss", "train_acc" and
      "test_acc", each holding one value per epoch.
  """
  # Fetch every series up front so a missing key fails before any figure is created.
  series = {key: results[key]
            for key in ("train_loss", "test_loss", "train_acc", "test_acc")}
  epochs = range(len(results["train_loss"]))

  # (panel title, ((results key, legend label), ...)) for each subplot.
  panels = (
      ("Loss", (("train_loss", "train_loss"), ("test_loss", "test_loss"))),
      ("Accuracy", (("train_acc", "train_accuracy"), ("test_acc", "test accuracy"))),
  )

  plt.figure(figsize=(15,4))
  for position, (title, curves) in enumerate(panels, start=1):
    plt.subplot(1,2,position)
    for key, label in curves:
      plt.plot(epochs,series[key],label=label)
    plt.title(title)
    plt.xlabel("Epochs")
    plt.legend()

In [None]:
# Loss and accuracy curves for the baseline model.
plot_loss_curves(model_0_results)

# TinyVGG with Data Augmentation

In [None]:
# Training pipeline: resize, apply TrivialAugmentWide, convert to a tensor.
train_transform_trivial = transforms.Compose(
    [
        transforms.Resize(size=(64, 64)),
        transforms.TrivialAugmentWide(num_magnitude_bins=31),  # 0 to 31
        transforms.ToTensor(),
    ]
)

# Evaluation pipeline: resize and convert only, no augmentation.
test_transform_simple = transforms.Compose(
    [transforms.Resize((64, 64)), transforms.ToTensor()]
)

In [None]:
# Augmented training set and un-augmented test set built from the same folders.
train_data_simple_augment = ImageFolder(train_dir, transform=train_transform_trivial)
test_data_simple_augment = ImageFolder(test_dir, transform=test_transform_simple)

In [None]:
# Loaders for the augmented experiment; only the training batches are shuffled.
train_data_simple_augment_dataloader = DataLoader(
    dataset=train_data_simple_augment,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=True,
)

test_data_simple_augment_dataloader = DataLoader(
    dataset=test_data_simple_augment,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=False,
)

In [None]:
# Number of images in the first batch drawn from the augmented training loader.
len((next(iter(train_data_simple_augment_dataloader)))[0])

In [None]:
# Seed before construction so model_1's initial weights are reproducible.
torch.manual_seed(42)
# The model must live on `device`: train_step/test_step move every batch there,
# and a CPU model fed CUDA tensors raises a device-mismatch error.
model_1 = TinyVGG(input_units=3,
                  hidden_units=64,
                  output_units=len(class_names)
                  ).to(device)
# print(model_1)

In [None]:
# Fresh loss and optimizer bound to model_1's parameters.
LR = 0.001
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_1.parameters(), lr=LR)

In [None]:
# Train for the shared EPOCHS constant rather than a literal, so both models run
# the same number of epochs and their curves can be plotted on a common x-axis.
model_1_results = train(num_epochs=EPOCHS,
                        model=model_1,
                        train_loader=train_data_simple_augment_dataloader,
                        test_loader=test_data_simple_augment_dataloader,
                        loss_fn=loss_fn,
                        optimizer=optimizer,
                        device=device)
# model_1_results

In [None]:
# Loss and accuracy curves for the model trained with augmentation.
plot_loss_curves(model_1_results)

In [None]:
import pandas as pd
# One row per epoch, one column per metric, for each experiment.
model_0_df = pd.DataFrame(data=model_0_results)
model_1_df = pd.DataFrame(data=model_1_results)

In [None]:
# 2x2 grid comparing both models on each recorded metric.
plt.figure(figsize=(15,10))

epochs = range(len(model_0_df))

# (DataFrame column, panel title) in subplot order.
panels = [
    ("train_loss", "Train Loss"),
    ("test_loss", "Test Loss"),
    ("train_acc", "Train acc"),
    ("test_acc", "Test acc"),
]

for position, (column, title) in enumerate(panels, start=1):
    plt.subplot(2,2,position)
    plt.plot(epochs,model_0_df[column],label="Model 0")
    plt.plot(epochs,model_1_df[column],label="Model 1")
    plt.title(title)
    plt.xlabel("Epochs")
    plt.legend();

## Loading a custom image

In [None]:
import requests

# Download the sample image. The timeout stops the cell from hanging on a stalled
# connection, and raise_for_status() fails loudly on an HTTP error instead of
# silently saving an error page under a .jpg name.
response = requests.get("https://images8.alphacoders.com/369/369063.jpg", timeout=30)
response.raise_for_status()

with open("custom_image.jpg","wb") as f:
  f.write(response.content)

In [None]:
import torchvision
# Load the downloaded file as a tensor and display it.
# permute(1,2,0) reorders the axes for imshow — assumes the tensor is
# channels-first (C, H, W); TODO confirm.
custom_image_path = "custom_image.jpg"
custom_image_units = torchvision.io.read_image(custom_image_path)
plt.imshow(custom_image_units.permute(1,2,0))

In [None]:
# Inspect the raw image tensor's shape, device and dtype (full dump left disabled).
# print(f"Custom image tensor:\n{custom_image_units}")
print(f"Custom image shape: {custom_image_units.shape}")
print(f"Custom image device: {custom_image_units.device}")
print(f"Custom image dtype: {custom_image_units.dtype}")

In [None]:
# Convert to float32, move to the target device and divide by 255.
custom_image = custom_image_units.float().to(device) / 255

In [None]:
# Resize the custom image to the 64x64 resolution used by the training pipelines.
custom_image_transform = transforms.Compose([transforms.Resize(size=(64, 64))])

custom_image_transformed = custom_image_transform(custom_image)

In [None]:
# Compare shapes before resizing, after resizing, and with a leading batch dimension added.
print(f"Original Shape: {custom_image_units.shape}")
print(f"Transformed Shape: {custom_image_transformed.shape}")
print(f"Adding Batch: {custom_image_transformed.unsqueeze(0).shape}")

In [None]:
# The tensor was moved to `device` earlier; matplotlib can only plot host memory,
# so copy it back to the CPU (a no-op when it is already there) before plotting.
plt.imshow(custom_image_transformed.cpu().permute(1,2,0))

In [None]:
# Switch to eval mode and run one forward pass under inference_mode;
# unsqueeze(0) adds the leading batch dimension.
# NOTE(review): the image was moved to `device` earlier — model_1 must be on the
# same device for this call to succeed; confirm.
model_1.eval()
with torch.inference_mode():
  custom_image_pred = model_1(custom_image_transformed.unsqueeze(0))

In [None]:
# Raw model output for the custom image.
custom_image_pred

In [None]:
# Look up the class label at the index of the highest-scoring output.
class_names[custom_image_pred.argmax(dim=1)]

In [None]:
def custom_image_predictions(img_path:str,
                             model: torch.nn.Module,
                             device:torch.device,
                             class_names:list[str],
                             transform=None):
  """Predict the class of a single image file and display the image.

  Args:
    img_path: Path of the image file to classify.
    model: Trained classifier; it is moved to ``device`` and put in eval mode.
    device: Device on which the forward pass runs.
    class_names: Class labels indexed by the model's output position.
    transform: Optional callable applied to the scaled float image tensor
      (for example a resize); skipped when ``None``.

  Returns:
    The entry of ``class_names`` with the highest predicted score.
  """
  target_image = torchvision.io.read_image(img_path)
  # Convert to float32 on the target device and divide by 255.
  target_image = target_image.type(torch.float32).to(device) / 255
  if transform:
    target_image = transform(target_image)
  # Add the leading batch dimension.
  target_image = target_image.unsqueeze(0)

  # Model and input must share a device, otherwise the forward pass raises.
  model.to(device)
  model.eval()
  with torch.inference_mode():
    target_img_pred = model(target_image)

  # .item() turns the one-element index tensor into a plain int for list indexing.
  target = target_img_pred.argmax(dim=1).item()

  # Drop only the batch axis and copy to the CPU: matplotlib cannot plot device tensors.
  plt.imshow(target_image.squeeze(0).cpu().permute(1,2,0))
  return class_names[target]

In [None]:
# Classify the downloaded image with the augmented model and show it.
custom_image_predictions(
    img_path=custom_image_path,
    model=model_1,
    device=device,
    class_names=class_names,
    transform=custom_image_transform,
)