In [None]:
#imports
import os
import sys
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
import pandas as pd
import numpy as np
import joblib
from datetime import datetime
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
#from torchinfo import summary
#import torchvision.transforms.functional as F
import torch.nn.functional as F
from torchvision.transforms import InterpolationMode
from torchvision import datasets, transforms
from torch.utils.data import Dataset,DataLoader, random_split
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights, alexnet, AlexNet_Weights
from torchvision.models import resnet50, ResNet50_Weights, resnet18, ResNet18_Weights
from torch.optim.lr_scheduler import CosineAnnealingLR

from tqdm import tqdm

import cv2
from PIL import Image
source_path = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
sys.path.append(source_path)

# Project-local helper module. It must be imported after source_path is on
# sys.path, and it is imported here so the cells that call file_IO work when
# the notebook is run top to bottom on a fresh kernel.
import utils.file_IO as file_IO

# Definitions

In [None]:
# Deterministic preprocessing pipeline: resize, centre-crop to 224, convert to
# a tensor, then normalise each channel with the mean/std listed below.
transform_resnet = transforms.Compose(
    [
        transforms.Resize(size=256, interpolation=InterpolationMode.BILINEAR),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
# ------------------------------
# Data Augmentation for Contrastive Learning
# ------------------------------
# Colour perturbation; it is wrapped in RandomApply below so it only fires
# with probability 0.8.
color_jitter = transforms.ColorJitter(
    brightness=0.4,
    contrast=0.4,
    saturation=0.4,
    hue=0.1,
)

# Random pipeline applied independently to each view of a patch.
# NOTE(review): there is no Normalize step here, unlike transform_resnet and
# simclr_transform -- confirm this is intended when using pretrained weights.
ContrastiveTransform = transforms.Compose(
    [
        transforms.RandomResizedCrop(
            size=256,
            scale=(0.8, 1.0),
            interpolation=InterpolationMode.BILINEAR,
        ),
        transforms.RandomHorizontalFlip(),
        transforms.RandomApply([color_jitter], p=0.8),
        transforms.RandomApply([transforms.GaussianBlur(kernel_size=3)], p=0.5),
        transforms.ToTensor(),
    ]
)

In [None]:
# Per-channel normalisation shared by the SimCLR pipeline below.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# SimCLR data augmentation transform
simclr_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(size=224),
        transforms.RandomHorizontalFlip(),
        # Colour jitter fires with probability 0.8.
        transforms.RandomApply(
            [transforms.ColorJitter(brightness=0.8, contrast=0.8, saturation=0.8, hue=0.2)],
            p=0.8,
        ),
        transforms.RandomGrayscale(p=0.2),
        # kernel_size ~ 0.1 * image size; the blur is applied to every sample.
        transforms.GaussianBlur(kernel_size=23, sigma=(0.1, 2.0)),
        transforms.ToTensor(),
        normalize,
    ]
)

In [None]:
class CustomPatchDataset(Dataset):
    """Dataset yielding two views of the same image patch.

    Each row of ``df`` names an image file and a crop box. ``__getitem__``
    opens the image, crops the box, and runs ``transform`` twice on the crop
    so that a random transform produces two different views of one patch.
    """

    def __init__(self, df,transform=ContrastiveTransform):
        """
        Args:
            df (DataFrame): One row per patch, with columns ``file_name``
                (image path) and ``x``, ``y``, ``x2``, ``y2`` (crop box passed
                to ``PIL.Image.crop`` as ``(x, y, x2, y2)``).
            transform (callable, optional): Applied independently to the patch
                once per view. If ``None``, both views are the raw PIL crop.
        """
        self.image_files = df['file_name'].tolist()
        self.x1 = df['x'].tolist()
        self.y1 = df['y'].tolist()
        self.x2 = df['x2'].tolist()
        self.y2 = df['y2'].tolist()
        self.transform = transform

    def __len__(self):
        """Return the number of patches (rows of the source DataFrame)."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``{'image1': view_a, 'image2': view_b}`` for patch ``idx``."""
        box = (self.x1[idx], self.y1[idx], self.x2[idx], self.y2[idx])

        # Close the file handle deterministically; convert() returns a fully
        # loaded copy, so the patch stays valid after the file is closed.
        with Image.open(self.image_files[idx]) as raw_image:
            patch = raw_image.convert("RGB").crop(box)

        if self.transform is not None:
            # Two independent calls: a random transform gives two views.
            patch1 = self.transform(patch)
            patch2 = self.transform(patch)
        else:
            # Without a transform both views are the untouched crop; the
            # original code left patch1/patch2 undefined on this path.
            patch1 = patch
            patch2 = patch

        return {
            'image1': patch1,
            'image2': patch2,
        }

In [None]:
class ContrastiveModel(nn.Module):
    """ResNet Backbone + Projection Head for SimCLR.

    Args:
        model (nn.Module): Backbone whose ``fc`` attribute is the
            classification head. ``fc`` is replaced in place by
            ``nn.Identity`` so the backbone outputs its pooled features.
        projection_dim (int): Size of the projected embedding.

    The projection head's input width is read from ``model.fc.in_features``
    so the wrapper matches whatever backbone is passed in. When ``fc`` is not
    an ``nn.Linear`` (for example because it was already replaced by
    ``nn.Identity``), the width falls back to 512, the previously hard-coded
    value.
    """
    def __init__(self, model, projection_dim=128):
        super().__init__()
        # Read the feature width before the head is discarded.
        head = getattr(model, "fc", None)
        feature_dim = head.in_features if isinstance(head, nn.Linear) else 512

        self.encoder = model
        self.encoder.fc = nn.Identity()  # Remove the classification head
        self.projection_head = nn.Sequential(
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Linear(256, projection_dim)
        )

    def forward(self, x):
        """Encode ``x``, project it, and L2-normalise each row of the result."""
        features = self.encoder(x)
        projections = self.projection_head(features)
        return F.normalize(projections, dim=1)

In [None]:
# ------------------------------
# NT-Xent Contrastive Loss
# ------------------------------
class NTXentLoss(nn.Module):
    """Normalized Temperature-scaled Cross Entropy Loss (SimCLR).

    Args:
        temperature (float): Divisor applied to the similarity logits.
    """
    def __init__(self, temperature=0.5):
        super().__init__()
        self.temperature = temperature
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, z_i, z_j):
        """Return the NT-Xent loss for two aligned batches of embeddings.

        Row ``k`` of ``z_i`` and row ``k`` of ``z_j`` form a positive pair.
        The dot products are used directly as similarities, so the inputs are
        expected to be L2-normalised already (the model's forward does this).
        """
        n = z_i.shape[0]
        total = 2 * n

        # Stack both views and score every embedding against every other one.
        embeddings = torch.cat([z_i, z_j], dim=0)
        pairwise = torch.matmul(embeddings, embeddings.T)

        # Drop the diagonal (self-similarity): each row keeps 2n - 1 logits.
        keep = ~torch.eye(total, dtype=torch.bool, device=embeddings.device)
        logits = pairwise[keep].view(total, -1) / self.temperature

        # Column of the positive in each row after the diagonal is removed:
        # rows 0..n-1 have their partner at original column k + n, which
        # shifts left by one (the removed diagonal precedes it) to k + n - 1;
        # rows n..2n-1 have their partner at column k, before the diagonal,
        # so the index is unchanged.
        first_half_targets = torch.arange(n - 1, total - 1, device=embeddings.device)
        second_half_targets = torch.arange(n, device=embeddings.device)
        targets = torch.cat([first_half_targets, second_half_targets], dim=0)

        # Cross entropy pushes each row's largest logit toward its positive.
        return self.criterion(logits, targets)

In [None]:
# ------------------------------
# Step 1: Contrastive Pretraining
# ------------------------------
def pretrain_contrastive(model, dataloader, optimizer, device, epochs=10):
    """Train ``model`` with the NT-Xent loss on paired views.

    Args:
        model: Module mapping an image batch to embeddings.
        dataloader: Iterable of dict batches with keys ``'image1'`` and
            ``'image2'`` holding the two views.
        optimizer: Optimizer over the model's parameters.
        device: Device the batches are moved to before the forward pass.
        epochs (int): Number of passes over ``dataloader``.

    Prints the mean batch loss after every epoch; returns nothing.
    """
    model.train()
    criterion = NTXentLoss()

    for epoch_number in range(1, epochs + 1):
        running_loss = 0
        progress = tqdm(dataloader, desc=f"Epoch {epoch_number}/{epochs}")
        for views in progress:
            view_a = views['image1'].to(device)
            view_b = views['image2'].to(device)

            # Forward both views (first view first), then score the pairs.
            loss = criterion(model(view_a), model(view_b))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        mean_loss = running_loss / len(dataloader)
        print(f"Epoch [{epoch_number}/{epochs}], Loss: {mean_loss:.4f}")

# Initialization

In [None]:
# Patch table produced by preprocessing.
source_file = "icdar_train_df_iam_rimes_patches_20250615_170212.csv"
# Tags passed to file_IO.change_filename_from_to as fr=/to=.
# NOTE(review): presumably these identify the machine that saved the CSV and
# the machine running this notebook -- confirm against utils.file_IO.
running = 'new-laptop'
saved = 'new-laptop'
# Backbone configuration used by the model-selection cell.
pretrained = True
selected_model = 'resnet50'

# os.path.join replaces hard-coded backslashes so the path also resolves on
# non-Windows hosts; on Windows the resulting path is the same.
train_df = pd.read_csv(os.path.join(source_path, "outputs", "preprocessed_data", source_file))
train_df = file_IO.change_filename_from_to(train_df, fr=saved, to=running)

In [None]:
# Learning rate handed to the Adam optimizer.
initial_lr = 0.001
# Number of patches per batch in the train and validation DataLoaders.
batch_size = 64

In [None]:
# Only ResNet-50 is wired up; reject anything else before building a model.
if selected_model != 'resnet50':
    raise ValueError(f"Model {selected_model} is not supported.")

# Use the ImageNet checkpoint when `pretrained` is set, otherwise no weights.
weights = ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
model = resnet50(weights=weights)

In [None]:
# Probability that a given 'index' group is assigned to training (train == 1);
# the remaining groups get train == 0 and form the validation split.
p_train = 0.9

# Use the 'index' values actually present in the data instead of assuming they
# are exactly 1..N; otherwise the left merge leaves NaN in 'train' for any
# value outside that range.
page_ids = train_df['index'].dropna().unique()
N = len(page_ids)

# Seeded generator so the split is the same on every Restart & Run All.
split_rng = np.random.default_rng(42)
pages_df = pd.DataFrame({
    'index': page_ids,
    'train': split_rng.choice([0, 1], size=N, p=[1 - p_train, p_train]),
})

# Attach the flag to every patch row. Dropping a 'train' column left by a
# previous run keeps the cell idempotent (no train_x / train_y columns).
train_df = train_df.drop(columns='train', errors='ignore').merge(pages_df, on='index', how='left')

# Display the dataframe
train_df.head()

Unnamed: 0,writer,same_text,isEng,train
0,1,0,0,1
1,1,1,0,1
2,1,0,1,1
3,1,1,1,1
4,2,0,0,1


In [None]:
# Upper bound on the 'index' values included in either split.
N_max = N
within_limit = train_df['index'] <= N_max

# Rows flagged train == 1 feed the training loader.
train_rows = train_df[(train_df['train'] == 1) & within_limit]
train_dataset = CustomPatchDataset(train_rows, transform=ContrastiveTransform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Rows flagged train == 0 feed the validation loader (same transform).
val_rows = train_df[(train_df['train'] == 0) & within_limit]
val_dataset = CustomPatchDataset(val_rows, transform=ContrastiveTransform)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Create an iterator
data_iter = iter(train_dataloader)
# Get a single batch
batch = next(data_iter)
# CustomPatchDataset batches contain only 'image1' and 'image2' (the two
# augmented views), so plot those; the keys 'writer' and 'label' do not exist,
# and plot_image_batches takes an image batch as its second argument.
plot_image_batches(batch['image1'], batch['image2'])

In [None]:
# Wrap the backbone with a projection head. ContrastiveModel replaces
# `model.fc` in place, so `model` itself is modified by this call.
contrastive_model = ContrastiveModel(model, projection_dim=128)

# run

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model to the device, then build Adam over its parameters.
contrastive_model = contrastive_model.to(device)
optimizer = optim.Adam(contrastive_model.parameters(), lr=initial_lr)

In [None]:
# Run contrastive pretraining on the training loader for 20 epochs.
pretrain_contrastive(contrastive_model, train_dataloader, optimizer, device, epochs=20)

# save

In [None]:
# Timestamp used as the experiment identifier in the log.
experiment = datetime.now().strftime("%Y%m%d_%H%M%S")
out_dir = os.path.join(source_path, "outputs", "logs")
# os.path.join instead of a hard-coded backslash keeps the path portable.
LOG_OUT_FILE = os.path.join(out_dir, "feature_extraction_metadata_log.json")
print(f"Log file path: {LOG_OUT_FILE}")

# Record only settings that this notebook defines. The previous metadata
# referenced names (source_data, model_used, n_splits, ...) that are never
# assigned in this notebook, so the cell raised NameError before logging.
file_IO.add_or_update_experiment(
    experiment, LOG_OUT_FILE,
    custom_metadata={
        "input file": source_file,
        "FE model": selected_model,
        "FE transform": "ContrastiveTransform",
        "pretrained": pretrained,
        "initial_lr": initial_lr,
        "batch_size": batch_size,
        "p_train": p_train,
        "task": "contrastive pretraining (NT-Xent)",
        "description": ''' Contrastive (SimCLR-style) pretraining of the backbone on image patches,
        using two augmented views of each patch and the NT-Xent loss.'''
    }
)

# random tests

In [21]:
class RandomTensorDataset(Dataset):
    """Synthetic dataset returning a random image tensor and an exact copy.

    Args:
        num_samples (int): Reported dataset length.
        image_size (tuple): ``(H, W, C)`` shape of the tensor drawn per item.
    """
    def __init__(self, num_samples, image_size=(256, 256, 3)):
        self.num_samples, self.image_size = num_samples, image_size

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Draw in (H, W, C) layout, then reorder the axes to (C, H, W).
        channels_last = torch.rand(*self.image_size)
        view_a = channels_last.permute(2, 0, 1)
        # The second view is an identical copy, not an independent draw.
        view_b = view_a.clone()
        return view_a, view_b

# Create the dataset and dataloader
# (1000 synthetic samples, served in shuffled batches of 3)
random_dataset = RandomTensorDataset(num_samples=1000)
random_dataloader = DataLoader(random_dataset, batch_size=3, shuffle=True)

# Example: Fetch a batch
# random_batch[0] and random_batch[1] hold the two tensors returned per item.
random_batch = next(iter(random_dataloader))
print(f"Batch tensor1 shape: {random_batch[0].shape}")
print(f"Batch tensor2 shape: {random_batch[1].shape}")

Batch tensor1 shape: torch.Size([3, 3, 256, 256])
Batch tensor2 shape: torch.Size([3, 3, 256, 256])


In [22]:
# Put the synthetic batch on the device that holds the model's weights. The
# model is moved to `device` in an earlier cell, and CPU inputs would raise a
# device-mismatch error when that device is CUDA.
model_device = next(contrastive_model.parameters()).device
x_i, x_j = random_batch[0].to(model_device), random_batch[1].to(model_device)
z_i, z_j = contrastive_model(x_i), contrastive_model(x_j)
print(f"Batch representation1 shape: {z_i.shape}")
print(f"Batch representation2 shape: {z_j.shape}")

Batch representation1 shape: torch.Size([3, 10])
Batch representation2 shape: torch.Size([3, 10])


In [18]:
# Element-wise difference between the first embedding of each view.
print(z_i[0]-z_j[0])

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0.], grad_fn=<SubBackward0>)


In [34]:
# Evaluate the NT-Xent loss (default temperature) on the two embedding batches.
loss_fn = NTXentLoss()
loss = loss_fn(z_i, z_j)

stacked:  torch.Size([6, 10])
tensor([[-0.3102,  0.0861,  0.0577,  0.2929, -0.6085, -0.2531,  0.1815, -0.3245,
         -0.1531,  0.4597],
        [ 0.1333, -0.0379,  0.1585, -0.1918, -0.4724,  0.0645,  0.3494, -0.3464,
         -0.4000,  0.5380],
        [-0.2865, -0.0284,  0.3475, -0.2848, -0.5702, -0.1441,  0.1548, -0.4361,
         -0.2580,  0.2976],
        [-0.3102,  0.0861,  0.0577,  0.2929, -0.6085, -0.2531,  0.1815, -0.3245,
         -0.1531,  0.4597],
        [ 0.1333, -0.0379,  0.1585, -0.1918, -0.4724,  0.0645,  0.3494, -0.3464,
         -0.4000,  0.5380],
        [-0.2865, -0.0284,  0.3475, -0.2848, -0.5702, -0.1441,  0.1548, -0.4361,
         -0.2580,  0.2976]], grad_fn=<CatBackward0>)
similarity_matrix:  torch.Size([6, 6])
tensor([[1.0000, 0.6639, 0.7524, 1.0000, 0.6639, 0.7524],
        [0.6639, 1.0000, 0.8012, 0.6639, 1.0000, 0.8012],
        [0.7524, 0.8012, 1.0000, 0.7524, 0.8012, 1.0000],
        [1.0000, 0.6639, 0.7524, 1.0000, 0.6639, 0.7524],
        [0.6639, 1.0

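# Easy access: helper cells (run these before any cell above that calls `plot_image_batches` or the `utils` modules)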

In [None]:
def reload_modules():
    """Import the project's ``utils`` submodules, reload each, and return them.

    Returns:
        tuple: ``(image_processing, file_IO, visualization, tests)``.
    """
    import importlib
    import utils.image_processing as image_processing
    import utils.file_IO as file_IO
    import utils.visualization as visualization
    import utils.tests as tests

    # Reload so edits made to the modules on disk are picked up without
    # restarting the kernel.
    for module in (file_IO, image_processing, visualization, tests):
        importlib.reload(module)

    return image_processing, file_IO, visualization, tests
image_processing, file_IO, visualization, tests = reload_modules()

In [None]:
def plot_image_batches(batch1, batch2, n=8, figsize=(16, 4), unnormalize=None):
    """
    Plots two batches of images: first row is batch1, second row is batch2.

    Args:
        batch1 (Tensor): Batch of images (B, C, H, W)
        batch2 (Tensor): Batch of images (B, C, H, W)
        n (int): Number of images to plot from each batch
        figsize (tuple): Figure size
        unnormalize (bool or None): Whether to undo ImageNet mean/std
            normalisation on 3-channel images before display. ``None``
            (default) decides per batch: a floating-point batch is
            un-normalised only if it has values outside [0, 1]. ``True`` /
            ``False`` force the choice.

    Nothing is drawn when there are no images to show.
    """
    import matplotlib.pyplot as plt

    n = min(n, batch1.shape[0], batch2.shape[0])
    if n <= 0:
        # plt.subplots rejects zero columns; there is nothing to draw.
        return

    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    def _to_displayable(batch):
        """Return the first n images as an (n, H, W, C) NumPy array."""
        # detach() so tensors that require grad can be converted to NumPy.
        images = batch[:n].detach().cpu()
        is_rgb = images.shape[1] == 3
        if unnormalize is None:
            # Tensors already in [0, 1] are taken as not normalised.
            out_of_range = bool((images.min() < 0) | (images.max() > 1))
            undo = is_rgb and images.is_floating_point() and out_of_range
        else:
            undo = is_rgb and bool(unnormalize)
        if undo:
            images = (images * std + mean).clamp(0, 1)
        return images.permute(0, 2, 3, 1).numpy()

    rows = (
        (_to_displayable(batch1), 'Batch 1'),
        (_to_displayable(batch2), 'Batch 2'),
    )

    # squeeze=False keeps `axes` two-dimensional even when n == 1.
    fig, axes = plt.subplots(2, n, figsize=figsize, squeeze=False)
    for row, (images, row_label) in enumerate(rows):
        for col in range(n):
            ax = axes[row, col]
            image = images[col]
            if image.shape[-1] == 1:
                # imshow does not accept (H, W, 1); show it as grayscale.
                ax.imshow(image[..., 0], cmap='gray')
            else:
                ax.imshow(image)
            # Hide ticks and frame but keep the axis on: axis('off') would
            # also hide the row label set below.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)
        axes[row, 0].set_ylabel(row_label, fontsize=14)
    plt.tight_layout()
    plt.show()