# Intrinsic Image Popularity Assessment
This notebook is an implementation of the model described by Ding et al. in the paper "Intrinsic Image Popularity Assessment" (https://arxiv.org/pdf/1907.01985.pdf).

### Import libraries
- Torch: Build the dataset and build and train the model
- Torchvision: Transform images and get the pretrained ResNet-50 model
- os: Get image paths
- PIL: Load images
- tqdm: Fancy progress bar
- matplotlib: Display images

In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import StepLR
from torchvision import transforms
from torchvision.models import resnet50
from torchvision.utils import make_grid

import os
from PIL import Image
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

### Parameters
Parameters of the model as described in the paper.

- `lr_pretrained`: Learning rate of ResNet-50
- `lr_last`: Learning rate of the last linear layer
- `opt_weight_decay`: L2 penalty multiplier for Adam
- `lr_decay`: Learning rate decay
- `batch_size`: Batch size
- `criterion`: Loss function (binary cross-entropy)
- `n_epochs`: Number of epochs
- `img_rescale_dim`: Length of the shortest side of the image after rescaling
- `img_dim`: Height and width of the image after cropping

Additionally, we specify the number of training steps between progress reports (`display_step`) and the device to run on.

In [2]:
lr_pretrained = 1e-5
lr_last = 1e-4
opt_weight_decay = 1e-4
lr_decay = 0.95
batch_size = 64
criterion = nn.BCEWithLogitsLoss()
n_epochs = 14

img_rescale_dim = 256
img_dim = 224

display_step = 10

device = 'cuda' if torch.cuda.is_available() else 'cpu'

### Helper Function: show_images
Create a helper function to display images. It first undoes the normalization (if the images were normalized) and then displays the first `n_images` images in a grid. Change `n_row` to adjust the number of images per row of the grid.

In [3]:
inv_normalize = transforms.Normalize(
    mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225], 
    std=[1/0.229, 1/0.224, 1/0.225]
)

def show_images(images, normalized=True, size=(3, img_dim, img_dim), n_images=4, n_row=2):
    """
    Helper function for displaying images.
    images - The image tensor.
    normalized - If the images are normalized as specified for the model
    size - The size of each image (channels, height, width).
    n_images - The number of images to be displayed.
    n_row - The number of rows in the image grid.
    """
    
    # Inverse normalize
    # Formula: mean = -mean/std, std = 1/std
    if normalized:
        images = inv_normalize(images)
    
    # Display images
    image_unflat = images.detach().cpu().view(-1, *size)
    image_grid = make_grid(image_unflat[:n_images], nrow=n_row)
    plt.imshow(image_grid.permute(1, 2, 0).squeeze())
    plt.show()
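
The inverse-normalization formula in the comment above (`mean = -mean/std`, `std = 1/std`) can be checked with plain arithmetic. The following is a minimal sketch that does not use torchvision; the `normalize` helper and the sample `pixel` value are hypothetical and only mimic the per-channel operation `(x - mean) / std`.

```python
# ImageNet channel statistics used by the notebook's Normalize transform.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

def normalize(pixel, mean, std):
    """Apply (x - mean) / std to each channel of a single RGB pixel."""
    return [(x - m) / s for x, m, s in zip(pixel, mean, std)]

# Parameters of the inverse transform: mean' = -mean/std, std' = 1/std.
inv_mean = [-m / s for m, s in zip(mean, std)]
inv_std = [1 / s for s in std]

pixel = [0.2, 0.5, 0.9]  # hypothetical RGB value in [0, 1]
normalized = normalize(pixel, mean, std)
restored = normalize(normalized, inv_mean, inv_std)

print("normalized:", normalized)
print("restored:  ", restored)  # matches `pixel` up to floating-point error
```

Normalizing a second time with the inverse parameters computes `(y + mean/std) * std = y * std + mean`, which recovers the original value.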

### Create dataset
Create training, validation, and test sets. 

Define a paired dataset, where each item is a tuple (high popularity image, low popularity image). As specified in the paper, we perform the following preprocessing steps:
1. Convert the image to RGB format
2. Convert the image to a tensor with values in the range [0, 1]
3. Resize the image so that the shortest side has length 256
4. Crop the image randomly to have dimensions 224 x 224
5. Normalize the image to have a mean of [0.485, 0.456, 0.406] and a standard deviation of [0.229, 0.224, 0.225].

In [4]:
class PairedDataset(Dataset):
    def __init__(self, im_1_paths, im_2_paths, transform):
        self.im_1_paths = im_1_paths
        self.im_2_paths = im_2_paths
        self.transform = transform
        
    def __getitem__(self, index):
        x = Image.open(self.im_1_paths[index]).convert('RGB')
        y = Image.open(self.im_2_paths[index]).convert('RGB')
        
        x = self.transform(x)
        y = self.transform(y)

        return x, y

    def __len__(self):
        return len(self.im_1_paths)

In [5]:
# Get image paths
file_list_high = []
file_list_low = []

for i in os.listdir('../input/pdip-instagram/High_popularity'):
    file_list_high.append(f'../input/pdip-instagram/High_popularity/{i}')
    file_list_low.append(f'../input/pdip-instagram/Low_popularity/{i}')
    
file_list_high.sort()
file_list_low.sort()

In [6]:
# Preprocess images
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(img_rescale_dim),
    transforms.RandomCrop(img_dim),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
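
To make the resize and crop steps concrete, here is a small sketch of the size arithmetic only, with no image data involved. It assumes that resizing with a single integer matches the shorter side to that value and keeps the aspect ratio; the truncation of the longer side and both helper names are assumptions of this sketch, not a reproduction of torchvision's internals.

```python
def resized_dims(height, width, size=256):
    """Scale so the shorter side equals `size`, keeping the aspect ratio.

    Truncating the longer side to an integer is an assumption of this sketch.
    """
    if height <= width:
        return size, int(size * width / height)
    return int(size * height / width), size

def n_crop_positions(height, width, crop=224):
    """Number of distinct top-left corners available to a random crop."""
    return (height - crop + 1) * (width - crop + 1)

# Hypothetical landscape image of 1080 x 1350 (height x width).
h, w = resized_dims(1080, 1350)
print(h, w)                    # size after rescaling
print(n_crop_positions(h, w))  # possible 224 x 224 crops
```

Because the crop is random, the same image can produce a different 224 x 224 view each time it is loaded.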

In [7]:
# Split data into train, validation, and test sets
val_split = len(file_list_high) * 8 // 10
test_split = len(file_list_high) * 9 // 10
train_ds = PairedDataset(file_list_high[:val_split], file_list_low[:val_split], transform)
val_ds = PairedDataset(file_list_high[val_split:test_split], file_list_low[val_split:test_split], transform)
test_ds = PairedDataset(file_list_high[test_split:], file_list_low[test_split:], transform)
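
The integer arithmetic above yields an approximate 80/10/10 split. The sketch below reproduces the two split indices for a hypothetical number of image pairs (`n_pairs = 1005` is an illustrative value, not the size of the PDIP dataset) and shows how many pairs land in each set.

```python
def split_indices(n_pairs):
    """Return the indices where the validation and test sets begin."""
    val_split = n_pairs * 8 // 10
    test_split = n_pairs * 9 // 10
    return val_split, test_split

n_pairs = 1005  # hypothetical number of (high, low) image pairs
val_split, test_split = split_indices(n_pairs)

# Sizes of the slices [:val_split], [val_split:test_split], [test_split:]
sizes = (val_split, test_split - val_split, n_pairs - test_split)
print("train / val / test sizes:", sizes)
```

Since the file lists are sorted before slicing, the split is deterministic: the same pairs end up in the same set on every run.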

### Create the model
Load the ResNet-50 model pretrained on the ImageNet-1k dataset. We fine-tune the base model after replacing its output layer with a fully-connected layer that consists of a single neuron, so the model outputs one popularity score per image.

As specified in the paper, we use different learning rates for the base model and the fully-connected layer. We also define a learning rate decay that is applied after every epoch.

In [8]:
model = resnet50(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, 1)
model = model.to(device)

In [9]:
pretrained_params = []
for name, param in model.named_parameters():
    if name != 'fc.weight' and name != 'fc.bias':
        pretrained_params.append(param)

In [10]:
opt = torch.optim.Adam([
        {'params': pretrained_params, 'lr': lr_pretrained}, 
        {'params': model.fc.parameters(), 'lr': lr_last}
], weight_decay=opt_weight_decay)
scheduler = StepLR(opt, 1, gamma=lr_decay, verbose=True)

Adjusting learning rate of group 0 to 1.0000e-05.
Adjusting learning rate of group 1 to 1.0000e-04.
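
With a step size of 1, the scheduler multiplies each group's learning rate by `lr_decay` once per epoch, so the rate in a given epoch is the base rate times `lr_decay ** epoch`. The sketch below computes that schedule in plain Python using the parameter values from this notebook; `lr_at_epoch` is a hypothetical helper, not part of PyTorch.

```python
lr_pretrained = 1e-5
lr_last = 1e-4
lr_decay = 0.95
n_epochs = 14

def lr_at_epoch(base_lr, epoch, gamma=lr_decay):
    """Learning rate in use during `epoch` (0-indexed) with per-epoch decay."""
    return base_lr * gamma ** epoch

for epoch in (0, 1, n_epochs - 1):
    print(
        f"epoch {epoch:2d}: "
        f"base model {lr_at_epoch(lr_pretrained, epoch):.4e}, "
        f"last layer {lr_at_epoch(lr_last, epoch):.4e}"
    )
```

The ratio between the two groups stays at 10x throughout training, because both are scaled by the same factor.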


In [11]:
pretrained = False
if pretrained:
    loaded_state = torch.load("../input/resnet50-checkpoints/resnet50_12.pth", map_location=device)
    model.load_state_dict(loaded_state['model'])
    opt.load_state_dict(loaded_state['opt'])
    scheduler.load_state_dict(loaded_state['scheduler'])
else:
    nn.init.kaiming_normal_(model.fc.weight, mode='fan_in')

### Train model
Train the model on the PDIP dataset using a Siamese architecture. We pass the high and low popularity images through the same model separately and treat each output as that image's popularity score. Subtracting the low popularity score from the high popularity score gives the model's prediction of which image is more popular.

The binary cross-entropy with logits loss applies a sigmoid to this difference. If the model ranks the high popularity image higher, the sigmoid is close to 1; if it ranks the low popularity image higher, the sigmoid is close to 0. The loss compares this value with the ground-truth label of 1, so the closer the value is to 1, the lower the loss.
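
For a single pair, binary cross-entropy with a target of 1 reduces to `-log(sigmoid(d))`, where `d` is the score difference. The sketch below evaluates that expression in plain Python for a few hypothetical score pairs; `pair_loss` is an illustrative helper and the scores are made up.

```python
import math

def pair_loss(high_score, low_score):
    """Loss for one pair with target 1: -log(sigmoid(d)) = log(1 + exp(-d))."""
    d = high_score - low_score
    # Numerically stable form of log(1 + exp(-d))
    return max(-d, 0.0) + math.log1p(math.exp(-abs(d)))

# Hypothetical (high, low) popularity scores
for high, low in [(3.0, 0.0), (0.0, 0.0), (0.0, 3.0)]:
    print(f"high={high}, low={low}, loss={pair_loss(high, low):.4f}")
```

A correctly ranked pair with a large margin gives a loss near 0, equal scores give `log(2)`, and a wrongly ranked pair is penalized roughly in proportion to the margin. The loss used in training averages this quantity over the batch.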

In [13]:
def train(save_model=True):
    model.train()  # Enable training behavior (e.g. batch norm statistics updates)
    dataloader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    cur_step = 0
    total_loss = 0
    
    for epoch in range(n_epochs):
        for high, low in tqdm(dataloader):
            high = high.to(device)
            low = low.to(device)
            
            high_pred = model(high)
            low_pred = model(low)
            
            loss = criterion(high_pred - low_pred, torch.ones_like(high_pred))
            
            opt.zero_grad()
            loss.backward()
            opt.step()
            
            total_loss += loss.item()
            
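            cur_step += 1
            
            # Check display_step first so that a value of 0 disables logging
            # instead of raising ZeroDivisionError in the modulo
            if display_step > 0 and cur_step % display_step == 0:
                print(f"Epoch: {epoch} \t Step: {cur_step} \t Loss: {total_loss / display_step}")
                total_loss = 0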
            
        scheduler.step()
        
        if save_model:
            torch.save({
                "model": model.state_dict(),
                "opt": opt.state_dict(),
                "scheduler": scheduler.state_dict()
            }, f"resnet50_{epoch}.pth")

In [14]:
train()

### Validation
Evaluate the model on the validation set. If the model did not overfit the training data, the validation loss should be close to the loss seen during training.

In [21]:
def validate():
    model.eval()  # Use batch norm running statistics during evaluation
    dataloader = DataLoader(val_ds, batch_size=batch_size)
    total_loss = 0
    
    for high, low in tqdm(dataloader):
        high = high.to(device)
        low = low.to(device)
        
        with torch.no_grad():
            high_pred = model(high)
            low_pred = model(low)

            loss = criterion(high_pred - low_pred, torch.ones_like(high_pred))
            total_loss += loss.item()
            
    print("Loss:", total_loss / len(dataloader))

In [1]:
validate()
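
Alongside the loss, a more interpretable check is the fraction of pairs in which the high popularity image receives the higher score. The sketch below computes this from two lists of scores; `pairwise_accuracy` is a hypothetical helper that is not part of the notebook, and the scores are made-up values rather than model outputs.

```python
def pairwise_accuracy(high_scores, low_scores):
    """Fraction of pairs where the high popularity image scores higher."""
    if len(high_scores) != len(low_scores):
        raise ValueError("Score lists must have the same length")
    correct = sum(h > l for h, l in zip(high_scores, low_scores))
    return correct / len(high_scores)

# Hypothetical scores for four (high, low) pairs
high_scores = [1.2, 0.3, -0.5, 2.0]
low_scores = [0.4, 0.9, -1.1, 1.5]

accuracy = pairwise_accuracy(high_scores, low_scores)
print(f"Pairwise accuracy: {accuracy:.2f}")
```

A value of 0.5 corresponds to random guessing, since each pair has exactly one correct ordering.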

In [17]:
def predict(img_path, transform):
    img = Image.open(img_path).convert('RGB')
    img = transform(img).to(device)
    img = torch.unsqueeze(img, 0)
    
    model.eval()  # Use batch norm running statistics for single-image inference
    with torch.no_grad():
        img_pred = model(img)
        
    return img_pred