# Transformer Model Data Preparation Workflow

This notebook documents the initial pipeline for mass spectrometry data preparation and preprocessing, which is essential for feeding our future Transformer model.

We start by loading and processing MGF files, applying a series of filters and transformations to ensure the quality and appropriate format of the data.

The first step involves validating the MGF file path (**path_check**) and loading the raw spectra. The **mgf_get_spectra** function is responsible for reading the MGF file and extracting each spectrum's data

In [1]:
from src.utils import *
from src.config import *
from src.mgf_tools.mgf_get import * 

In [2]:
mgf_data = r"/Users/carla/PycharmProjects/Mestrado/Transformer-Based-Models-for-Chemical-Fingerprint-Prediction/datasets/raw/cleaned_gnps_library.mgf"

path_check(mgf_data)

In [3]:
mgf_spect= mgf_get_spectra(mgf_data, num_spectra=10)

This is the core phase of transforming the raw data. The **mgf_deconvoluter** function iterates over each loaded spectrum, applying a series of cleaning and tokenization steps via the **mgf_spectrum_deconvoluter**

In [4]:
x = mgf_deconvoluter(mgf_data=mgf_spect, mz_vocabs=mz_vocabs, min_num_peaks=5, max_num_peaks=max_num_peaks, noise_rmv_threshold=0.01, mass_error=0.01, log=True)

The **mgf_deconvoluter** function returns a list of tuples, where each tuple (spectrum_id, tokenized_mz, tokenized_precursor, intensities) represents a spectrum that has successfully passed through the entire preprocessing pipeline.

In [15]:
if len(x) > 0:
    spectrum_tuple = x[2]

    spectrum_id, tokenized_mz, tokenized_precursor, intensities = spectrum_tuple

    print(f"\nTokenised spectrum details:")
    print(f"Spectrum ID: {spectrum_id}")
    print(f"Number of m/z tokens: {len(tokenized_mz)}")
    print(f"Number of intensities: {len(intensities)}")
    print(f"Precursor token: {tokenized_precursor}")

else:
    print("No spectrum passed through the filters and was processed")

In [5]:
# --- IMPORTS NECESSÁRIOS PARA ESTA SECÇÃO ---
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np 
# Embora o collate_fn já lide com isso, é bom ter

# Certifique-se de que as suas configurações estão importadas, especialmente max_seq_len e vocab_size
from src.config import max_seq_len, vocab_size

# Assumindo que a sua classe SpectraCollateFn está em src/data/collate_fn.py
# (Se mudou o caminho da pasta 'model' para 'data', certifique-se que o import está correto)
from src.data.collate_fn import SpectraCollateFn


# --- 1. Definir a Classe Dataset (Simples e Reutilizável) ---
# Esta classe irá encapsular a lista de tuplos processados pelo mgf_deconvoluter.
class SpectrumDataset(Dataset):
    """
    Custom Dataset for mass spectrometry data.
    It wraps a list of pre-processed spectrum tuples (ID, tokenized_mz, tokenized_precursor, intensities)
    and provides individual spectrum data for the DataLoader.
    """
    def __init__(self, processed_spectra_list):
        # processed_spectra_list é a lista de tuplos que o mgf_deconvoluter retorna
        self.data = processed_spectra_list

    def __len__(self):
        """Returns the total number of processed spectra in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieves a single processed spectrum tuple by index.
        This tuple is then passed to the collate_fn for batch processing.
        """
        # Retorna o tuplo individual de um espectro:
        # (spectrum_id, tokenized_mz_list, tokenized_precursor, intensity_array)
        return self.data[idx]

# --- 2. Preparar os Dados de Teste para o DataLoader ---
# 'x' é a lista de tuplos gerada pelo mgf_deconvoluter na célula anterior
# Ex: x = mgf_deconvoluter(...)

if not x: # Verifica se a lista 'x' não está vazia (ou seja, se há espectros processados)
    print("\nAVISO: NENHUM espectro foi processado com sucesso pelo deconvoluter. O DataLoader estará vazio.")
    print("Verifique as suas configurações (`min_num_peaks`, `max_num_peaks`) e os ficheiros de input MGF.")
else:
    print(f"\n--- Configurando o DataLoader ---")
    print(f"Número total de espectros processados e prontos para o Dataset: {len(x)}")

    # --- 3. Instanciar o seu Dataset ---
    # Passamos a lista 'x' (todos os espectros processados) para o Dataset
    spectrum_dataset = SpectrumDataset(x)

    # --- 4. Instanciar o seu Collate Function ---
    # A sua SpectraCollateFn precisa do max_length (max_seq_len) e padding_token_value (vocab_size)
    # Certifique-se que o construtor da sua SpectraCollateFn está de acordo com isto.
    # Ex: class SpectraCollateFn: def __init__(self, max_length, padding_token_value): ...
    # Se o seu __init__ já usa as variáveis globais, pode ser apenas my_collate_fn = SpectraCollateFn()
    # Pelo código anterior, a sua SpectraCollateFn já lida com isto, então:
    my_collate_fn = SpectraCollateFn() # Se o seu SpectraCollateFn usa os imports de config diretamente

    # --- 5. Criar o DataLoader ---
    batch_size = 2 # Um bom tamanho de batch para testar. Pode ajustar.
    my_dataloader = DataLoader(
        spectrum_dataset,    # O nosso Dataset
        batch_size=batch_size, # O número de espectros por batch
        shuffle=True,        # Shuffles os dados a cada época (bom para treino, opcional para teste)
        collate_fn=my_collate_fn, # A nossa função de agrupamento personalizada
        num_workers=0        # Para teste, 0 workers é mais simples. Aumentar para treino real.
    )

    print(f"DataLoader configurado com batch_size={batch_size}.")

    # --- 6. Iterar sobre o DataLoader e inspecionar o primeiro Batch ---
    print(f"\n--- Verificando o primeiro Batch do DataLoader ---")
    try:
        # Pega o primeiro batch para inspeção
        # next(iter(my_dataloader)) é uma forma comum de pegar um único batch
        mz_batch, int_batch, mask_batch, ids_batch = next(iter(my_dataloader))

        print(f"\nBatch obtido com sucesso!")
        print(f"Shape de mz_batch (tokens de m/z e precursor, com padding): {mz_batch.shape}")
        print(f"Shape de int_batch (intensidades e zeros, com padding): {int_batch.shape}")
        print(f"Shape de mask_batch (máscara de atenção): {mask_batch.shape}")
        print(f"IDs no primeiro batch: {ids_batch}")

        # Verificações de Sanidade (opcional, mas altamente recomendado)
        # Assegura que o comprimento da sequência é MAX_SEQ_LEN
        assert mz_batch.shape[1] == max_seq_len, f"Comprimento da sequência incorreto! Esperado {max_seq_len}, obtido {mz_batch.shape[1]}"
        assert int_batch.shape[1] == max_seq_len, f"Comprimento da sequência incorreto! Esperado {max_seq_len}, obtido {int_batch.shape[1]}"
        assert mask_batch.shape[1] == max_seq_len, f"Comprimento da sequência incorreto! Esperado {max_seq_len}, obtido {mask_batch.shape[1]}"

        # Verificar se o dtype está correto
        assert mz_batch.dtype == torch.long, f"Dtype incorreto para mz_batch! Esperado torch.long, obtido {mz_batch.dtype}"
        assert int_batch.dtype == torch.float32, f"Dtype incorreto para int_batch! Esperado torch.float32, obtido {int_batch.dtype}"
        assert mask_batch.dtype == torch.bool, f"Dtype incorreto para mask_batch! Esperado torch.bool, obtido {mask_batch.dtype}"

        print("\nVerificações de sanidade do shape e dtype concluídas com sucesso!")

        # Exemplo de um espectro dentro do batch (primeiro espectro do batch)
        print("\nPrimeiro espectro no batch (apenas os primeiros 15 e últimos 5 tokens/intensidades para ver o padding):")
        print(f"  Tokens (mz_batch[0, :15]): {mz_batch[0, :15].tolist()}")
        print(f"  Intensidades (int_batch[0, :15]): {int_batch[0, :15].tolist()}")
        print(f"  Máscara (mask_batch[0, :15]): {mask_batch[0, :15].tolist()}")

        print(f"  Tokens (mz_batch[0, -5:]): {mz_batch[0, -5:].tolist()}")
        print(f"  Intensidades (int_batch[0, -5:]): {int_batch[0, -5:].tolist()}")
        print(f"  Máscara (mask_batch[0, -5:]): {mask_batch[0, -5:].tolist()}")

        # Verificação do token de padding na máscara (se houver padding)
        # Encontra o primeiro índice onde a máscara é False (significa padding)
        first_padding_index = (mask_batch[0] == False).nonzero(as_tuple=True)
        if first_padding_index[0].numel() > 0:
            first_padding_index_val = first_padding_index[0][0].item()
            print(f"\nPrimeiro índice de padding detectado no primeiro espectro do batch: {first_padding_index_val}")
            print(f"Token esperado nesse índice: {vocab_size} (token de padding)")
            print(f"Token real nesse índice: {mz_batch[0, first_padding_index_val].item()}")
            print(f"Intensidade esperada nesse índice: 0.0")
            print(f"Intensidade real nesse índice: {int_batch[0, first_padding_index_val].item()}")
        else:
            print("\nNão foi detetado padding no primeiro espectro do batch (o espectro preencheu o MAX_SEQ_LEN).")


    except StopIteration:
        print("\nErro: O DataLoader está vazio. Nenhum batch para processar. Isso pode acontecer se 'x' estiver vazio.")
    except Exception as e:
        print(f"\nOcorreu um erro ao processar o batch: {e}")

In [6]:
len(spectrum_dataset)