<a href="https://colab.research.google.com/github/Diksha-Arsule/Wad/blob/main/prac6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import argparse
import os
import random
import shutil
from pathlib import Path
from typing import Tuple, Dict

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms

In [14]:
def split_dataset_into_dirs(source_dir: str, dest_root: str, split: Tuple[float, float, float]=(0.5, 0.25, 0.25), seed: int = 42):
    """
    Copy images from source_dir/class_x/* into dest_root/{train,valid,test}/class_x/
    according to split ratios. Keeps file extensions and names.

    Args:
        source_dir: Directory containing one subdirectory per class.
        dest_root: Directory under which train/, valid/ and test/ are created.
        split: (train, valid, test) fractions; must be non-negative and sum to 1.
        seed: Seed for the shuffle that decides which file lands in which split.

    Raises:
        ValueError: If ``split`` is not three non-negative ratios summing to 1.
        RuntimeError: If ``source_dir`` contains no class subdirectories.
    """
    if len(split) != 3 or any(ratio < 0 for ratio in split):
        raise ValueError("split must be three non-negative ratios")
    # Compare with a tolerance: e.g. 0.7 + 0.2 + 0.1 != 1.0 in floating point.
    if abs(sum(split) - 1.0) > 1e-6:
        raise ValueError("split must sum to 1.0")

    # A private generator keeps the split reproducible without reseeding the
    # process-wide `random` state.
    rng = random.Random(seed)

    source = Path(source_dir)
    dest_root = Path(dest_root)
    train_dir = dest_root / "train"
    valid_dir = dest_root / "valid"
    test_dir = dest_root / "test"
    for d in (train_dir, valid_dir, test_dir):
        d.mkdir(parents=True, exist_ok=True)

    # iterdir() order is filesystem-dependent; sort so that the same seed
    # always yields the same assignment.
    classes = sorted((p for p in source.iterdir() if p.is_dir()), key=lambda p: p.name)
    if not classes:
        raise RuntimeError(f"No class subdirectories found in {source_dir}")

    for cls_path in classes:
        cls_name = cls_path.name
        images = sorted((p for p in cls_path.iterdir() if p.is_file()), key=lambda p: p.name)
        if not images:
            continue
        rng.shuffle(images)
        n = len(images)
        n_train = int(split[0] * n)
        n_valid = int(split[1] * n)
        # Whatever is left after train and valid goes to test.
        assignments = {
            train_dir / cls_name: images[:n_train],
            valid_dir / cls_name: images[n_train:n_train + n_valid],
            test_dir / cls_name: images[n_train + n_valid:],
        }

        for out_dir, files in assignments.items():
            out_dir.mkdir(parents=True, exist_ok=True)
            for src_path in files:
                dst_path = out_dir / src_path.name
                # copy if not exists (to allow restarting)
                if not dst_path.exists():
                    shutil.copy2(src_path, dst_path)

    print(f"Dataset split into: {train_dir}, {valid_dir}, {test_dir}")

In [20]:
def build_dataloaders(datadir: str, batch_size: int = 32, num_workers: int = 4) -> Tuple[Dict[str, DataLoader], Dict[str, datasets.ImageFolder]]:
    """
    Build ImageFolder datasets and DataLoaders for the three splits under datadir.

    Expects datadir/train, datadir/valid and datadir/test to exist. The
    training pipeline applies random augmentation; validation and test use a
    fixed resize + centre crop. All pipelines end with ToTensor and the same
    channel-wise Normalize.

    Returns:
        (dataloaders, datasets_dict). Note the key names differ: dataloaders
        is keyed 'train' / 'val' / 'test', while datasets_dict is keyed
        'train' / 'valid' / 'test'.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    def to_normalized_tensor():
        # Shared tail of every pipeline; fresh objects on each call.
        return [
            transforms.ToTensor(),
            transforms.Normalize(list(channel_mean), list(channel_std)),
        ]

    def eval_pipeline():
        # Deterministic preprocessing used for both validation and test.
        return transforms.Compose([
            transforms.Resize(size=256),
            transforms.CenterCrop(size=224),
            *to_normalized_tensor(),
        ])

    train_pipeline = transforms.Compose([
        transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=224),  # ImageNet standard
        *to_normalized_tensor(),
    ])

    image_transforms = {
        'train': train_pipeline,
        'valid': eval_pipeline(),
        'test': eval_pipeline(),
    }

    datasets_dict = {}
    for split_name in ('train', 'valid', 'test'):
        split_root = os.path.join(datadir, split_name)
        datasets_dict[split_name] = datasets.ImageFolder(
            root=split_root, transform=image_transforms[split_name]
        )

    # (loader key, dataset key); only the training loader is shuffled.
    loader_to_dataset = (('train', 'train'), ('val', 'valid'), ('test', 'test'))
    dataloaders = {
        loader_key: DataLoader(
            datasets_dict[dataset_key],
            batch_size=batch_size,
            shuffle=(loader_key == 'train'),
            num_workers=num_workers,
        )
        for loader_key, dataset_key in loader_to_dataset
    }
    return dataloaders, datasets_dict



In [25]:
def prepare_model(num_classes: int, device: torch.device) -> nn.Module:
    """
    Build a VGG16 with ImageNet weights whose last classifier layer is replaced
    by a trainable head for ``num_classes`` classes.

    Every pretrained parameter is frozen; only the parameters under
    ``classifier.6`` (the new head) keep ``requires_grad=True``. The head ends
    in ``LogSoftmax`` so the model emits log-probabilities, which is what
    ``nn.NLLLoss`` expects.

    Args:
        num_classes: Number of output classes for the new head.
        device: Device the returned model is moved to.

    Returns:
        The modified model, on ``device``.
    """
    # Load pretrained VGG16. Newer torchvision replaces `pretrained=True` with
    # the `weights=` enum; fall back to the legacy flag when the enum is absent.
    weights_enum = getattr(models, "VGG16_Weights", None)
    if weights_enum is not None:
        model = models.vgg16(weights=weights_enum.IMAGENET1K_V1)
    else:
        model = models.vgg16(pretrained=True)

    # Freeze all pretrained parameters.
    for param in model.parameters():
        param.requires_grad = False

    if isinstance(model.classifier[6], nn.Linear):
        n_inputs = model.classifier[6].in_features
    else:
        # fallback: width of the penultimate fully connected layer in standard VGG16
        n_inputs = 4096

    # Replace the 1000-way ImageNet output layer with a head sized for this
    # dataset; without this the model would ignore num_classes entirely.
    model.classifier[6] = nn.Sequential(
        nn.Linear(n_inputs, num_classes),
        nn.LogSoftmax(dim=1),
    )

    # Ensure only classifier[6] parameters are trainable:
    for name, param in model.named_parameters():
        param.requires_grad = name.startswith('classifier.6')

    model = model.to(device)
    return model


In [31]:
def train_model(model: nn.Module, dataloaders: Dict[str, DataLoader], dataset_sizes: Dict[str, int],
                device: torch.device, epochs: int = 10, lr: float = 1e-3, save_path: str = "best_vgg16.pth"):
    """
    Train ``model`` with Adam + NLLLoss and checkpoint the best validation epoch.

    Each epoch runs a training pass over ``dataloaders['train']`` followed by
    an evaluation pass over ``dataloaders['val']``. Per-phase loss and
    accuracy are averaged using ``dataset_sizes['train']`` and
    ``dataset_sizes['valid']`` respectively. Whenever validation accuracy
    exceeds the best seen so far, a checkpoint dict (epoch, model state,
    optimizer state, best accuracy) is written to ``save_path``.

    Only parameters with ``requires_grad=True`` are handed to the optimizer.
    """
    # (phase label, key into dataloaders, key into dataset_sizes)
    phase_table = (
        ('train', 'train', 'train'),
        ('val', 'val', 'valid'),
    )

    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=lr)
    criterion = nn.NLLLoss()  # expects log-probabilities from the model

    best_acc = 0.0
    for epoch in range(1, epochs + 1):
        print(f"\nEpoch {epoch}/{epochs}")
        print("-" * 20)

        for phase, loader_key, size_key in phase_table:
            is_training = phase == 'train'
            # train(True) is training mode, train(False) is evaluation mode
            model.train(is_training)

            loss_total = 0.0
            correct_total = 0

            for batch, targets in dataloaders[loader_key]:
                batch = batch.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()
                with torch.set_grad_enabled(is_training):
                    log_probs = model(batch)
                    loss = criterion(log_probs, targets)
                    preds = torch.max(log_probs, 1)[1]

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # criterion returns the batch mean; weight by batch size
                loss_total += loss.item() * batch.size(0)
                correct_total += torch.sum(preds == targets.data).item()

            n_samples = dataset_sizes[size_key]
            epoch_loss = loss_total / n_samples
            epoch_acc = correct_total / n_samples

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

            # Nothing more to do unless this is a new best validation result.
            if is_training or epoch_acc <= best_acc:
                continue

            best_acc = epoch_acc
            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_acc': best_acc
            }
            torch.save(checkpoint, save_path)
            print(f"Saved best model with val acc: {best_acc:.4f}")

    print(f"\nTraining complete. Best val Acc: {best_acc:.4f}")


In [32]:
def evaluate_on_test(model: nn.Module, dataloader: DataLoader, device: torch.device):
    """
    Compute, print and return the top-1 accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and gradients are disabled. Accuracy is
    the number of correct argmax predictions divided by the number of samples
    seen; an empty loader yields 0.0.
    """
    model.eval()
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch, targets in dataloader:
            batch, targets = batch.to(device), targets.to(device)
            # index of the highest score along the class dimension
            predicted = torch.max(model(batch), 1)[1]
            n_correct += torch.sum(predicted == targets.data).item()
            n_seen += batch.size(0)

    if n_seen > 0:
        acc = n_correct / n_seen
    else:
        acc = 0.0
    print(f"Test Accuracy: {acc:.4f}")
    return acc

In [45]:
def main():
    """
    Command-line entry point: split the dataset, build loaders, fine-tune
    VGG16, then evaluate the best checkpoint on the test split.

    Steps, in order: copy images from --source_dir into train/valid/test
    folders under --work_dir (fixed 50/25/25 split), build the dataloaders,
    pick CUDA when available, prepare the model, train it, reload the saved
    checkpoint if one exists at --save_path, and report test accuracy.
    """
    parser = argparse.ArgumentParser(description="Transfer learning with VGG16")
    parser.add_argument('--source_dir', type=str, required=True,
                        help="Source dataset root with class subfolders. e.g. ./caltech_images")
    parser.add_argument('--work_dir', type=str, default='./datadir',
                        help="Destination root where train/valid/test folders will be created")
    # Remaining options carry no help text; register them in a fixed order.
    simple_options = (
        ('--batch_size', int, 32),
        ('--epochs', int, 8),
        ('--lr', float, 1e-3),
        ('--num_workers', int, 4),
        ('--seed', int, 42),
        ('--save_path', str, 'best_vgg16.pth'),
    )
    for flag, option_type, default_value in simple_options:
        parser.add_argument(flag, type=option_type, default=default_value)
    args = parser.parse_args()

    # Step 1: materialise the train/valid/test directory layout.
    split_dataset_into_dirs(args.source_dir, args.work_dir, split=(0.5, 0.25, 0.25), seed=args.seed)

    # Step 2: datasets and loaders.
    dataloaders, datasets_dict = build_dataloaders(
        args.work_dir, batch_size=args.batch_size, num_workers=args.num_workers
    )
    dataset_sizes = {name: len(datasets_dict[name]) for name in ('train', 'valid', 'test')}
    class_names = datasets_dict['train'].classes
    num_classes = len(class_names)
    print(f"Found classes: {class_names}, num_classes={num_classes}")
    print(f"Dataset sizes: {dataset_sizes}")

    # Step 3: choose the compute device.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(f"Using device: {device}")

    # Step 4: model, plus a listing of what will actually be trained.
    model = prepare_model(num_classes=num_classes, device=device)
    print("Model prepared. Trainable parameters:")
    for param_name, param in model.named_parameters():
        if not param.requires_grad:
            continue
        print("  ", param_name, param.size())

    # Step 5: training loop (writes the best checkpoint to args.save_path).
    train_model(model, dataloaders, dataset_sizes, device,
                epochs=args.epochs, lr=args.lr, save_path=args.save_path)

    # Step 6: restore the best checkpoint when present, then score on test.
    if os.path.exists(args.save_path):
        ckpt = torch.load(args.save_path, map_location=device)
        model.load_state_dict(ckpt['model_state_dict'])
        print(f"Loaded best model from epoch {ckpt.get('epoch', '?')} with val acc {ckpt.get('best_acc', '?')}")
    evaluate_on_test(model, dataloaders['test'], device)

