In [5]:
import pandas as pd
import data_preprocessing as dp

In [6]:
# Load split df
df = pd.read_csv('../results/splits_trained_on.csv')

# Define split columns
split_columns = ['time-aware', 'encounter', 'random']

# Iterate over each split column and print summary statistics
for split_column in split_columns:
    dp.print_split_summary(df, split_column)

# Get label maps
label_map, inverse_label_map = dp.get_data_label_map(df)

Summary for time-aware:
Number of images per split:
time-aware
train     5182
test      1070
val        680
ignore     650
Name: count, dtype: int64
Proportions of train/val/test splits:
time-aware
train     0.683461
test      0.141124
val       0.089686
ignore    0.085729
Name: count, dtype: float64
Number of individuals in train split: 357
Number of individuals in val split: 53
Number of individuals in test split: 75

Summary for encounter:
Number of images per split:
encounter
train    5552
test     1035
val       995
Name: count, dtype: int64
Proportions of train/val/test splits:
encounter
train    0.732261
test     0.136508
val      0.131232
Name: count, dtype: float64
Number of individuals in train split: 400
Number of individuals in val split: 349
Number of individuals in test split: 388

Summary for random:
Number of images per split:
random
train    5785
test     1073
val       724
Name: count, dtype: int64
Proportions of train/val/test splits:
random
train    0.762991
test   

# Base Predictions

In [14]:
import torch
from models import load_model_and_data
from eval import predict_ce, predict_arc

In [15]:
losses = ['crossentropy', 'arcface']
splits = ['encounter']
augs = ['base', 'mixed']

for split in splits:
    for loss in losses:
        for aug in augs:
            # Loading
            model_name = loss + '_' + aug + '_' + split
            result = load_model_and_data(model_name + '.pth', loss, split, label_map)

            model = result['model']
            test_loader = result['test_loader']
            clean_train_loader = result['clean_train_loader']

            # Define column name
            pred_column_name = model_name + '_none'
            df[pred_column_name] = 'None'

            if loss == 'arcface':
                predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader)
            elif loss == 'crossentropy':
                predictions, ground_truth = predict_ce(model, test_loader)

            df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

In [11]:
losses = ['crossentropy']
splits = ['encounter']

for split in splits:
    for loss in losses:
        # Loading
        model_name = loss + '_mixed_' + split
        result = load_model_and_data(model_name + '.pth', loss, split, label_map)

        model = result['model']
        test_loader = result['test_loader']
        clean_train_loader = result['clean_train_loader']

        # Define column name
        pred_column_name = loss + '_' + split + '_base_none'
        df[pred_column_name] = 'None'

        if loss == 'arcface':
            predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader)
        elif loss == 'crossentropy':
            predictions, ground_truth = predict_ce(model, test_loader)

        df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

# Low Resolution

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import to_pil_image
from PIL import Image
import random
import numpy as np

In [13]:
def resolution_distortion(images, scale_factor=0.5):
    """
    Lowers the resolution of the images and then scales them back to 224x224.
    
    Args:
    images (torch.Tensor): Input images of shape (B, C, H, W)
    scale_factor (float): Factor to scale down the image resolution
    
    Returns:
    torch.Tensor: Distorted images of shape (B, C, 224, 224)
    """
    B, C, H, W = images.shape
    
    # Calculate new dimensions
    new_H, new_W = int(H * scale_factor), int(W * scale_factor)
    
    # Define the transformation pipeline
    transform = T.Compose([
        T.Resize((new_H, new_W), antialias=True),  # Lower resolution
        T.Resize((224, 224), antialias=True)  # Scale back to 224x224
    ])
    
    # Apply the transformation
    distorted_images = torch.stack([transform(img) for img in images])
    
    return distorted_images

In [14]:
scale_factors = [1, 0.75, 0.5, 0.25, 0.15, 0.1]

losses = ['crossentropy']
splits = ['encounter']

for split in splits:
    for loss in losses:
        for sf in scale_factors:
            # Loading
            model_name = loss + '_base_' + split
            distortion_func=lambda x: resolution_distortion(x, scale_factor=sf)
            result = load_model_and_data(model_name + '.pth', loss, split, label_map)

            model = result['model']
            test_loader = result['test_loader']
            clean_train_loader = result['clean_train_loader']

            # Define column name
            pred_column_name = model_name + f'_lr(sf={sf})'
            df[pred_column_name] = 'None'

            if loss == 'arcface':
                predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader, distortion_func=distortion_func)
            elif loss == 'crossentropy':
                predictions, ground_truth = predict_ce(model, test_loader, distortion_func=distortion_func)

            df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

In [15]:
scale_factors = [1, 0.75, 0.5, 0.25, 0.15, 0.1]

losses = ['arcface']
splits = ['encounter']

for split in splits:
    for loss in losses:
        for sf in scale_factors:
            # Loading
            model_name = loss + '_mixed_' + split
            distortion_func=lambda x: resolution_distortion(x, scale_factor=sf)
            result = load_model_and_data(model_name + '.pth', loss, split, label_map)

            model = result['model']
            test_loader = result['test_loader']
            clean_train_loader = result['clean_train_loader']

            # Define column name
            pred_column_name = model_name + f'_lr(sf={sf})'
            df[pred_column_name] = 'None'

            if loss == 'arcface':
                predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader, distortion_func=distortion_func)
            elif loss == 'crossentropy':
                predictions, ground_truth = predict_ce(model, test_loader, distortion_func=distortion_func)

            df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

# Motion Blur

In [16]:
import os
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision

Page where I got motion blur from: https://www.kaggle.com/code/limitz/random-motion-blur

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def random_motion(steps=16, initial_vector=None, alpha=0.2):
    if initial_vector is None:
        initial_vector = torch.randn(1, dtype=torch.cfloat)
    
    # Generate the random motion path
    motion = [torch.zeros_like(initial_vector)]
    for _ in range(steps):
        change = torch.randn(initial_vector.shape[0], dtype=torch.cfloat)
        initial_vector = initial_vector + change * alpha
        initial_vector /= initial_vector.abs().add(1e-8)
        motion.append(motion[-1] + initial_vector)
    
    motion = torch.stack(motion, -1)
    
    # Find bounding box
    real_min, _ = motion.real.min(dim=-1, keepdim=True)
    real_max, _ = motion.real.max(dim=-1, keepdim=True)
    imag_min, _ = motion.imag.min(dim=-1, keepdim=True)
    imag_max, _ = motion.imag.max(dim=-1, keepdim=True)

    # Scale motion to fit exactly in steps x steps
    real_scale = (steps - 1) / (real_max - real_min)
    imag_scale = (steps - 1) / (imag_max - imag_min)
    scale = torch.min(real_scale, imag_scale)
    
    real_shift = (steps - (real_max - real_min) * scale) / 2 - real_min * scale
    imag_shift = (steps - (imag_max - imag_min) * scale) / 2 - imag_min * scale
    
    scaled_motion = motion * scale + (real_shift + 1j * imag_shift)
    
    # Create kernel
    kernel = torch.zeros(initial_vector.shape[0], 1, steps, steps)
    
    # Fill kernel
    for s in range(steps + 1):
        v = scaled_motion[:, s]
        x = torch.clamp(v.real, 0, steps - 1)
        y = torch.clamp(v.imag, 0, steps - 1)
        
        ix = x.long()
        iy = y.long()
        
        vx = x - ix.float()
        vy = y - iy.float()
        
        for i in range(initial_vector.shape[0]):
            kernel[i, 0, iy[i], ix[i]] += (1-vx[i]) * (1-vy[i]) / steps
            if ix[i] + 1 < steps:
                kernel[i, 0, iy[i], ix[i]+1] += vx[i] * (1-vy[i]) / steps
            if iy[i] + 1 < steps:
                kernel[i, 0, iy[i]+1, ix[i]] += (1-vx[i]) * vy[i] / steps
            if ix[i] + 1 < steps and iy[i] + 1 < steps:
                kernel[i, 0, iy[i]+1, ix[i]+1] += vx[i] * vy[i] / steps

    # Normalize the kernel
    kernel /= kernel.sum(dim=(-1, -2), keepdim=True)
    
    return kernel

class RandomMotionBlur(nn.Module):
    """
    Apply random motion blur to input tensors.
    """
    def __init__(self, steps=17, alpha=0.2):
        """
        Initialize the RandomMotionBlur module.
        
        Args:
        - steps (int): Number of steps in the motion path
        - alpha (float): Controls the randomness of the motion path
        """
        super().__init__()
        self.steps = steps
        self.alpha = alpha
       
    def forward(self, x, return_kernel=False):
        """
        Apply random motion blur to the input tensor.
        
        Args:
        - x (torch.Tensor): Input tensor to be blurred
        - return_kernel (bool): If True, return both blurred tensor and kernel
        
        Returns:
        - y (torch.Tensor): Blurred tensor
        - m (torch.Tensor, optional): Blur kernel, if return_kernel is True
        """
        # Add an extra dimension in x
        if x.dim() == 3:
            x = x.unsqueeze(0)

        # Generate a random initial vector
        vector = torch.randn(x.shape[0], dtype=torch.cfloat) / 3
        vector.real /= 2
        
        # Create the motion blur kernel
        m = random_motion(self.steps, vector, alpha=self.alpha)
        
        # Pad the input tensor for convolution
        xpad = [m.shape[-1]//2+1] * 2 + [m.shape[-2]//2+1] * 2
        x = F.pad(x, xpad)
        
        # Pad the kernel to match input size
        mpad = [0, x.shape[-1]-m.shape[-1], 0, x.shape[-2]-m.shape[-2]]
        mp = F.pad(m, mpad)
        
        # Apply blur in the frequency domain
        fx = torch.fft.fft2(x)  # FFT of input
        fm = torch.fft.fft2(mp)  # FFT of kernel
        fy = fx * fm  # Multiplication in frequency domain
        y = torch.fft.ifft2(fy).real  # Inverse FFT to get blurred result
        
        # Crop the result to original size
        y = y[...,xpad[2]:-xpad[3], xpad[0]:-xpad[1]]
        
        return y if not return_kernel else (y, m)


In [18]:
step_sizes = [4, 8, 12, 16, 20, 24]

for split in ['encounter']:
    for loss in ['crossentropy']:
        for ss in step_sizes:
            # Loading
            model_name = loss + '_base_' + split
            blur = RandomMotionBlur(steps=ss)
            distortion_func=lambda x: blur(x)
            result = load_model_and_data(model_name + '.pth', loss, split, label_map)

            model = result['model']  
            test_loader = result['test_loader']
            clean_train_loader = result['clean_train_loader']

            # Define column name
            pred_column_name = model_name + f'_mb(ss={ss})'
            df[pred_column_name] = 'None'

            if loss == 'arcface':
                predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader, distortion_func=distortion_func)
            elif loss == 'crossentropy':
                predictions, ground_truth = predict_ce(model, test_loader, distortion_func=distortion_func)

            df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

In [19]:
step_sizes = [4, 8, 12, 16, 20, 24]

for split in ['encounter']:
    for loss in ['arcface']:
        for ss in step_sizes:
            # Loading
            model_name = loss + '_mixed_' + split
            blur = RandomMotionBlur(steps=ss)
            distortion_func=lambda x: blur(x)
            result = load_model_and_data(model_name + '.pth', loss, split, label_map)

            model = result['model']  
            test_loader = result['test_loader']
            clean_train_loader = result['clean_train_loader']

            # Define column name
            pred_column_name = model_name + f'_mb(ss={ss})'
            df[pred_column_name] = 'None'

            if loss == 'arcface':
                predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader, distortion_func=distortion_func)
            elif loss == 'crossentropy':
                predictions, ground_truth = predict_ce(model, test_loader, distortion_func=distortion_func)

            df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

# Gaussian Blur

In [23]:
import torchvision.transforms.functional as TF

def gaussian_blur(images, kernel_size=5, sigma=2.0):
    return TF.gaussian_blur(images, kernel_size, sigma) if sigma > 0 else images

In [None]:
kernel_sizes = [3, 5, 7, 9, 11, 13, 15, 17]
sigma_values = [1.0, 2.0, 3.0, 5.0, 7.0, 10.0]

models = ['base', 'mixed']

for split in ['encounter']:
    for loss in ['arcface', 'crossentropy']:
        for model_type in models:
            print(model_type)
            for kernel_size in kernel_sizes:
                for sigma in sigma_values:
                    # Loading
                    model_name = loss + '_' + model_type + '_' + split
                    distortion_func=lambda x: gaussian_blur(x, kernel_size=kernel_size, sigma=sigma)
                    result = load_model_and_data(model_name + '.pth', loss, split, label_map)

                    model = result['model']
                    test_loader = result['test_loader']
                    clean_train_loader = result['clean_train_loader']

                    # Define column name
                    pred_column_name = model_name + f'_gb(ks={kernel_size}, sigma={sigma})'
                    df[pred_column_name] = 'None'

                    if loss == 'arcface':
                        predictions, ground_truth = predict_arc(model, test_loader, clean_train_loader, distortion_func=distortion_func)
                    elif loss == 'crossentropy':
                        predictions, ground_truth = predict_ce(model, test_loader, distortion_func=distortion_func)

                    df.loc[df[split]=="test", pred_column_name] = [inverse_label_map[split][prediction] for prediction in predictions]

In [26]:
df.to_csv('../results/predictions/all_predictions.csv', index=False)