In [None]:
import gdown   
import zipfile
import os

file_id = '1Ue9vPaNUi2VVO0uK-pTqc1tJOesFkXgs'
output = 'compressed_bmw_dataset.zip'

url = f'https://drive.google.com/uc?id={file_id}'

print("⬇ Downloading from share link...")
gdown.download(url, output, quiet=False)

⬇ Downloading from share link...


Downloading...
From (original): https://drive.google.com/uc?id=1Ue9vPaNUi2VVO0uK-pTqc1tJOesFkXgs
From (redirected): https://drive.google.com/uc?id=1Ue9vPaNUi2VVO0uK-pTqc1tJOesFkXgs&confirm=t&uuid=b7b00149-8422-4861-837c-105bb3844f05
To: /content/compressed_bmw_dataset.zip
100%|██████████| 4.60G/4.60G [00:35<00:00, 129MB/s] 


'compressed_bmw_dataset.zip'

In [5]:
import zipfile
import os

if os.path.exists("compressed_bmw_dataset.zip") and not os.path.exists("ddg_custom_bmw_dataset"):
        print("Found zipped dataset! Unzipping")
        with zipfile.ZipFile("compressed_bmw_dataset.zip", 'r') as zip_ref:
            zip_ref.extractall(".")
            print("Unzipping complete.")
else:
    print("zipped dataset not found or already unzipped")

zipped dataset not found or already unzipped


In [12]:
# Import modules
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader, random_split
import os
import time
import numpy as np
from tqdm import tqdm
import json

from PIL import Image

def is_valid_file(path):
    try:
        with Image.open(path) as img:
            img.verify() # Checks if file is broken without decoding the whole thing
        return True
    except:
        return False


data_folder = "ddg_custom_bmw_dataset"
batch_size = 16
lr = 0.001
epochs = 12

class TransformedDataset(torch.utils.data.Dataset):
    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        image, label = self.subset[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

def get_data():
    train_transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    val_transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    # Set a random seed for reproducibility
    torch.manual_seed(2025)
    




    full_dataset = datasets.ImageFolder(data_folder, is_valid_file=is_valid_file)
    train_set, val_set = random_split(full_dataset, [int(0.8 * len(full_dataset)), len(full_dataset) - int(0.8 * len(full_dataset))])


    train_data = TransformedDataset(train_set, transform=train_transform)
    val_data = TransformedDataset(val_set, transform=val_transform)

    index_to_class = {i: name for i, name in enumerate(full_dataset.classes)}
    with open("bmw_class_names.json", "w") as f:
        json.dump(index_to_class, f)

    return train_data, val_data, full_dataset

train_data, val_data, full_dataset = get_data()
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

In [None]:
from matplotlib import pyplot as plt
from tqdm import tqdm

def plot_training_curves(train_losses, val_accuracies):
    """Plot training loss and validation accuracy curves.

    Parameters
    ----------
    train_losses : list of float
        Training loss values for each epoch. Should have one value per epoch.
    val_accuracies : list of float
        Validation accuracy values for each epoch. Should have same length as
        train_losses. Accuracy values should be between 0 and 1 (or 0 and 100
        if using percentages).

    Returns
    -------
    None
        Displays matplotlib figure with two subplots showing training curves.

    Examples
    --------
    >>> train_losses = [0.8, 0.6, 0.4, 0.3, 0.2]
    >>> val_accuracies = [0.75, 0.80, 0.85, 0.87, 0.88]
    >>> plot_training_curves(train_losses, val_accuracies)
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    ax1.plot(train_losses)
    ax1.set_title('Training Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.grid(True)

    ax2.plot(val_accuracies)
    ax2.set_title('Validation Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.grid(True)

    plt.tight_layout()
    plt.show()


if torch.cuda.is_available():
    device = 'cuda'
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'

print(f"Using device {device}")


def get_weights(dataset):
  class_counts = np.zeros(len(dataset.classes))
  for _, label in dataset.samples:
    class_counts[label] += 1

  weights = [1./c if c>0 else 0 for c in class_counts]
  total_weight = sum(weights)
  norm_weights = [w*len(weights) / total_weight for w in weights]

  return torch.FloatTensor(norm_weights).to(device)

#THIS IS THE CODE FOR TASK 4 STEP 2(train model with dropout).
class_weights = get_weights(full_dataset)
print("class weights calculated!!! fixed imbalance of dataset classes")



# Setting a random seed for reproducibility
torch.manual_seed(2025)

class ConvNN(torch.nn.Module):
    """Definition of a convolutional neural network for classification
    in Pytorch, inheriting from the torch.nn.Module base class."""
    def __init__(self):
        super().__init__()
        # TODO: finish constructor/initialization
        self.flatten=nn.Flatten()
        self.conv = nn.Conv2d(in_channels = 3, out_channels = 10, kernel_size = 3)
        self.conv2 = nn.Conv2d(in_channels = 10, out_channels = 15, kernel_size = 3)
        self.conv3 = nn.Conv2d(in_channels = 15, out_channels = 20, kernel_size = 3)
        self.pool = nn.MaxPool2d(kernel_size = 2)
        self.linear1 = nn.Linear(13520, 64) #explain math 
        self.dropout = nn.Dropout(p=0.2)
        self.linear2 = nn.Linear(64, len(full_dataset.classes))

    def forward(self, x):
        """ Compute logits for batch x by forward propagation.
        Parameters
        ----------
        x : tensor, shape = [n_examples, n_channels, width, height]
        """
        # TODO: implement forward propagation
        x = self.conv(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = self.conv3(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = self.flatten(x)
        a_h = self.linear1(x)
        z_h = nn.functional.relu(a_h)
        z_h = self.dropout(z_h)
        a_out = self.linear2(z_h)
        return a_out

gpu = None

if torch.cuda.is_available():
  gpu = torch.device("cuda")
  model = ConvNN().to(gpu)

else:
  model = ConvNN()

#print(gpu)


loss_fn = nn.CrossEntropyLoss(weight = class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
epochs = 20





model.train()

gpu_start_time = time.time()

best_val_acc = 0
patience = 2
patience_counter = 0

train_losses = []
val_accuracies = []

for epoch in range(epochs):
  #
  train_loss = 0
  correct_predictions_train = 0
  for batch, (inputs, labels) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch")):
    inputs = inputs.to(device)
    labels = labels.to(device)
    outputs = model(inputs)
    loss = loss_fn(outputs, labels)
    train_loss += loss.item()
    correct_predictions_train += (outputs.argmax(1) == labels).sum().item()
    loss.backward()

    optimizer.step()
    optimizer.zero_grad()
  avg_train_loss = train_loss/len(train_loader)
  train_losses.append(avg_train_loss)




  model.eval()
  val_loss = 0
  correct_predictions_eval = 0
  with torch.no_grad():
    for inputs, labels in val_loader: #test_loader!!
      inputs = inputs.to(device)
      labels = labels.to(device)

      outputs = model(inputs)
      loss = loss_fn(outputs, labels)
      val_loss += loss.item()
      _, predicted = torch.max(outputs, 1)
      correct_predictions_eval += (predicted == labels).sum().item()

  val_acc = correct_predictions_eval / len(val_loader.dataset) #test_loader
  avg_val_loss = val_loss/len(val_loader) #test_loader
  val_accuracies.append(val_acc)

  #print("epoch number:", epoch+1)
  print(f'validation loss: {avg_val_loss:>5f}')
  print(f'validation accuracy: {val_acc:>5f}\n')

  # Save a checkpoint for every epoch
  torch.save(model.state_dict(), f"bmw_model_epoch_{epoch+1}.pth")

  if val_acc > best_val_acc:
    best_val_acc = val_acc
    patience_counter = 0
  else:
    patience_counter += 1
    if patience_counter >= patience:
      print("early stopping triggered! epoch:", epoch+1)
      break

  #
  model.train()



gpu_end_time = time.time()
gpu_training_time = gpu_end_time - gpu_start_time


plot_training_curves(train_losses, val_accuracies)
print("gpu training time = ", gpu_training_time)

h_num_params = 0
h2_num_params = 0
o_num_params = 0

#print("total model params = ", sum([h_num_params, h2_num_params, o_num_params]))

Using device cuda
class weights calculated!!! fixed imbalance of dataset classes


Epoch 1/20:  13%|█▎        | 70/533 [00:41<04:35,  1.68batch/s]


KeyboardInterrupt: 