In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import copy
import os
import tarfile
import urllib.request
import scipy.io
from shutil import copy2

# Function to download and extract dataset
def download_and_extract_flowers102(data_dir="flowers102"):
    url = "https://www.robots.ox.ac.uk/~vgg/data/flowers/102/102flowers.tgz"
    labels_url = "https://www.robots.ox.ac.uk/~vgg/data/flowers/102/imagelabels.mat"
    splits_url = "https://www.robots.ox.ac.uk/~vgg/data/flowers/102/setid.mat"

    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    # Download and extract images
    print("Downloading and extracting images...")
    images_path = os.path.join(data_dir, "102flowers.tgz")
    if not os.path.exists(images_path):
        urllib.request.urlretrieve(url, images_path)
    with tarfile.open(images_path, "r:gz") as tar:
        tar.extractall(path=data_dir)

    # Download labels
    print("Downloading labels...")
    labels_path = os.path.join(data_dir, "imagelabels.mat")
    if not os.path.exists(labels_path):
        urllib.request.urlretrieve(labels_url, labels_path)

    # Download splits
    print("Downloading splits...")
    splits_path = os.path.join(data_dir, "setid.mat")
    if not os.path.exists(splits_path):
        urllib.request.urlretrieve(splits_url, splits_path)

    print("Dataset ready.")

# Function to organize dataset into train, val, and test splits
def organize_dataset(data_dir="flowers102"):
    # Load setid.mat for train, val, test splits
    setid_path = os.path.join(data_dir, "setid.mat")
    setid = scipy.io.loadmat(setid_path)

    splits = {
        'train': setid['trnid'][0],
        'val': setid['valid'][0],
        'test': setid['tstid'][0]
    }

    # Create train, val, test directories
    for split in splits:
        split_dir = os.path.join(data_dir, split)
        os.makedirs(split_dir, exist_ok=True)
        for label in range(1, 103):  # Flowers-102 has labels 1-102
            label_dir = os.path.join(split_dir, str(label))
            os.makedirs(label_dir, exist_ok=True)

    # Move images to corresponding directories
    image_dir = os.path.join(data_dir, "jpg")
    for split, indices in splits.items():
        for idx in indices:
            img_name = f"image_{idx:05d}.jpg"
            label = (idx - 1) // 80 + 1  # Images are grouped by 80 per class
            src_path = os.path.join(image_dir, img_name)
            dest_dir = os.path.join(data_dir, split, str(label))
            copy2(src_path, dest_dir)

    print("Dataset organized into train, val, and test splits.")

# Call the function to download and prepare the dataset
download_and_extract_flowers102()
organize_dataset()

# Check device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Data transforms
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# Load Flowers-102 dataset
data_dir = "flowers102"
image_datasets = {
    x: datasets.ImageFolder(root=f"{data_dir}/{x}", transform=data_transforms[x])
    for x in ['train', 'val', 'test']
}
dataloaders = {
    x: DataLoader(image_datasets[x], batch_size=32, shuffle=True, num_workers=2)
    for x in ['train', 'val', 'test']
}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
class_names = image_datasets['train'].classes

# Fine-tuning function
# def fine_tune_model(model, num_classes):
#     model.fc = nn.Linear(model.fc.in_features, num_classes)
#     return model

def fine_tune_model(model, num_classes):
  """
  Fine-tunes the final layer of the model for the given number of classes.

  Args:
      model (torch.nn.Module): The model to be fine-tuned.
      num_classes (int): The number of classes in the classification task.

  Returns:
      torch.nn.Module: The fine-tuned model.
  """
  # Check if the model has an 'fc' attribute (ResNet)
  if hasattr(model, 'fc'):
    model.fc = nn.Linear(model.fc.in_features, num_classes)
  # Otherwise, check for 'classifier' (DenseNet)
  else:
    model.classifier = nn.Sequential(
      nn.Linear(model.classifier.in_features, num_classes)
    )
  return model

# Train and evaluate
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    training_loss = []
    validation_loss = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'train':
                training_loss.append(epoch_loss)
            else:
                validation_loss.append(epoch_loss)

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    model.load_state_dict(best_model_wts)
    return model, training_loss, validation_loss

# Evaluate model
def evaluate_model(model):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloaders['test']:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (preds == labels).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy:.4f}')
    return accuracy

# Plot training and validation loss
def plot_loss(training_loss, validation_loss):
    plt.figure(figsize=(8, 6))
    plt.plot(training_loss, label='Training Loss')
    plt.plot(validation_loss, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()




Downloading and extracting images...
Downloading labels...
Downloading splits...
Dataset ready.
Dataset organized into train, val, and test splits.
Using device: cuda


In [14]:
models_to_train = {
    "DenseNet121": models.densenet121(pretrained=True),
    "ResNet50": models.resnet50(pretrained=True)

}

# for model_name, model in models_to_train.items():
#     print(f"\nTraining {model_name}...")

#     model = fine_tune_model(model, num_classes=len(class_names))
#     model = model.to(device)

#     criterion = nn.CrossEntropyLoss()
#     optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
#     scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

#     trained_model, train_loss, val_loss = train_model(model, criterion, optimizer, scheduler, num_epochs=20)
#     plot_loss(train_loss, val_loss)

#     print(f"Evaluating {model_name}...")
#     evaluate_model(trained_model)

for model_name, model in models_to_train.items():
  print(f"\nTraining {model_name}...")

  model = fine_tune_model(model, num_classes=len(class_names))
  model = model.to(device)

  criterion = nn.CrossEntropyLoss()
  optimizer = optim.Adam(model.parameters(), lr=0.0001)
  scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

  trained_model, train_loss, val_loss = train_model(model, criterion, optimizer, scheduler, num_epochs=20)
  plot_loss(train_loss, val_loss)

  print(f"Evaluating {model_name}...")
  evaluate_model(trained_model)


Training DenseNet121...
Epoch 1/20
----------
train Loss: 4.5027 Acc: 0.0494


KeyboardInterrupt: 