# **Deep Learning Project**

Made by students:
  - **Emanuele Conforti (252122)**
  - **Jacopo Garofalo (252093)**
  - **Gianmarco La Marca (252256)**

## **Environment initialization**

The **aim** of the project is to **generate a report starting from chest x-rays images**.

## **TransformerChestX Notebook Description**

- In this notebook you can find all the code (and all the trials) we made on the transformer.

- We used two pre-trained transformer (**GPT2** and **BioGPT**)

- The notebook is divided into the following parts:

- *short pre-processing*: just building, preparing the dataset and importing the models (encoder, ff_mapper and pre-trained transformer);


- *text generation*: take an image, generate its latent space by using the ff_mapper and pass it to the pre-trained transformer to generate the report

- *statistical analysis*: analyze and compare the ff_mapper's output with the transformer's expected embeddings to check whether the mapper is generating meaningful embeddings

- *transformer fine-tuning*: fine-tuning on the pre-trained transformer to adapt it to our dataset (medical context)  

### **Running the code on Colab**

- Run the following cells with the variable **onColab = True** if you are on Colab.
- We reccomend to run all the codes on Kaggle!

In [None]:
onColab = False

if onColab:
    ! pip install kaggle
    ! mkdir ~/.kaggle
    ! cp kaggle.json ~/.kaggle/
    ! chmod 600 ~/.kaggle/kaggle.json

In [None]:
if onColab:
    ! kaggle datasets download raddar/chest-xrays-indiana-university

In [None]:
import zipfile
import os

if onColab:
    file_name = "chest-xrays-indiana-university.zip"
    
    # extract the file from the zip
    with zipfile.ZipFile(file_name, 'r') as zip_ref:
        zip_ref.extractall("chest_xrays_data")

In [None]:
if onColab:
    !ls chest_xrays_data

In [None]:
if onColab: 
    img_dir = 'chest_xrays_data/images/images_normalized/'
    reports_dir = 'chest_xrays_data/indiana_reports.csv'
    projections_dir = 'chest_xrays_data/indiana_projections.csv'
else:
    img_dir = '/kaggle/input/chest-xrays-indiana-university/images/images_normalized/'
    reports_dir = '/kaggle/input/chest-xrays-indiana-university/indiana_reports.csv'
    projections_dir = '/kaggle/input/chest-xrays-indiana-university/indiana_projections.csv'

In [None]:
! pip install pytorch-msssim

In [None]:
# for BioGPT tokenizer
!pip install sacremoses

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

In [None]:
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, BioGptTokenizer, BioGptForCausalLM

import torch.optim as optim
from torch.optim import AdamW

from tqdm import tqdm
from tqdm.auto import trange

import torchvision
from torchvision import transforms as T

from pytorch_msssim import ssim  # import SSIM as loss function

In [None]:
torch.backends.cudnn.benchmark = True

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print(f"Using device: {torch.cuda.get_device_name(0)}" if torch.cuda.is_available() else "Using CPU")

## **Pre-processing**

Here we print the datasets, analyze data inside them and prepare them for the training phase

In [None]:
reports_df = pd.read_csv(reports_dir)
reports_df.head()

In [None]:
projections_df = pd.read_csv(projections_dir)
projections_df.head()

#### **Since we want to use the findings columns as labels (they represent the report we want to generate), we delete all the rows with null findings**

In [None]:
# filter the rows with null findings
reports_filtered = reports_df.dropna(subset=["findings"])

# keep only entries in projections that have a filtered report associated (association through uid)
projections_filtered = projections_df[projections_df["uid"].isin(reports_filtered["uid"])]
reports_filtered.shape, projections_filtered.shape

In [None]:
reports_filtered.isna().sum()

#### **Split the filtered dataset (containing only the UID column) in train and validation set**

In [None]:
VAL_SIZE = 0.1

uids = reports_filtered.uid.unique()

train_ds, val_ds = train_test_split(
    uids,
    test_size=VAL_SIZE,
)

len(train_ds), len(val_ds)

#### **Import the pre-trained transformer (GPT2 or BioGPT)**

In [None]:
def load_model_and_tokenizer(model_name="gpt2"): 
    if model_name == "BioGPT":
        tokenizer = BioGptTokenizer.from_pretrained("microsoft/biogpt") 
        tokenizer.pad_token = tokenizer.eos_token
        model = BioGptForCausalLM.from_pretrained("microsoft/biogpt").to(device)
        hidden_size = model.config.hidden_size
        print("BioGPT model")
    else:
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
        tokenizer.pad_token = tokenizer.eos_token
        model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
        hidden_size = model.config.n_embd
        print("GPT2 model")
        
    for param in model.parameters():
        param.requires_grad = True  # Freezes all transformer parameters

    transformer_parameters= sum(p.numel() for p in model.parameters())
    print(f"Number of transformer parameters: {transformer_parameters}")

    return tokenizer, model, hidden_size

# if you want to import BioGPT, change this variable!
model_name = "GPT2" # or "BioGPT"
tokenizer, transformerModel, hidden_size = load_model_and_tokenizer(model_name)

#### **We create a custom dataset containing only the data we need:**
- **images**
- **tokenized findings**
- **attention mask**

In [None]:
class ChestXRayDataset(Dataset):
    def __init__(self, reports_df, projections_df, image_folder, tokenizer, uids, transforms):
        self.reports_df = reports_df[reports_df["uid"].isin(uids)].reset_index(drop=True)
        self.projections_df = projections_df
        self.image_folder = image_folder
        self.tokenizer = tokenizer
        self.transform = transforms

    def __len__(self):
        return len(self.reports_df)

    def __getitem__(self, idx):
        row = self.reports_df.iloc[idx]
        uid = row["uid"]
        text = row["findings"]

        # tokenize findings column
        encoded_text = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=144,
            return_tensors="pt"
        )

        # find the path and filename of the associated image
        image_filename = self.projections_df[self.projections_df["uid"] == uid]["filename"].values[0]
        image_path = f"{self.image_folder}/{image_filename}"

        # load and transform the image
        image = Image.open(image_path).convert("L")  # conversion to grayscale
        image = self.transform(image)

        # return the image, label (finding)
        return image, encoded_text["input_ids"].squeeze(0), encoded_text["attention_mask"].squeeze(0)


tf = T.Compose([
    T.Resize((224, 224)),  # resizing for pre-trained models
    T.ToTensor(),
])

train_dataset = ChestXRayDataset(reports_filtered, projections_filtered, img_dir, tokenizer, train_ds, tf)
val_dataset = ChestXRayDataset(reports_filtered, projections_filtered, img_dir, tokenizer, val_ds, tf)

#### **Create the dataloader, that is we split the data in the dataset previously created in batches. We do this operation for both train set and validation set**

In [None]:
BATCH_SIZE = 64

# create the DataLoader to generate batches of the dataset and iterate over them
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

#### **OutOfMemoryError: the following code is used for freeing the GPU cache**

In [None]:
import gc

gc.collect()
torch.cuda.empty_cache()

#### **Loading and mapping the encoderCNN to its corresponding model**

In [None]:
def conv_layer(n_input, n_output, kernel_size, stride=1):
    return nn.Sequential(
        nn.Conv2d(n_input, n_output, kernel_size, stride),
        nn.ReLU(),
        nn.BatchNorm2d(n_output),
        nn.MaxPool2d(2)
    )

In [None]:
class EncoderCNN(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.encoder = nn.Sequential(
            conv_layer(in_channels, 64, 3),
            conv_layer(64, 128, 3),
            conv_layer(128, 256, 3),
            conv_layer(256, 512, 3)
        )

    def encode(self, x):
        return self.encoder(x)

    def forward(self, x):
        return self.encode(x)

In [None]:
state_dict = torch.load("/kaggle/input/encodercnn/pytorch/default/1/encoder.pth")

new_state_dict = {f"encoder.{k}": v for k, v in state_dict.items()}

encoderCNN = EncoderCNN(1).to(device)
encoderCNN.load_state_dict(new_state_dict, strict=False)

#### **Build the FF mapper to adapt the encoderCNN output to the transformer**

In [None]:
def linear_layer(dim_input, dim_output, drop_p=0.2, last=False):
    layers = [nn.Linear(dim_input, dim_output)]
    if not last:
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(p=drop_p))
    return nn.Sequential(*layers)

In [None]:
class FF_mapper(nn.Module):

    """
    # old model (before the two approaches)
    def __init__(self, dim_input, dim_output):
        super().__init__()
        self.ff = nn.Sequential(
            linear_layer(dim_input, 640),
            linear_layer(640, dim_output, last=True),
            nn.LayerNorm(dim_output)
        )
    """

    """
    # token CE approach
    def __init__(self, dim_input, dim_output):
        super().__init__()
        self.ff = nn.Sequential(
            linear_layer(dim_input, 768),
            linear_layer(768, 896),
            #linear_layer(1024, 896),
            linear_layer(896, dim_output, last=True),
            nn.LayerNorm(dim_output)
        )
    """
    
    # embedding approach 
    def __init__(self, dim_input, dim_output):
        super().__init__()
        self.ff = nn.Sequential(
            linear_layer(dim_input, 640),
            linear_layer(640, 896),
            #linear_layer(896, 1024),
            linear_layer(896, dim_output, last=True),
            nn.LayerNorm(dim_output)
        )

    def forward(self, latent_space):
        # flatten, permute and stuff
        batch_size, C, H, W = latent_space.shape
        latent_space = latent_space.permute(0, 2, 3, 1)  # (1, 12, 12, 512)
        latent_space = latent_space.view(batch_size, H * W, C)  # (1, 144, 512)
        return self.ff(latent_space)

## **Text generation**

#### **Pass the encoder output as input to the transformer and generate the text**

- Found a problem on the transformer generate function: since it's not differentiable (it internally uses non differentiable functions like argmax and sample), it does not allow us to perform backpropagation on the model (particularly on the ff_mapper). Hence, by using the generate function at training phase, we are not able to update the weights of the mapper 

In [None]:
# if we want to use generate function as a deterministic function, comment the top_k=50 
# and add do_sample=False
def generate_text(model_name, model, tokenizer, inputs_embeds, attention_mask):
    generated_ids = transformerModel.generate(
        inputs_embeds=inputs_embeds, 
        max_length=512,
        attention_mask=attention_mask,
        pad_token_id=tokenizer.eos_token_id,
        no_repeat_ngram_size=2,   # avoid repetitions
        top_k=50,   # consider only the 50 most probable words
        eos_token_id=None,
    )
    decoded_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    print("Generated text:", generated_ids)
    print("Decoded generated text:", decoded_text)

#### **Transformer text generation**

In [None]:
# Loading fine-tuned GPT2

# transformerModel.load_state_dict(torch.load("/kaggle/input/gpt2_fine_tuned/pytorch/default/1/gpt2_finetuned.pth"))

In [None]:
# Parameters
image_feature_dim = 512  # EncoderCNN output (latent space) has 512 features for each spatial position
                         # (the latent space size is 512)
seq_length = 12 * 12  # the number of token (144, one for each spatial position)

print("Hidden size: ", hidden_size) # it represents the input size expected by the transformer (768 for GPT2 and 1024 for BioGPT) 

# Mapper definition
mapper = FF_mapper(image_feature_dim, hidden_size).to(device)

# Loading of the old model (before two approaches)
# mapper.load_state_dict(torch.load("/kaggle/input/ff_mapper/pytorch/default/1/ff_mapper.pth"))

# Loading of the embedding approach
# For the embedding approach, we can use GPT2 or BioGPT
# GPT2:
mapper.load_state_dict(torch.load("/kaggle/input/mapper/pytorch/cos_sim/2/ff_mapper_GPT2.pth"))

# BioGPT:
# mapper.load_state_dict(torch.load("/kaggle/input/ff_mapper_biogpt/pytorch/default/1/ff_mapper_BioGPT.pth"))

# Loading of the token Cross Entropy approach
# mapper.load_state_dict(torch.load("/kaggle/input/mapper/pytorch/token_cross_entropy/1/ff_mapper_GPT2.pth"))

# take a row from the validation dataset
images, labels, att_mask = val_dataset[90]
images = images.unsqueeze(0).to(device)
labels = labels.to(device)
att_mask = att_mask.unsqueeze(0).to(device)

# from encoderCNN to transformer
with torch.no_grad():
    encoder_output = encoderCNN(images)  # features extraction
    transformer_input = mapper(encoder_output)  # adapt to GPT-2

print("mapper output: ", transformer_input)
print("mapper output shape: ", transformer_input.shape)
print("mapper output dtype: ", transformer_input.dtype)

# generate the report
generate_text(model_name, transformerModel, tokenizer, transformer_input, att_mask)

### **Statistical analysis**

- To understand and analyze what types of embeddings the ff_mapper is generating
- To check if the ff_mapper is working properly

In [None]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
import seaborn as sns

# collect the transformer real embeddings and the predicted embeddings of some dataset samples
def collect_embeddings(unshuffled_dataloader, encoder, transformer, ff_mapper, device, max_samples=1000):
    true_embeddings = []
    pred_embeddings = []
    findings_texts = []

    # shuffle dataloader
    dataloader = DataLoader(unshuffled_dataloader.dataset, batch_size=unshuffled_dataloader.batch_size, shuffle=True)
    
    with torch.no_grad():
        for i, (images, text, attention) in enumerate(tqdm(dataloader, desc="Collecting embeddings")):
            # i: batch index
            # images.shape[0]: batch size
            if i * images.shape[0] >= max_samples:
                break
                
            images = images.to(device)
            text = text.to(device)
            attention = attention.to(device)
            
            # obtain the image latent space from the encoder and predict the embeddings
            latent_space = encoder(images)
            
            # in the old model, we did the following: 
            # pred_emb = ff_mapper(latent_space)
            
            # in the new model (in both approaches, embeddings one and token CE),
            # the pred_emb have to be equal to the generated text:
            ff_mapper_output = ff_mapper(latent_space)

            pred_text = transformerModel.generate(
                inputs_embeds=ff_mapper_output, 
                max_length=288,
                attention_mask=attention,
                pad_token_id=tokenizer.eos_token_id,
                no_repeat_ngram_size=2,   # avoid repetitions
                 # top_k=50,   # consider only the 50 most probable words
                eos_token_id=None,
                do_sample=False,
            )

            pred_emb = transformer.get_input_embeddings()(pred_text)
            
            # obtain the real embeddings from the transformer
            true_emb = transformer.get_input_embeddings()(text)
            
            # save the embeddings
            true_embeddings.append(true_emb.cpu().numpy())
            pred_embeddings.append(pred_emb.cpu().numpy())
            
            # save also the associated tokenized reports
            for t in text:
                findings_texts.append(t.cpu().numpy())
    
    # model the embeddings tensors from 3D to 2D (from (batch_size, seq_length, embedding_size) to (batch_size * seq_length, embedding_size))
    # where: 
    # - seq_length is the input sequence (like a statement) size
    # - embedding_size is the embedding (numeric tensor of an object or a word) size
    true_embeddings = np.vstack([emb.reshape(emb.shape[0] * emb.shape[1], emb.shape[2]) for emb in true_embeddings])
    pred_embeddings = np.vstack([emb.reshape(emb.shape[0] * emb.shape[1], emb.shape[2]) for emb in pred_embeddings])

    # these are matrices (arrays of embeddings, that are tensors)
    return true_embeddings, pred_embeddings, findings_texts

print("Collecting embeddings...")
true_embeddings, pred_embeddings, findings_texts = collect_embeddings(
    val_loader, encoderCNN, transformerModel, mapper, device
)

#### **L2 Normalization**
- The euclidean norm of an embedding geometrically represents the straight-line distance from the origin to that embedding

- $||\mathbf{v}||_2 = \sqrt{\sum_{i=1}^{n} v_i^2}$

**L2 Unit Norm**: A vector has an L2 unit norm if its L2 norm is equal to 1

- $||\mathbf{v}||_2 = 1 \iff \sqrt{v_1^2 + v_2^2 + ... + v_n^2} = 1$

- Each individual element of an L2 normalized embedding vector will always be between -1 and 1 (inclusive)

- **In terms of embeddings**:
When we normalize an embedding vector to have an L2 unit norm, we are essentially maintaining its direction in the embedding space, but scaling its length to 1


- Why is normalization to the L2 unit norm useful for embeddings?

    - **Focusing on direction**: In many embedding applications (especially in Natural Language Processing), the direction of an embedding vector captures the semantic meaning. By normalizing, we focus on the directional similarity between embeddings.
    - **Comparability**: Normalizing embeddings makes their directions more comparable, regardless of their original magnitude. This can be useful when magnitudes can vary for reasons not directly related to semantic meaning

In [None]:
# calculate the euclidean norm (L2 norm) of the embeddings
def normalize_L2_embeddings(embeddings):
    norm = np.linalg.norm(embeddings, axis=1, keepdims=True)
    normalized_embeddings = embeddings / norm
    return normalized_embeddings
    
# apply embedding normalization
true_embeddings_normalized = normalize_L2_embeddings(true_embeddings)
pred_embeddings_normalized = normalize_L2_embeddings(pred_embeddings)

# verify the norm of the normalized embeddings:
# calculate the norms and check if they are equal to 1 
# (that is, the square root of the sum of the squares of the elements is equal to 1)
# print("embeddings: ", pred_embeddings[0])
# print("normalized embeddings: ", pred_embeddings_normalized[0])
# norms_after_normalization = np.linalg.norm(pred_embeddings_normalized, axis=1)
# print("\nL2 norms of normalized embeddings:")
# print(norms_after_normalization)

# Check if the norms are close to 1
# tolerance = 1e-7  # Small tolerance for floating-point comparisons
# are_norms_close_to_one = np.allclose(norms_after_normalization, 1.0, atol=tolerance)
# print("\Are all norms close to 1?", are_norms_close_to_one)

In [None]:
# analyze the embeddings distribution and calculate some relevant statistics
def analyze_embeddings_distribution(true_embeddings, pred_embeddings):
    # calculate mean and standard deviation for the embedding sets
    true_mean = np.mean(true_embeddings, axis=0)
    true_std = np.std(true_embeddings, axis=0)
    
    pred_mean = np.mean(pred_embeddings, axis=0)
    pred_std = np.std(pred_embeddings, axis=0)

    # true_mean, true_std, pred_mean, pred_std are all arrays
    
    # calculate the mean of the cosine similarity between the predicted embeddings and the real ones
    # sim_matrix is the matrix of the cosine similarity where sim_matrix[i][j] is the
    # cosine similarity between the i-th predicted embedding and the j-th real embedding
    sim_matrix = cosine_similarity(pred_embeddings, true_embeddings)
    # since we only want to analyze the similarity between a predicted embedding and its corresponding real one,
    # we extract the main diagonal from the sim_matrix and calculate its mean
    diag_sim = np.diag(sim_matrix)
    avg_cosine_sim = np.mean(diag_sim)
    
    # calculate the mean of the euclidean distance    
    # linalg.norm is a measure of the vector's distance from the origin (basically, the euclidean distance)
    euclidean_distances = np.linalg.norm(true_embeddings - pred_embeddings, axis=1)
    avg_euclidean_dist = np.mean(euclidean_distances)
    
    print(f"Similarity Statistics:")
    print(f"Average Cosine Similarity: {avg_cosine_sim:.4f}")
    print(f"Average Euclidean Distance: {avg_euclidean_dist:.4f}")
    # the magnitude of embeddings can provide insights into the data distribution and the transformations 
    # applied by the mapper: if the magnitude of the predicted embeddings is significantly different 
    # from that of the real embeddings, it indicates that the mapper is altering the scale of the embeddings.
    # For example, if the predicted embeddings have a much larger magnitude, 
    # it suggests that the mapper is "inflating" the vectors
    print(f"True Embeddings - Mean Magnitude: {np.linalg.norm(true_mean):.4f}, Std: {np.mean(true_std):.4f}")
    print(f"Pred Embeddings - Mean Magnitude: {np.linalg.norm(pred_mean):.4f}, Std: {np.mean(pred_std):.4f}")
    
    return {
        'cosine_sim': diag_sim,
        'euclidean_dist': euclidean_distances,
        'true_mean': true_mean,
        'true_std': true_std,
        'pred_mean': pred_mean,
        'pred_std': pred_std
    }

print("\nAnalyzing embeddings distribution...")
stats = analyze_embeddings_distribution(true_embeddings_normalized, pred_embeddings_normalized)

In [None]:
# visualize the euclidean distances and cosine similarities distribution
def visualize_embeddings_comparison(true_embeddings, pred_embeddings, sample_size=1000, save_path=None):
    # select a sample of the embeddings (to make the plot more readable)
    if len(true_embeddings) > sample_size:
        indices = np.random.choice(len(true_embeddings), sample_size, replace=False)
        true_sample = true_embeddings[indices]
        pred_sample = pred_embeddings[indices]
    else:
        true_sample = true_embeddings
        pred_sample = pred_embeddings
    
    # calculate the Euclidean distances
    distances = np.sqrt(np.sum((true_sample - pred_sample)**2, axis=1))
    
    # calculate the cosine similarities
    cos_sims = np.sum(true_sample * pred_sample, axis=1) / (
        np.sqrt(np.sum(true_sample**2, axis=1)) * np.sqrt(np.sum(pred_sample**2, axis=1))
    )
    # we can calculate the cosine similarities also using the cosine_similarity function
    # sim_matrix = cosine_similarity(pred_embeddings, true_embeddings)
    # diag_sim = np.diag(sim_matrix)
    # avg_cosine_sim = np.mean(diag_sim)
    
    # plot
    plt.figure(figsize=(14, 6))

    # visualize the distribution of the euclidean distances
    plt.subplot(1, 2, 1)
    plt.hist(distances, bins=30, alpha=0.7)
    plt.axvline(np.mean(distances), color='r', linestyle='--', label=f'Mean: {np.mean(distances):.4f}')
    plt.title("Distribution of Euclidean Distances")
    plt.xlabel("Distance")
    plt.ylabel("Frequency")
    plt.legend()

    # visualize the distribution of the cosine similarities
    plt.subplot(1, 2, 2)
    plt.hist(cos_sims, bins=30, alpha=0.7)
    plt.axvline(np.mean(cos_sims), color='r', linestyle='--', label=f'Mean: {np.mean(cos_sims):.4f}')
    plt.title("Distribution of Cosine Similarities")
    plt.xlabel("Similarity")
    plt.ylabel("Frequency")
    plt.legend()
    
    plt.tight_layout()

    if save_path:
        plt.savefig(f"{save_path}_comparison.png")
    plt.show()
    
    return distances, cos_sims
    
save_path = "embeddings_analysis"

print("\nVisualizing embeddings comparison...")
distances, cos_sims = visualize_embeddings_comparison(true_embeddings_normalized, pred_embeddings_normalized, save_path=save_path)

#### **PCA**

- **PCA** (*Principal Component Analysis*): is a statistical technique used for dimensionality reduction. It transforms high-dimensional data into a lower-dimensional representation while preserving as much of the original variance as possible.
    
- How PCA Works:

    - **Standardization**:
        - PCA often begins by standardizing the data, which means scaling the variables to have a mean of 0 and a standard deviation of 1.
    
    - **Covariance Matrix**:
        - PCA calculates the covariance matrix (which describes the relationships between the variables) of the standardized data 
   
    - **Eigenvectors and Eigenvalues**:
        - PCA then calculates the eigenvectors and eigenvalues of the covariance matrix.
        - Eigenvectors represent the directions of the principal components and eigenvalues represent the magnitude of the variance along those directions
    
    - **Principal Components**:
        - The eigenvectors are sorted by their corresponding eigenvalues in descending order (from the greatest eigenvalue to the smallest one). The top k eigenvectors (where k is the desired number of dimensions) are selected as the principal components
    
    - **Data Transformation**:
        - The original data is projected onto the selected principal components, resulting in a lower-dimensional representation.



- Hence, in our case we are reducing the embeddings from size of 768 (that is the GPT2 embedding size) to 2D.

- **NOTE 1**: by reducing the embedding dimensionality, we are also losing information!
- **NOTE 2**: PCA is a linear dimensionality reduction method! We will check also non-linear methods (like t-SNE)!
    

In [None]:
# apply PCA (Principal Components Analysis) and visualize the 2D embeddings
def visualize_embeddings_pca(true_embeddings, pred_embeddings, save_path=None):
    # combine the true embeddings with the predicted ones in a single matrix (by stacking them)
    combined_embeddings = np.vstack([true_embeddings, pred_embeddings])
    
    pca = PCA(n_components=2)
    # reduce_embeddings will be a 2D matrix, where each row is an embedding projected in the principal components space
    reduced_embeddings = pca.fit_transform(combined_embeddings)
    
    # split the results
    true_reduced = reduced_embeddings[:len(true_embeddings)]
    pred_reduced = reduced_embeddings[len(true_embeddings):]
    
    # plot
    plt.figure(figsize=(10, 8))
    plt.scatter(true_reduced[:, 0], true_reduced[:, 1], alpha=0.5, label="True Embeddings", color="blue")
    plt.scatter(pred_reduced[:, 0], pred_reduced[:, 1], alpha=0.5, label="Predicted Embeddings", color="red")
    # pca.explained_variance_ratio_: array where each element represents the proportion of the total variance 
    # in the original data that is explained by the corresponding principal component.
    # it basically represents how much "information" or "variance" each direction (component, that are PC1 and PC2)
    # captures from the original data.
    plt.title(f"PCA Visualization of Embeddings (explained var: {sum(pca.explained_variance_ratio_):.2f})")
    plt.xlabel(f"PC1 ({pca.explained_variance_ratio_[0]:.2f})")
    plt.ylabel(f"PC2 ({pca.explained_variance_ratio_[1]:.2f})")
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    if save_path:
        plt.savefig(f"{save_path}_pca.png", dpi=300, bbox_inches='tight')
    plt.show()
    
    return pca

print("\nVisualizing embeddings with PCA...")
pca = visualize_embeddings_pca(true_embeddings_normalized, pred_embeddings_normalized, save_path)

In [None]:
!pip install umap-learn

#### **UMAP**

- **UMAP** (*Uniform Manifold Approximation and Projection*) is a powerful dimensionality reduction technique that is often used for visualizing high-dimensional data in a lower-dimensional space (typically 2D or 3D).

1. **Constructing a High-Dimensional Graph**:
    - UMAP first tries to understand the relationships between the data points in the original high-dimensional space.
For each data point, it identifies its nearest neighbors (the number of neighbors is controlled by the n_neighbors parameter).
It then creates a weighted graph where edges connect each point to its neighbors. The weight of an edge reflects how "close" the two points are in the high-dimensional space.


2. **Approximating the Manifold**:

    - UMAP assumes that the high-dimensional data lies on a lower-dimensional "manifold" (a complex, curved surface).
    The graph constructed in the previous step is used to approximate the local structure of this manifold around each data point.


3. **Defining a Low-Dimensional Graph**:

    - UMAP then aims to create a similar graph in the low-dimensional space (2D) where the relationships between the points are as similar as possible to those in the high-dimensional graph.
    It starts with a random initial layout of the points in the low-dimensional space.


4. **Optimizing the Low-Dimensional Layout**:

    - UMAP uses an optimization process (similar to gradient descent) to adjust the positions of the points in the low-dimensional space.
    The goal of this optimization is to minimize the difference between the structure of the high-dimensional graph and the low-dimensional graph.
    It tries to pull points that were close in the high-dimensional space closer in the low-dimensional space, and push points that were far apart further away.
    The min_dist parameter plays a role here by setting a minimum allowed distance between points in the low-dimensional embedding.

In [None]:
import umap

# applies UMAP
# n_neighbors: number of nearest neighbors to consider for local manifold approximation.
# Balances local versus global structure in the final embedding. Smaller values focus on local details,
# larger values on global structure.
# min_dist: minimum distance between embedded points. Controls how tightly points are packed together.
# Smaller values allow tighter packing.
def visualize_embeddings_umap(true_embeddings, pred_embeddings, n_neighbors=15, min_dist=0.1, save_path=None):
    # combine the true embeddings with the predicted ones in a single matrix (by stacking them)
    combined_embeddings = np.vstack([true_embeddings, pred_embeddings])
    # initialize and fit the UMAP reducer
    # n_components=2: Reduces the dimensionality to 2 for visualization
    reducer = umap.UMAP(n_components=2, n_neighbors=n_neighbors, min_dist=min_dist)
    reduced_embeddings = reducer.fit_transform(combined_embeddings)

    # split the reduced embeddings back into true and predicted sets
    true_reduced = reduced_embeddings[:len(true_embeddings)]
    pred_reduced = reduced_embeddings[len(true_embeddings):]

    plt.figure(figsize=(10, 8))
    plt.scatter(true_reduced[:, 0], true_reduced[:, 1], alpha=0.5, label="True Embeddings", color="blue")
    plt.scatter(pred_reduced[:, 0], pred_reduced[:, 1], alpha=0.5, label="Predicted Embeddings", color="red")
    plt.title(f"UMAP Visualization of Embeddings (n_neighbors={n_neighbors}, min_dist={min_dist})")
    plt.legend()
    plt.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(f"{save_path}_umap_nn{n_neighbors}.png", dpi=300, bbox_inches='tight')
    plt.show()

    return reducer

print("\nVisualizing embeddings with UMAP...")
for n_neighbors in [5, 15, 30]:
    umap_reducer = visualize_embeddings_umap(true_embeddings_normalized, pred_embeddings_normalized, n_neighbors=n_neighbors, save_path=save_path)

#### **t-SNE**

- **t-SNE**: it is a non-linear dimensionality reduction technique. It's primarily used for visualizing high-dimensional data in a low-dimensional space (typically 2D or 3D).

- How it Works:
    - **Probability Distributions**:
 
        - t-SNE converts high-dimensional distances between data points into conditional probabilities.
        - It creates a probability distribution in the high-dimensional space that represents the likelihood of points being neighbors.
        - It then attempts to recreate a similar probability distribution in the low-dimensional space.

    - **Minimizing Divergence**:
 
        - t-SNE minimizes the Kullback-Leibler (KL) divergence between the probability distributions in the high-dimensional and low-dimensional spaces. This means it tries to make the distances between points in the low-dimensional space reflect their relationships in the high-dimensional space.

    - **Non-linear Mapping**:
        
        - t-SNE uses a non-linear mapping, which allows it to capture complex relationships and structures in the data.



- **Perplexity** controls the balance between local and global aspects of the data. It roughly corresponds to the number of nearest neighbors considered for each point. Lower perplexity values focus on local relationships, while higher values consider global relationships

- t-SNE is generally considered better (than PCA) for visualization because it excels at revealing clusters and local patterns

In [None]:
# apply t-SNE (t-Distributed Stochastic Neighbor Embedding) and visualize the reduced embeddings
def visualize_embeddings_tsne(true_embeddings, pred_embeddings, perplexity=30, save_path=None):
    # combine the true embeddings with the predicted ones in a single matrix (by stacking them)
    combined_embeddings = np.vstack([true_embeddings, pred_embeddings])
    
    # To calculate t-SNE faster (since t-SNE can be computationally expensive for high-dimensional data),
    # we can first apply PCA (on 50 components) and then t-SNE on the PCA results
    if combined_embeddings.shape[1] > 50:
        pca = PCA(n_components=50)
        combined_embeddings = pca.fit_transform(combined_embeddings)
    
    tsne = TSNE(n_components=2, perplexity=perplexity, n_iter=1000) # verbose=1 to show progress
    reduced_embeddings = tsne.fit_transform(combined_embeddings)
    
    # split the results
    true_reduced = reduced_embeddings[:len(true_embeddings)]
    pred_reduced = reduced_embeddings[len(true_embeddings):]
    
    # plot
    plt.figure(figsize=(10, 8))
    plt.scatter(true_reduced[:, 0], true_reduced[:, 1], alpha=0.5, label="True Embeddings", color="blue")
    plt.scatter(pred_reduced[:, 0], pred_reduced[:, 1], alpha=0.5, label="Predicted Embeddings", color="red")
    plt.title(f"t-SNE Visualization of Embeddings (perplexity={perplexity})")
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    if save_path:
        plt.savefig(f"{save_path}_tsne_perp{perplexity}.png", dpi=300, bbox_inches='tight')
    plt.show()
    
    return tsne

print("\nVisualizing embeddings with t-SNE...")
# Trials made on different perplexity
for perplexity in [5, 30, 50]:
    tsne = visualize_embeddings_tsne(true_embeddings_normalized, pred_embeddings_normalized, perplexity, save_path)

## **Transformer fine-tuning**

- We tried to fine-tune the pre-trained transformer, in order to improve its text generation capacities and adapt it to our dataset (medical context). Unfortunately, we have fallen into **catastrophic forgetting** (pre-trained transformer regression, that is the transformer loses all its previous knowledge)
- Hence, we avoided catastrophic forgetting by doing a light fine-tuning: we update only the last layers of the transformer
- NOTE: we tried to fine-tune only GPT2, since it's the pre-trained transformer with the best obtained results

In [None]:
# Update only GPT-2 parameters of the last two blocks and lm_head
for name, param in transformerModel.named_parameters():
    # transformer.h.<n> are the decoder blocks; lm_head is the output head
    if name.startswith("transformer.h.10.") or name.startswith("transformer.h.11.") or name.startswith("lm_head."):
        param.requires_grad = True
    else:
        param.requires_grad = False

# Freeze encoderCNN and mapper completely
for p in encoderCNN.parameters():
    p.requires_grad = False

for p in mapper.parameters():
    p.requires_grad = False

trainable_params = [p for n, p in transformerModel.named_parameters() if p.requires_grad]
optimizer = AdamW(trainable_params, lr=1e-4)

loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Dataloader used to fine-tune
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

In [None]:
def train_one_epoch():
    transformerModel.train()
    total_loss = 0.0
    for images, input_ids, attn_mask in tqdm(train_loader, desc="Train"):
        images, input_ids, attn_mask = images.to(device), input_ids.to(device), attn_mask.to(device)

        # encode image -> mapper -> inputs_embeds
        with torch.no_grad():
            latent_space = encoderCNN(images)
            inputs_embeds = mapper(latent_space)

        # forward through GPT‑2 with labels
        outputs = transformerModel(
            inputs_embeds=inputs_embeds,
            attention_mask=attn_mask,
            labels=input_ids,
            return_dict=True,
        )
        loss = outputs.loss

        # Backpropagate only on trainable params
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * images.size(0)

    return total_loss / len(train_loader.dataset)


def eval_one_epoch():
    mapper.eval()
    transformerModel.eval()
    total_loss = 0.0
    with torch.no_grad():
        for images, input_ids, attn_mask in tqdm(val_loader, desc="Val"):
            images, input_ids, attn_mask = images.to(device), input_ids.to(device), attn_mask.to(device)
            latent_space = encoderCNN(images)
            inputs_embeds = mapper(latent_space)
            outputs = transformerModel(
                inputs_embeds=inputs_embeds,
                attention_mask=attn_mask,
                labels=input_ids,
                return_dict=True,
            )
            total_loss += outputs.loss.item() * images.size(0)
    return total_loss / len(val_loader.dataset)


NUM_EPOCHS = 10
for epoch in range(1, NUM_EPOCHS+1):
    train_loss = train_one_epoch()
    val_loss   = eval_one_epoch()
    print(f"Epoch {epoch}  Train Loss: {train_loss:.4f}  Val Loss: {val_loss:.4f}")

#### **Check fine-tuning results by generating a text from a dataset row**

In [None]:
images, labels, att_mask = val_dataset[90]
images = images.unsqueeze(0).to(device)
labels = labels.to(device)
att_mask = att_mask.unsqueeze(0).to(device)

with torch.no_grad():
    encoder_output = encoderCNN(images)  # features extraction
    transformer_input = mapper(encoder_output)  # adapt to GPT-2

print("mapper output: ", transformer_input)
print("mapper output shape: ", transformer_input.shape)
print("mapper output dtype: ", transformer_input.dtype)

generate_text(model_name, transformerModel, tokenizer, transformer_input, att_mask)

In [None]:
torch.save(transformerModel.state_dict(), "gpt2_finetuned.pth")