In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torchvision import transforms
import torchvision.models as models
from torchvision.transforms import ToTensor, Resize
import os
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder



First model "ConceptModel" (image(x) -> concept(c)) could just be a pretrained resnet. It should take an image as input and output a vector of size 112 representing the concepts (binary attributes)

In [2]:
class ConceptModel(nn.Module):
    def __init__(self):
        super(ConceptModel, self).__init__()
        # Pre-trained ResNet50
        self.base_model = models.resnet50(pretrained=True)
        self.base_model.fc = nn.Linear(self.base_model.fc.in_features, 112) #Updated last layer to 112
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.base_model(x)
        x = self.sigmoid(x)  # Sigmoid for probabilities of concept?
        return x

Second part of the model "PredictionModel" (concepts(c) -> prediction(y)) should take the output vector from the conceptmodel in the first layer.

In [3]:
class PredictionModel(nn.Module):
    def __init__(self):
        super(PredictionModel, self).__init__()
        self.fc1 = nn.Linear(112, 256)  # Concept vector as input in the first layer
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(256, 200)  # Output layer for 200 bird species
        self.softmax = nn.Softmax(dim=1)
    
    def forward(self, c):
        c = self.relu(self.fc1(c))
        c = self.softmax(self.fc2(c))
        return c

## Bottleneck model (the two combined in one module)

In [4]:
class BottleneckModel(nn.Module):
    def __init__(self):
        super(BottleneckModel, self).__init__()
        self.concept_model = ConceptModel()
        self.prediction_model = PredictionModel()

    def forward(self, x):
        concepts = self.concept_model(x)
        predictions = self.prediction_model(concepts)
        return predictions

### Insert * NICE DATALOADER *

In [9]:
#root_dir = f'{os.getcwd()}/CUB_200_2011/images/'
root_dir = f'{os.getcwd()}\\CUB_200_2011\\images\\'

transform = transforms.Compose([
    Resize((299, 299)),  # Resize images to a fixed size, for example, 224x224
    ToTensor()           # Convert images to tensors
])
dataset = ImageFolder(root=root_dir,transform=transform)


In [10]:
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = int(0.1 * len(dataset))    # 10% for validation
test_size = len(dataset) - train_size - val_size  # Remaining for testing

# Split the dataset randomly into train, validation, and test sets
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Print the sizes of the splits
print(f"Train set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")

Train set size: 9424
Validation set size: 1178
Test set size: 1179


In [12]:
import os
import random
import pickle
import argparse
from os import listdir
from os.path import isfile, isdir, join
from collections import defaultdict as ddict


def extract_data(data_dir):
    cwd = os.getcwd()
    data_path = join(cwd, data_dir + "\\images")
    val_ratio = 0.1

    path_to_id_map = dict()  # map from full image path to image id
    with open(data_path.replace("images", "images.txt"), "r") as f:
        for line in f:
            items = line.strip().split()
            path_to_id_map[join(data_path, items[1])] = int(items[0])

    attribute_labels_all = ddict(
        list
    )  # map from image id to a list of attribute labels
    attribute_certainties_all = ddict(
        list
    )  # map from image id to a list of attribute certainties
    attribute_uncertain_labels_all = ddict(
        list
    )  # map from image id to a list of attribute labels calibrated for uncertainty
    # 1 = not visible, 2 = guessing, 3 = probably, 4 = definitely
    uncertainty_map = {
        1: {
            1: 0,
            2: 0.5,
            3: 0.75,
            4: 1,
        },  # calibrate main label based on uncertainty label
        0: {1: 0, 2: 0.5, 3: 0.25, 4: 0},
    }
    with open(join(cwd, data_dir + "\\attributes\\image_attribute_labels.txt"), "r") as f:
        for line in f:
            file_idx, attribute_idx, attribute_label, attribute_certainty = (
                line.strip().split()[:4]
            )
            attribute_label = int(attribute_label)
            attribute_certainty = int(attribute_certainty)
            uncertain_label = uncertainty_map[attribute_label][attribute_certainty]
            attribute_labels_all[int(file_idx)].append(attribute_label)
            attribute_uncertain_labels_all[int(file_idx)].append(uncertain_label)
            attribute_certainties_all[int(file_idx)].append(attribute_certainty)

    is_train_test = dict()  # map from image id to 0 / 1 (1 = train)
    with open(join(cwd, data_dir + "\\train_test_split.txt"), "r") as f:
        for line in f:
            idx, is_train = line.strip().split()
            is_train_test[int(idx)] = int(is_train)
    print(
        "Number of train images from official train test split:",
        sum(list(is_train_test.values())),
    )

    train_val_data, test_data = [], []
    train_data, val_data = [], []
    folder_list = [f for f in listdir(data_path) if isdir(join(data_path, f))]
    folder_list.sort()  # sort by class index
    for i, folder in enumerate(folder_list[:2]):
        folder_path = join(data_path, folder)
        classfile_list = [
            cf
            for cf in listdir(folder_path)
            if (isfile(join(folder_path, cf)) and cf[0] != ".")
        ]
        # classfile_list.sort()
        for cf in classfile_list:
            img_id = path_to_id_map[join(folder_path, cf)]
            img_path = join(folder_path, cf)
            metadata = {
                "id": img_id,
                "img_path": img_path,
                "img": dataset[i],
                "class_label": i,
                "attribute_label": torch.tensor(attribute_labels_all[img_id],dtype=torch.float32),
                "attribute_certainty": attribute_certainties_all[img_id],
                "uncertain_attribute_label": attribute_uncertain_labels_all[img_id],
            }
            if is_train_test[img_id]:
                train_val_data.append(metadata)
                # if val_files is not None:
                #     if img_path in val_files:
                #         val_data.append(metadata)
                #     else:
                #         train_data.append(metadata)
            else:
                test_data.append(metadata)

    random.shuffle(train_val_data)
    split = int(val_ratio * len(train_val_data))
    train_data = train_val_data[split:]
    val_data = train_val_data[:split]
    print("Size of train set:", len(train_data))
    return train_data, val_data, test_data

In [13]:
data_dir = f'{os.getcwd()}\\CUB_200_2011'
train_dataset, val_dataset, test_dataset = extract_data(data_dir)
train_loader = DataLoader(train_dataset, batch_size=32)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)


Number of train images from official train test split: 5994
Size of train set: 54


## Loss Function and Optimizer

I was thinking CrossEntropyLoss. Since the bottleneck model includes pre-trained components we might want to use different learning rates for different parts of the model? but I think it is maybe possible with PyTorch optimizers..

In [14]:
model = BottleneckModel()
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [20]:
num_epochs = 10 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        inputs = batch['img'][0]
        labels = batch['class_label']
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()  
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()  
        optimizer.step()
        
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

Epoch [1/10], Loss: 5.2897
Epoch [2/10], Loss: 5.2773
Epoch [3/10], Loss: 5.2565
Epoch [4/10], Loss: 5.2138
Epoch [5/10], Loss: 5.1288
Epoch [6/10], Loss: 4.9849
Epoch [7/10], Loss: 4.7918
Epoch [8/10], Loss: 4.6142
Epoch [9/10], Loss: 4.5019
Epoch [10/10], Loss: 4.4412
