## Installation

In [7]:
#%pip install 

# Download this repository
#!git clone https://github.com/pluskal-lab/DreaMS.git
#!cd DreaMS

# Create conda environment
#!conda update -n base -c defaults conda
#!conda create -n dreams python==3.11.0 --yes
#!conda init
#!conda activate dreams

# Install DreaMS
#%pip install -e ./DreaMS

#%pip install pytorch-lightning
#!git clone https://github.com/colorfulcereal/MassSpecGym

In [None]:
%pip install -e /teamspace/studios/this_studio/MassSpecGym


## Loading the MassSpecGym dataset

In [None]:
from massspecgym.utils import load_massspecgym
df = load_massspecgym()
df.head(1)


## MS/MS EDA

In [None]:
## Plot spectra

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Set plot style
sns.set_style('whitegrid')

# Function to plot histogram of mzs and intensities
def plot_spectrum(mzs, intensities):
    plt.figure(figsize=(8, 6))
    plt.stem(mzs, intensities, basefmt='-')
    plt.title('Mass Spectrum')
    plt.xlabel('m/z')
    plt.ylabel('Intensity')
    plt.show()

# Plot a random spectrum
random_index = np.random.randint(0, len(df))
print(random_index)
mzs = df.iloc[random_index]['mzs']
intensities = df.iloc[random_index]['intensities']
plot_spectrum(mzs, intensities)

# Plot multiple spectra
num_spectra = 5
random_indices = np.random.randint(0, len(df), num_spectra)
fig, axs = plt.subplots(nrows=num_spectra, ncols=1, figsize=(8, 6*num_spectra))
for i, idx in enumerate(random_indices):
    mzs = df.iloc[idx]['mzs']
    intensities = df.iloc[idx]['intensities']
    axs[i].stem(mzs, intensities, basefmt='-')
    axs[i].set_title(f'Spectrum {idx}')
    axs[i].set_xlabel('m/z')
    axs[i].set_ylabel('Intensity')
plt.tight_layout()
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Halogen symbols
halogens = ['F', 'Cl', 'Br', 'I', 'At', 'Ts']

# Initialize halogen count dictionary
halogen_counts = {halogen: 0 for halogen in halogens}

# Count mass spectra containing each halogen
for index, row in df.iterrows():
    for halogen in halogens:
        if halogen in row['precursor_formula'] or halogen in row['formula']:
            halogen_counts[halogen] += 1

# Calculate percentages
halogen_percentages = {halogen: (count/len(df))*100 for halogen, count in halogen_counts.items()}

# Print results
print("Halogen Percentages:")
for halogen, percentage in halogen_percentages.items():
    print(f"{halogen}: {percentage:.2f}%")

# Plot (%)
plt.bar(halogen_percentages.keys(), halogen_percentages.values())
plt.xlabel('Halogen')
plt.ylabel('Percentage (%)')
plt.title('Distribution of Halogens in Mass Spectra')
plt.ylim(0, 50)  # Set y-axis limit to 100%
plt.gca().yaxis.set_major_formatter(plt.FuncFormatter(lambda x, loc: f"{x:.0f}%"))  # Format y-axis ticks as percentages
plt.show()

# Plot results (count)
plt.bar(halogen_counts.keys(), halogen_counts.values())
plt.xlabel('Halogen')
plt.ylabel('Count')
plt.title('Distribution of Halogens in Mass Spectra By Count')
plt.show()

## Testing an existing MassSpecGymModel

In [None]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning import Trainer
import numpy, sys

from massspecgym.data import RetrievalDataset, MassSpecDataModule
from massspecgym.data.transforms import SpecTokenizer, MolFingerprinter
from massspecgym.models.base import Stage
from massspecgym.models.retrieval.base import RetrievalMassSpecGymModel
numpy.set_printoptions(threshold=sys.maxsize)


In [2]:
class MyDeepSetsRetrievalModel(RetrievalMassSpecGymModel):
    # constructor
    def __init__(
        self,
        hidden_channels: int = 128,
        out_channels: int = 4096,  # fingerprint size
        # out_channels: int = 4096,  # fingerprint size
        *args,
        **kwargs
    ):
        """Implement your architecture."""
        super().__init__(*args, **kwargs)

        self.phi = nn.Sequential(
            nn.Linear(2, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, hidden_channels),
            nn.ReLU(),
        )
        self.rho = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, out_channels),
            nn.Sigmoid()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Implement your prediction logic."""
        x = self.phi(x)
        x = x.sum(dim=-2)  # sum over peaks
        x = self.rho(x)
        return x

    def step(
        self, batch: dict, stage: Stage
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """Implement your custom logic of using predictions for training and inference."""
        # Unpack inputs
        x = batch["spec"]  # input spectra
        fp_true = batch["mol"]  # true fingerprints
        cands = batch["candidates"]  # candidate fingerprints concatenated for a batch
        #print(cands)
        batch_ptr = batch["batch_ptr"]  # number of candidates per sample in a batch
        #print(batch_ptr)

        # Predict fingerprint
        fp_pred = self.forward(x)

        # Calculate loss
        loss = nn.functional.mse_loss(fp_true, fp_pred)

        # Calculate final similarity scores between predicted fingerprints and retrieval candidates
        fp_pred_repeated = fp_pred.repeat_interleave(batch_ptr, dim=0)
        scores = nn.functional.cosine_similarity(fp_pred_repeated, cands)

        return dict(loss=loss, scores=scores)

In [9]:
# Init hyperparameters
n_peaks = 10
fp_size = 4096
batch_size = 2

# Load dataset
dataset = RetrievalDataset(
    spec_transform=SpecTokenizer(n_peaks=n_peaks),
    mol_transform=MolFingerprinter(fp_size=fp_size),
)

# Init data module
data_module = MassSpecDataModule(
    dataset=dataset,
    batch_size=batch_size,
    num_workers=4
)




In [None]:
from lightning.pytorch import loggers as pl_loggers

# Init model
model = MyDeepSetsRetrievalModel(out_channels=fp_size)

# Init trainer
tb_logger = pl_loggers.TensorBoardLogger(save_dir="logs/")
trainer = Trainer(accelerator="auto", devices="auto", max_epochs=1, logger=tb_logger)

# Train
trainer.fit(model, datamodule=data_module)

## Fluoride Detection

In [4]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning import Trainer
import numpy, sys

from massspecgym.data import MassSpecDataset, MassSpecDataModule
from massspecgym.data.transforms import SpecTokenizer, MolFingerprinter
from massspecgym.models.base import Stage
from massspecgym.models.retrieval.base import MassSpecGymModel
from sklearn.metrics import precision_score, recall_score
from pytorch_lightning import loggers as pl_loggers

numpy.set_printoptions(threshold=sys.maxsize)


In [5]:
import numpy as np
from rdkit import Chem
from massspecgym.data.transforms import MolToHalogensVector


# Example usage
checker = MolToHalogensVector()
smiles_string = "CC(F)(F)F"
halogen_vector = checker.from_smiles(smiles_string)
print(halogen_vector)
# Example usage
smiles_string = "CCBr"
halogen_vector = checker.from_smiles(smiles_string)
print(halogen_vector)

[1 0 0 0]
[0 0 1 0]


In [6]:
class HalogenPredMassSpecGymDataset(MassSpecDataset):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mol_transform = MolToHalogensVector()


class HalogenPredMassSpecGymModel(MassSpecGymModel):
    def __init__(
        self,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.alpha = 0.8;
        self.gamma = 2;

    def step(
        self, batch: dict, stage: Stage
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """Implement your custom logic of using predictions for training and inference."""
        # Unpack inputs
        x = batch["spec"]  
        # input spectra [batch_size, num_peaks + 1, 2]
        halogen_vector_true = batch["mol"]  # true values
        halogen_vector_true = halogen_vector_true.float()
        # Predict halogen
        halogen_vector_pred = self.forward(x)
        halogen_vector_pred = halogen_vector_pred.float()
        
        #Calculate loss
        #loss = nn.BCELoss()
        #loss = loss(halogen_vector_pred, halogen_vector_true)
        
        # Focal Loss
        # Increase loss for minority misclassification (F = 1 but predicted as 0) and 
        # decreases loss for majority class misclassification (F = 0 but predicted as 1)
        # Our MassEpcGym training data is skewed with only 5% of molecules containing Fluorine
        focal_loss = -self.alpha * halogen_vector_true * torch.pow(1 - halogen_vector_pred, self.gamma) * torch.log(halogen_vector_pred) - \
                 (1 - self.alpha) * (1 - halogen_vector_true) * torch.pow(halogen_vector_pred, self.gamma) * torch.log(1 - halogen_vector_pred)

        # Return the mean focal loss
        return { 'loss': torch.mean(focal_loss) } 

    def on_batch_end(
        self, outputs: [], batch: dict, batch_idx: int, stage: Stage
    ) -> None:
        x = batch["spec"]
        halogen_vector_true = batch["mol"]
        halogen_vector_pred = self.forward(x)
        halogen_vector_pred_binary = torch.where(halogen_vector_pred >= 0.5, 1, 0)

        # Extract the 1st column --> fluorine predictions
        column = halogen_vector_true[:, 0]
        # Convert the column to a row
        true_values = column.unsqueeze(0).cpu().numpy().flatten()

        # Extract the 1st column --> fluorine predictions
        column = halogen_vector_pred_binary[:, 0]
        # Convert the column to a row
        pred_values = column.unsqueeze(0).cpu().numpy().flatten()

        precision = precision_score(true_values, pred_values, zero_division=0)
        recall = recall_score(true_values, pred_values, zero_division=0)

        self.log_dict(
            {"precision" : precision, "recall": recall, 'loss': outputs['loss']}, 
            prog_bar=True,
            on_step=False,
            on_epoch=True
        )

In [7]:
from torch import nn
import torch.nn.functional as F
from massspecgym.models.base import Stage
from dreams.api import PreTrainedModel
from dreams.models.dreams.dreams import DreaMS as DreaMSModel

# Example model
class HalogenDetectorDreams(HalogenPredMassSpecGymModel):
    def __init__(
        self,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.spec_encoder = PreTrainedModel.from_ckpt(
            # ckpt_path should be replaced with the path to the ssl_model.ckpt model downloaded from https://zenodo.org/records/10997887
            ckpt_path="https://zenodo.org/records/10997887/files/ssl_model.ckpt?download=1", ckpt_cls=DreaMSModel, n_highest_peaks=60
        ).model.train()
        self.lin_out = nn.Linear(1024, 4) # for the 4 halogens (F, Cl, Br, I)

    def forward(self, x):
        x = self.spec_encoder(x)[:, 0, :] # to get the precursor peak token embedding 
        x = F.sigmoid(self.lin_out(x))
        return x


class HalogenDetectorDeepSets(HalogenPredMassSpecGymModel):
    def __init__(
        self,
        hidden_channels: int = 128,
        *args,
        **kwargs
    ):
        """Implement your architecture."""
        super().__init__(*args, **kwargs)
        self.out_channels = 4 # for the 4 halogens (F, Cl, Br, I)
        self.phi = nn.Sequential(
            nn.Linear(2, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, hidden_channels),
            nn.ReLU(),
        )
        self.rho = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, self.out_channels),
            nn.Sigmoid()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Implement your prediction logic."""
        x = self.phi(x)
        x = x.sum(dim=-2)  # sum over peaks
        x = self.rho(x)
        return x

In [None]:
# Init hyperparameters
n_peaks = 60
batch_size = 32

# Load dataset
dataset = HalogenPredMassSpecGymDataset(
    spec_transform=SpecTokenizer(n_peaks=n_peaks),
)

# Init data module
data_module = MassSpecDataModule(
    dataset=dataset,
    batch_size=batch_size,
    num_workers=4
)

# Init model
#model = HalogenDetectorDeepSets()
model = HalogenDetectorDreams()

# Init trainer
from pytorch_lightning.loggers import CSVLogger
logger = CSVLogger("logs", flush_logs_every_n_steps=200)
trainer = Trainer(accelerator="auto", devices="auto", max_epochs=1, logger=logger)

# Train
trainer.fit(model, datamodule=data_module)

## Scratch

In [None]:
import torch
import torch.nn as nn

# Define the network
hidden_channels = 5
net = nn.Sequential(
    nn.Linear(2, hidden_channels),
    nn.ReLU(),
    nn.Linear(hidden_channels, hidden_channels),
    nn.ReLU(),
)

# Initialize the network
net = net.float()

# Create a dummy input tensor
input_tensor = torch.ones(5, 10, 2)
# Forward pass
output = net(input_tensor)
print(output) 

In [None]:
true_values = np.array([1, 1., 1])
pred_values = np.array([0.0, 1, 1])

precision_score(true_values, pred_values)
recall_score(true_values, pred_values)

In [None]:
from torch import nn
import torch.nn.functional as F
from massspecgym.models.base import Stage
from dreams.api import PreTrainedModel
from dreams.models.dreams.dreams import DreaMS as DreaMSModel

# Example forward pass (not needed to explicitly initialize the DataLoader if you are using MassSpecGym)
from massspecgym.data.datasets import MassSpecDataset
from massspecgym.data.transforms import SpecTokenizer
from torch.utils.data import DataLoader

dataset = HalogenPredMassSpecGymDataset(
    spec_transform=SpecTokenizer(n_peaks=60),
)
dataloader = DataLoader(dataset, batch_size=4)
model = HalogenDetectorDreams()

dummy_batch = next(iter(dataloader))
dummy_output = model(dummy_batch)
print(dummy_output)  # Should print a tensor of shape (4, 4) containing halogen probabilties


In [None]:
m = nn.Sigmoid()
loss = nn.BCELoss()
input = torch.tensor([2.0, 3.0, 5.0])
print(m(input))
target = torch.tensor([1.0, 0.0, 1.0])
print(target)
output = loss(m(input), target)
output
#output.backward()
