In [None]:
from google.colab import drive
import os
import shutil

# Mount Google Drive so the dataset folder on Drive becomes readable from this runtime.
drive.mount('/content/drive')

drive_data_dir = '/content/drive/My Drive/CS 229/StreetViewImages/'
local_data_dir = '/content/StreetViewImages'

# Copy the images to the runtime's local disk only when the target path is absent;
# an existing path is reused as-is.
if os.path.exists(local_data_dir):
    print("Dataset already exists")
else:
    shutil.copytree(drive_data_dir, local_data_dir)
    print("Dataset copied")


In [None]:
# VIT
import os
import torch
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm
from torchvision.transforms.v2 import RandomErasing, TrivialAugmentWide
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Root folder of the image dataset; passed as `root` to datasets.ImageFolder below.
local_data_dir = "/content/StreetViewImages"


def split_train_test(dataset, threshold=4050):
    """Split an ImageFolder-style dataset into train/test subsets by file number.

    Each entry of ``dataset.imgs`` is a ``(path, label)`` pair. The text before
    the first "." of the file's base name is parsed as an integer; numbers
    strictly below ``threshold`` go to the training subset, all others to the
    test subset. The total number of entries is printed.

    Args:
        dataset: Object exposing ``imgs`` as a sequence of ``(path, label)``.
        threshold: First image number that belongs to the test split.

    Returns:
        ``(train_subset, test_subset)`` as ``torch.utils.data.Subset`` objects
        indexing into ``dataset``.

    Raises:
        ValueError: If a file name does not start with an integer.
    """
    train_indices = []
    test_indices = []
    for idx, (img_path, _label) in enumerate(dataset.imgs):
        img_number = int(os.path.basename(img_path).split(".")[0])
        if img_number < threshold:
            train_indices.append(idx)
        else:
            test_indices.append(idx)

    # Every entry lands in exactly one list, so this is the dataset size.
    print(len(train_indices) + len(test_indices))

    return Subset(dataset, train_indices), Subset(dataset, test_indices)

def compute_mean_std(dataset, batch_size=32, num_workers=2):
    """Estimate per-channel mean and standard deviation of an image dataset.

    Each image is flattened per channel; the per-image channel means and
    standard deviations are summed over all images and divided by the number
    of images, so the result is the average of per-image statistics.

    Returns:
        ``(mean, std)`` as two plain Python lists of floats.
    """
    channel_mean_sum = torch.zeros(3)
    channel_std_sum = torch.zeros(3)
    seen = 0

    batches = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    for batch, _ in batches:
        n_images, n_channels = batch.size(0), batch.size(1)
        # Collapse the spatial dimensions so statistics are taken over all pixels.
        pixels = batch.view(n_images, n_channels, -1)
        channel_mean_sum += pixels.mean(2).sum(0)
        channel_std_sum += pixels.std(2).sum(0)
        seen += n_images

    return (channel_mean_sum / seen).tolist(), (channel_std_sum / seen).tolist()


# Measure per-channel statistics on the training split only, reading the images
# with ToTensor as the sole transform.
temp_dataset = datasets.ImageFolder(root=local_data_dir, transform=transforms.ToTensor())
train_dataset_stats, _ = split_train_test(temp_dataset)
computed_mean, computed_std = compute_mean_std(train_dataset_stats)
print("Dataset Mean:", computed_mean)
print("Dataset Std:", computed_std)

# Side length of the network input. Defined once so the resize and crop steps of
# both pipelines cannot drift apart.
VIT_INPUT_SIZE = 518  # For B16, need 224

# Training pipeline: resize, random augmentations, then normalise with the
# statistics computed above.
train_transform = transforms.Compose([
    transforms.Resize((VIT_INPUT_SIZE, VIT_INPUT_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(VIT_INPUT_SIZE, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.GaussianBlur(kernel_size=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=computed_mean, std=computed_std)
])


# Evaluation pipeline: deterministic resize and the same normalisation, no augmentation.
test_transform = transforms.Compose([
    transforms.Resize((VIT_INPUT_SIZE, VIT_INPUT_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=computed_mean, std=computed_std)
])

# The two splits must not share one ImageFolder: a Subset only stores indices, so
# assigning `.dataset.transform` through both subsets writes the same attribute
# twice and the last assignment (the test transform) would be used for training
# as well. Each split therefore reads through its own ImageFolder.
dataset = datasets.ImageFolder(root=local_data_dir, transform=train_transform)
eval_dataset = datasets.ImageFolder(root=local_data_dir, transform=test_transform)

# Split once, then re-point the held-out indices at the evaluation dataset. Both
# ImageFolder objects scan the same root, so an index refers to the same file.
train_dataset, held_out = split_train_test(dataset)
test_dataset = Subset(eval_dataset, held_out.indices)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Pretrained ViT-H/14 with its classification head swapped for a 6-way linear layer.
base_model = models.vit_h_14(weights=models.ViT_H_14_Weights.IMAGENET1K_SWAG_E2E_V1)
in_features = base_model.heads.head.in_features
base_model.heads.head = nn.Linear(in_features, 6)
model = base_model

# Freeze everything first ...
for param in model.parameters():
    param.requires_grad = False

# ... then fine-tune the last three encoder blocks ...
for param in model.encoder.layers[-3:].parameters():
    param.requires_grad = True

# ... and the replacement head. It was created above with fresh random weights,
# so leaving it frozen would force the encoder to adapt to a random projection.
for param in model.heads.head.parameters():
    param.requires_grad = True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


criterion = nn.CrossEntropyLoss()
# Only parameters left trainable above are handed to the optimizer.
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001, weight_decay=5e-3)


scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, epochs=10):
    """Train ``model`` for ``epochs`` epochs, reporting metrics after each one.

    After every epoch the average training loss and training accuracy are
    printed, test accuracy is printed via ``evaluate_model``, a confusion
    matrix is drawn via ``plot_confusion_matrix``, and the learning-rate
    scheduler is advanced.

    Relies on the module-level names ``device`` and ``dataset`` (for the class
    names shown on the confusion matrix).

    Args:
        model: Network to optimise.
        train_loader: Iterable of ``(images, labels)`` training batches.
        test_loader: Iterable of ``(images, labels)`` evaluation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the trainable parameters.
        scheduler: Learning-rate scheduler stepped once per epoch.
        epochs: Number of passes over ``train_loader``.
    """
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch"):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_acc = 100 * correct / total
        avg_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Test Accuracy: {evaluate_model(model, test_loader):.2f}%")
        plot_confusion_matrix(model, test_loader, dataset.classes)

        # Only ReduceLROnPlateau consumes a metric. For every other scheduler the
        # positional argument of step() is an epoch index, so passing the loss
        # there would derive the learning rate from the loss value.
        if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
            scheduler.step(avg_loss)
        else:
            scheduler.step()


def evaluate_model(model, loader):
    """Return the percentage of samples in ``loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and gradients are disabled; each batch is
    moved to the module-level ``device`` before the forward pass.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch, targets in loader:
            batch = batch.to(device)
            targets = targets.to(device)
            # Index of the largest logit per row is the predicted class.
            predictions = torch.max(model(batch), 1)[1]
            n_seen += targets.size(0)
            n_correct += (predictions == targets).sum().item()
    return 100 * n_correct / n_seen

def get_all_preds(model, loader):
    """Collect true labels and predicted classes for every sample in ``loader``.

    Runs in eval mode without gradients, moving each batch to the module-level
    ``device``.

    Returns:
        ``(all_labels, all_preds)``: two flat lists of NumPy scalars, in the
        order the loader yielded the samples.
    """
    model.eval()
    truths, guesses = [], []
    with torch.no_grad():
        for batch, targets in loader:
            batch, targets = batch.to(device), targets.to(device)
            logits = model(batch)
            guesses.extend(logits.max(dim=1).indices.cpu().numpy())
            truths.extend(targets.cpu().numpy())
    return truths, guesses

def plot_confusion_matrix(model, loader, classes):
    """Draw a confusion-matrix heatmap of ``model``'s predictions on ``loader``.

    Each cell is annotated with the raw count and, in parentheses, that count
    as a percentage of its row (i.e. of the samples with that true label).
    Rows without any sample show 0.0%.

    Args:
        model: Network to evaluate.
        loader: Iterable of ``(images, labels)`` batches.
        classes: Class names used as tick labels; label index ``i`` is assumed
            to correspond to ``classes[i]``.
    """
    true_labels, pred_labels = get_all_preds(model, loader)

    # Pin the label set: without it the matrix only covers labels that occur in
    # the data, so a class that is never seen nor predicted would shrink the
    # matrix and shift the tick labels onto the wrong rows/columns.
    cm = confusion_matrix(true_labels, pred_labels, labels=list(range(len(classes))))

    # Row-wise percentages; rows that sum to zero keep the pre-filled 0.
    row_sums = cm.sum(axis=1, keepdims=True)
    cm_percent = np.divide(
        cm, row_sums, out=np.zeros(cm.shape, dtype=float), where=row_sums > 0
    ) * 100

    # Building the array from the finished strings lets NumPy size the string
    # dtype to the longest annotation.
    annot = np.array([
        [f"{count}\n({pct:.1f}%)" for count, pct in zip(count_row, pct_row)]
        for count_row, pct_row in zip(cm, cm_percent)
    ])

    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=annot, fmt="", cmap="Blues", xticklabels=classes, yticklabels=classes)
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")
    plt.title("Confusion Matrix")
    plt.show()


# Fine-tune for 5 epochs; train_model already draws a confusion matrix after each epoch.
train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, epochs=5)
# Draw the confusion matrix once more for the final weights.
plot_confusion_matrix(model, test_loader, dataset.classes)


In [None]:
# LOGISTIC REGRESSION
import os
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Root folder of the image dataset, read via datasets.ImageFolder.
local_data_dir = "/content/StreetViewImages"


# NOTE(review): `temp_dataset` is not referenced again in this cell — confirm whether it is still needed.
temp_dataset = datasets.ImageFolder(root=local_data_dir, transform=transforms.ToTensor())

def split_train_test(dataset):
    """Partition ``dataset`` into train and test subsets by image number.

    The integer before the first "." of each file's base name decides the
    split: numbers below 4050 are training samples, the rest are test samples.

    Returns:
        ``(train_subset, test_subset)`` as ``Subset`` views of ``dataset``.
    """
    # Keyed by "is this a training image?" so each index is routed in one step.
    buckets = {True: [], False: []}
    for position, (path, _) in enumerate(dataset.imgs):
        stem = os.path.basename(path).split(".")[0]
        buckets[int(stem) < 4050].append(position)
    return Subset(dataset, buckets[True]), Subset(dataset, buckets[False])

# Side length every image is resized to. The linear model's input width is
# derived from it below, so the flattened tensor always matches `input_dim`
# whatever size the files on disk have.
LR_INPUT_SIZE = 224

train_transform = transforms.Compose([
    transforms.Resize((LR_INPUT_SIZE, LR_INPUT_SIZE)),
    transforms.ToTensor(),
])
test_transform = transforms.Compose([
    transforms.Resize((LR_INPUT_SIZE, LR_INPUT_SIZE)),
    transforms.ToTensor(),
])

dataset = datasets.ImageFolder(root=local_data_dir)
train_dataset, test_dataset = split_train_test(dataset)


# Both subsets wrap the same ImageFolder, so these two lines write the same
# attribute and the second assignment is the one in effect. That is harmless
# here only because the two transforms are identical.
train_dataset.dataset.transform = train_transform
test_dataset.dataset.transform = test_transform

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=32, shuffle=False)

num_classes = len(dataset.classes)
# 3 colour channels times the resized height and width.
input_dim = 3 * LR_INPUT_SIZE * LR_INPUT_SIZE

class LogisticRegressionModel(nn.Module):
    """Multinomial logistic regression: one linear layer applied to flattened inputs."""

    def __init__(self, input_dim, num_classes):
        """Create the single ``input_dim -> num_classes`` linear layer."""
        super().__init__()
        self.linear = nn.Linear(input_dim, num_classes)

    def forward(self, x):
        """Flatten each sample of the batch and return the raw class logits."""
        batch_size = x.size(0)
        flattened = x.view(batch_size, -1)
        return self.linear(flattened)

# Use the GPU when one is available and place the freshly built model on it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LogisticRegressionModel(input_dim, num_classes).to(device)

# Cross-entropy over the logits, optimised with Adam on all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

def train_model(model, train_loader, test_loader, criterion, optimizer, epochs=30):
    """Run ``epochs`` training passes, printing loss and accuracies after each.

    Per epoch: the average batch loss and training accuracy are printed, then
    the test accuracy returned by ``evaluate_model``. Batches are moved to the
    module-level ``device``.
    """
    for epoch_idx in range(epochs):
        model.train()
        loss_sum, hits, seen = 0.0, 0, 0

        progress = tqdm(train_loader, desc=f"Epoch {epoch_idx+1}/{epochs}", unit="batch")
        for batch, targets in progress:
            batch = batch.to(device)
            targets = targets.to(device)

            # One optimisation step on this batch.
            optimizer.zero_grad()
            logits = model(batch)
            batch_loss = criterion(logits, targets)
            batch_loss.backward()
            optimizer.step()

            # Running totals for the epoch summary.
            loss_sum += batch_loss.item()
            seen += targets.size(0)
            hits += (torch.max(logits, 1)[1] == targets).sum().item()

        accuracy = 100 * hits / seen
        mean_loss = loss_sum / len(train_loader)
        print(f"Epoch {epoch_idx+1}/{epochs}, Loss: {mean_loss:.4f}, Train Acc: {accuracy:.2f}%")
        test_accuracy = evaluate_model(model, test_loader)
        print(f"Test Accuracy: {test_accuracy:.2f}%")

def evaluate_model(model, loader):
    """Compute classification accuracy (in percent) of ``model`` over ``loader``.

    Evaluation runs in eval mode with gradient tracking off; inputs and labels
    are transferred to the module-level ``device``.
    """
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            _, guess = model(inputs).max(dim=1)
            seen += targets.size(0)
            hits += (guess == targets).sum().item()
    accuracy = 100 * hits / seen
    return accuracy

# Entry point: train for 30 epochs, then report the final test accuracy.
if __name__ == '__main__':
    train_model(model, train_loader, test_loader, criterion, optimizer, epochs=30)
    # train_model prints test accuracy per epoch; this measures it once more after training ends.
    final_acc = evaluate_model(model, test_loader)
    print(f"Test Accuracy: {final_acc:.2f}%")


In [None]:
# RESNET50
import os
import torch
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from tqdm import tqdm

# Root folder of the image dataset, read via datasets.ImageFolder below.
local_data_dir = "/content/StreetViewImages"

# Images are only converted to tensors here.
# NOTE(review): no resize or normalisation is applied before the pretrained ResNet-50 — confirm this is intentional.
transform = transforms.Compose([transforms.ToTensor()])

def split_train_test(dataset):
    """Return ``(train_subset, test_subset)`` chosen by each file's leading number.

    The base name of every path in ``dataset.imgs`` is cut at its first "." and
    parsed as an integer. Images numbered below 4050 form the training subset;
    all remaining images form the test subset.
    """
    def image_number(path):
        # "/some/dir/123.jpg" -> 123
        return int(os.path.basename(path).split(".")[0])

    numbers = [image_number(path) for path, _ in dataset.imgs]
    train_indices = [i for i, number in enumerate(numbers) if number < 4050]
    test_indices = [i for i, number in enumerate(numbers) if number >= 4050]

    return Subset(dataset, train_indices), Subset(dataset, test_indices)

dataset = datasets.ImageFolder(root=local_data_dir, transform=transform)
train_dataset, test_dataset = split_train_test(dataset)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# `pretrained=True` is deprecated in torchvision; the weights enum selects the
# same ImageNet checkpoint explicitly.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features
# Replace the ImageNet classifier with a 6-way linear layer.
model.fc = nn.Linear(num_ftrs, 6)

# Freeze the whole network first ...
for param in model.parameters():
    param.requires_grad = False


# ... then unfreeze the LAST ELEVEN parameter tensors. This must be a slice: a
# plain index `[-11]` selects one tensor, and iterating that tensor only yields
# temporary views, so no real parameter would become trainable and the optimizer
# below would be built from an empty parameter list. In torchvision's ResNet-50
# these eleven tensors are the final bottleneck block of layer4 (three conv
# weights plus three BatchNorm weight/bias pairs) and the new `fc` weight/bias.
for param in list(model.parameters())[-11:]:
    param.requires_grad = True


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


criterion = nn.CrossEntropyLoss()
# Only the parameters left trainable above are handed to the optimizer.
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001)


def train_model(model, train_loader, criterion, optimizer, epochs=10, eval_loader=None):
    """Train ``model`` for ``epochs`` epochs, printing metrics after each one.

    Per epoch the average batch loss and the training accuracy are printed,
    followed by the accuracy ``evaluate_model`` reports on the evaluation
    loader. Batches are moved to the module-level ``device``.

    Args:
        model: Network to optimise.
        train_loader: Iterable of ``(images, labels)`` training batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the trainable parameters.
        epochs: Number of passes over ``train_loader``.
        eval_loader: Loader used for the per-epoch test accuracy. When omitted,
            the module-level ``test_loader`` is used, which is what this
            function did before the parameter existed.
    """
    if eval_loader is None:
        eval_loader = test_loader

    for epoch in range(epochs):
        # model.train() puts every submodule, BatchNorm layers included, into
        # training mode, so no separate per-module call is needed beforehand.
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch"):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Test Accuracy: {evaluate_model(model, eval_loader):.2f}%")


def evaluate_model(model, loader):
    """Measure how often ``model`` predicts the right class on ``loader``, in percent.

    Puts the model in eval mode, disables gradient tracking and moves every
    batch to the module-level ``device``.
    """
    model.eval()
    tally = {"right": 0, "all": 0}
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            scores = model(inputs)
            chosen = torch.max(scores, 1).indices
            tally["all"] += targets.size(0)
            tally["right"] += (chosen == targets).sum().item()
    return 100 * tally["right"] / tally["all"]


train_model(model, train_loader, criterion, optimizer, epochs=10)


# Final accuracies in eval mode. The training-set figure costs a full pass over
# the training data, so report it instead of discarding it.
final_acc = evaluate_model(model, test_loader)
train_acc = evaluate_model(model, train_loader)
print(f"Train Accuracy: {train_acc:.2f}%")
print(f"Test Accuracy: {final_acc:.2f}%")