In [None]:
! [ -e /content ] && pip install -Uqq mrl-pypi  # download MRL

In [None]:
!pip install pandas==1.3.5

# Design de Peptídeo Antimicrobiano

## Design de Peptídeo Antimicrobiano

Esse é o código das etapas para design de novos peptídeos antimicrobianos, utilizando a biblioteca MRL.

In [None]:
import sys
sys.path.append('..')

from mrl.imports import *
from mrl.core import *
from mrl.chem import *
from mrl.templates.all import *

from mrl.torch_imports import *
from mrl.torch_core import *
from mrl.layers import *
from mrl.dataloaders import *
from mrl.g_models.all import *
from mrl.vocab import *
from mrl.policy_gradient import *
from mrl.train.all import *
from mrl.model_zoo import *
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import roc_auc_score
from google.colab import files


In [None]:
os.makedirs('untracked_files', exist_ok=True)

## Dados

A base de dados foi atualizada e filtrada do banco de dados GRAMPA (WITTEN; WITTEN, 2019).  para conter somente peptídeos antimicrobianos com tamanho entre 5-15 peptídeos e atividade antimicrobiana menor que 32 uM.

In [None]:
download_files()
df = pd.read_csv('../dados/k_pneumonia_peptides.csv')

In [None]:
df = df[["name", 'sequence', "dataset", "label"]]
df.head()

## Função de Pontuação

Foi utilizada um encoder RNC com uma MLP head para predizer uma classificação binária para o valor de atividade antimicrobiana. Essa foi a função de pontuação para contar a recompensa dos peptídeos gerados nas etapas a frente.

In [None]:
train_df = df[df.dataset=='train']
valid_df = df[df.dataset=='valid']

In [None]:
aa_vocab = CharacterVocab(AMINO_ACID_VOCAB)

amp_ds = Text_Prediction_Dataset(train_df.sequence.values, train_df.label.values, aa_vocab)
test_ds = Text_Prediction_Dataset(valid_df.sequence.values, valid_df.label.values, aa_vocab)

Esse é modelo que foi utilizado:

In [None]:
class Predictive_CNN(nn.Module):
    def __init__(self,
                 d_vocab,
                 d_embedding,
                 d_latent,
                 filters,
                 kernel_sizes,
                 strides,
                 dropouts,
                 mlp_dims,
                 mlp_drops,
                 d_out
                ):
        super().__init__()

        self.conv_encoder = Conv_Encoder(
                                        d_vocab,
                                        d_embedding,
                                        d_latent,
                                        filters,
                                        kernel_sizes,
                                        strides,
                                        dropouts,
                                    )

        self.mlp_head = MLP(
                            d_latent,
                            mlp_dims,
                            d_out,
                            mlp_drops
                            )

    def forward(self, x):
        encoded = self.conv_encoder(x)
        out = self.mlp_head(encoded)
        return out

In [None]:
d_vocab = len(aa_vocab.itos)
d_embedding = 256
d_latent = 512
filters = [128, 256]
kernel_sizes = [5, 5]
strides = [1, 1]
dropouts = [0.2, 0.2, 0.2]
mlp_dims = [512, 256, 128]
mlp_drops = [0.2, 0.2, 0.2]
d_out = 1

In [None]:
from sklearn.model_selection import KFold
import numpy as np
import torch

all_preds = []
all_targs = []

k_folds = 5

sequences = np.array(df.sequence.values)
labels = np.array(df.label.values)

kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

fold_metrics = []

for fold, (train_idxs, valid_idxs) in enumerate(kf.split(sequences)):
    print(f"Fold {fold + 1}/{k_folds}")

    train_ds = Text_Prediction_Dataset(
        sequences[train_idxs],
        labels[train_idxs],
        aa_vocab
    )
    valid_ds = Text_Prediction_Dataset(
        sequences[valid_idxs],
        labels[valid_idxs],
        aa_vocab
    )

    amp_model = Predictive_CNN(
        d_vocab,
        d_embedding,
        d_latent,
        filters,
        kernel_sizes,
        strides,
        dropouts,
        mlp_dims,
        mlp_drops,
        d_out
    )

    r_agent = PredictiveAgent(
        amp_model,
        BinaryCrossEntropy(),
        train_ds,
        opt_kwargs={'lr': 1e-3}
    )

    r_agent.train_supervised(bs=32, epochs=30, lr=1e-3)

    valid_preds = r_agent.predict_dataset(valid_ds, detach=True)
    valid_labels = torch.tensor(labels[valid_idxs], dtype=torch.float32)

    valid_preds_bin = (valid_preds > 0.5).float()
    accuracy = (valid_preds_bin == valid_labels).sum().item() / len(valid_labels)

    print(f"Fold {fold + 1} Accuracy: {accuracy:.4f}")
    fold_metrics.append(accuracy)

    valid_dl = valid_ds.dataloader(256, shuffle=False)
    fold_preds = []
    fold_targs = []

    with torch.no_grad():
        for batch in valid_dl:
            batch = to_device(batch)
            x, y = batch
            pred = r_agent.model(x)
            fold_preds.append(pred.detach().cpu())
            fold_targs.append(y.detach().cpu())

    all_preds.append(torch.cat(fold_preds))
    all_targs.append(torch.cat(fold_targs))

mean_accuracy = np.mean(fold_metrics)
std_accuracy = np.std(fold_metrics)

print(f"\nCross-Validation Results:")
print(f"Mean Accuracy: {mean_accuracy:.4f}")
print(f"Standard Deviation: {std_accuracy:.4f}")

In [None]:
all_preds = torch.cat(all_preds).numpy()
all_targs = torch.cat(all_targs).numpy()

fpr, tpr, _ = roc_curve(all_targs, torch.tensor(all_preds).sigmoid().squeeze().numpy())
roc_auc = auc(fpr, tpr)

plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange',
         lw=lw, label='Curva ROC (área = %0.4f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Taxa de falsos positivos')
plt.ylabel('Taxa de verdadeiros positivos')
plt.title('Curva ROC')
plt.legend(loc="lower right")
plt.show()

In [None]:
r_agent.save_weights('untracked_files/amp_predictor.pt')

## O Espaço químico

Nessa etapa, foi desenvolvido o espaço químico. Aqui foi decidido quais peptídeos deveriam ser inclusos e quais seriam removidos.

In [None]:
import math
from rdkit import Chem

moon_fleming_scale = {
    "A": -1.57,
    "R": 2.14,
    "N": 1.91,
    "D": 1.38,
    "C": -1.08,
    "Q": 1.44,
    "E": 0.07,
    "G": 0.15,
    "H": 3.19,
    "I": -3.12,
    "L": -3.32,
    "K": 3.82,
    "M": -2.33,
    "F": -3.77,
    "P": -3.09,
    "S": 0.26,
    "T": 0.21,
    "W": -1.95,
    "Y": -2.66,
    "V": -2.34,
}

def get_hydrophobic_values(mol: str) -> list[float]:
    """
    Transforma cada aminoácido no valor hidrofóbico correspondente da escala.

    :param mol: A sequência do peptídeo
    :return hvalues: A sequência do peptídeo convertida para valores da escala
    """
    hvalues = []

    for aa in mol:
        sc_hydrophobicity = moon_fleming_scale.get(aa, None)
        if sc_hydrophobicity is None:
            raise KeyError("Aminoácido não definido na escala: {}".format(aa))
        hvalues.append(sc_hydrophobicity)
    return hvalues


def calculate_moment(mol: str) -> float:
    """
    Calcula o momento dipolar hidrofóbico a partir de uma matriz de valores
    de hidrofobicidade. Fórmula definida por Eisenberg, 1982 (Nature).
    Retorna o momento médio (normalizado pelo comprimento da sequência).

    uH = sqrt(sum(Hi cos(i*d))**2 + sum(Hi sin(i*d))**2),
    onde i é o índice do aminoácido e d (delta) é um valor angular em
    graus (100 para alfa-hélice, 180 para folha beta).

    :param mol: A sequência do peptídeo
    :return hm: O valor do momento hidrofóbico
    """
    mol1 = Chem.MolToSequence(mol)
    angle = 100
    sum_cos, sum_sin = 0.0, 0.0
    hvalues = get_hydrophobic_values(mol1)
    for i, hv in enumerate(hvalues):
        rad_inc = ((i * angle) * math.pi) / 180.0
        sum_cos += hv * math.cos(rad_inc)
        sum_sin += hv * math.sin(rad_inc)
    return math.sqrt(sum_cos**2 + sum_sin**2) / len(hvalues)


def calculate_hydrophobicity(mol: str) -> float:
    """
    Calcula o valor da hidrofobicidade para cada aminoácido na sequência.

    :param mol: A sequência do peptídeo
    :return h: O valor da hidrofobicidade do peptídeo
    """
    mol1 = Chem.MolToSequence(mol)
    hvalues = get_hydrophobic_values(mol1)
    hydrophobicity = sum(hvalues) / len(hvalues)
    return hydrophobicity


class HydrophobicityFilter(PropertyFilter):
    def __init__(self, min_val, max_val, score=None, name=None, **kwargs):
        """
        Filtro que avalia a hidrofobicidade do peptídeo.

        :param min_val: Valor mínimo permitido de hidrofobicidade
        :param max_val: Valor máximo permitido de hidrofobicidade
        :param score: Pontuação atribuída ao filtro
        :param name: Nome do filtro
        :param kwargs: Argumentos adicionais
        """
        super().__init__(
            calculate_hydrophobicity,
            min_val=min_val,
            max_val=max_val,
            score=score,
            name=name,
            **kwargs,
        )


class HydrophobicityMomentFilter(PropertyFilter):
    def __init__(self, min_val, max_val, score=None, name=None, **kwargs):
        """
        Filtro que avalia o momento hidrofóbico do peptídeo.

        :param min_val: Valor mínimo permitido do momento hidrofóbico
        :param max_val: Valor máximo permitido do momento hidrofóbico
        :param score: Pontuação atribuída ao filtro
        :param name: Nome do filtro
        :param kwargs: Argumentos adicionais
        """
        super().__init__(
            calculate_moment,
            min_val=min_val,
            max_val=max_val,
            score=score,
            name=name,
            **kwargs,
        )


In [None]:
class CombinedCharacterCountFilter(Filter):
    '''
    CombinedCharacterCountFilter - valida um `Mol` com base na contagem combinada de caracteres específicos

    Entradas:
    - `char_groups list[list[str]]`: lista de grupos de caracteres a serem contados como uma única entidade
    - `min_val Optional[float, int]`: valor mínimo para a contagem
    - `max_val Optional[float, int]`: valor máximo para a contagem
    - `per_length bool`: se True, as contagens são normalizadas pelo comprimento da string
    - `score [None, int, float, ScoreFunction]`: veja `Filter.set_score`
    - `name Optional[str]`: nome do filtro usado na representação (`repr`)
    - `fail_score [float, int]`: usado em `Filter.set_score` se `score_function` for (int, float)
    - `mode str['smile', 'protein', 'dna', 'rna']`: determina como as entradas são convertidas em objetos Mol
    '''
    def __init__(self, char_groups, min_val=None, max_val=None, per_length=False,
                 score=None, name=None, fail_score=0., mode='smile'):
        if name is None:
            name = f"Filtro de Contagem Combinada de Caracteres"

        super().__init__(score, name, fail_score=fail_score, mode=mode)

        self.char_groups = char_groups
        self.min_val = min_val
        self.max_val = max_val
        self.per_length = per_length

    def property_function(self, mol):
        return self.to_string(mol)

    def criteria_function(self, property_output):
        total_count = sum(property_output.count(char) for group in self.char_groups for char in group)

        if self.per_length:
            total_count /= len(property_output)  # Normaliza pela extensão da string, se necessário

        meets_min = (total_count >= self.min_val) if self.min_val is not None else True
        meets_max = (total_count <= self.max_val) if self.max_val is not None else True

        return meets_min and meets_max

    def __repr__(self):
        return f'{self.name} ({self.min_val}, {self.max_val})'


In [None]:
aa_vocab = CharacterVocab(AMINO_ACID_VOCAB)

template = Template([
    ValidityFilter(),
    CombinedCharacterCountFilter(
        char_groups=[['R', 'K']],
        min_val=5,
        max_val=None,
        per_length=False,
        mode='protein'
    ),

    HydrophobicityFilter(min_val=0, max_val=1, mode='protein'),
    HydrophobicityMomentFilter(min_val=0, max_val=1, mode='protein'),
    CharacterCountFilter(['D'], min_val=0, max_val=0, per_length=True, mode='protein'),
    CharacterCountFilter(['E'], min_val=0, max_val=0, per_length=True, mode='protein'),
    CharacterCountFilter(['C'], min_val=0, max_val=0, per_length=True, mode='protein'),
    CharacterCountFilter(aa_vocab.itos[4:], min_val=0, max_val=0.4, per_length=True, mode='protein'),
    PropertyFilter(molwt, min_val=800, max_val=3000)
], [], fail_score=-10., log=False, use_lookup=False, mode='protein')

template_cb = TemplateCallback(template, prefilter=True)

## Carregar Modelo

O modelo `LSTM_LM_Small_Swissprot` foi carregado para base do modelo gerativo. Ele é um modelo básico treinado na base de dados Swissprot

In [None]:
agent = LSTM_LM_Small_Swissprot(drop_scale=0.3, opt_kwargs={'lr':1e-4})

## Fine-Tune o modelo

O modelo pré-treinado que foi carregado é muito generalizado e pode produzir uma grande diversidade de estruturas. Nós precisamos especificamente de peptídeos antimicrobianos, então realizamos um finetuning na base de dados AMPlify.

In [None]:
df = pd.read_csv('../dados/Amplify.csv')
df.head()

In [None]:
agent.update_dataset_from_inputs(df[df.label==1].sequence.values)
agent.train_supervised(32, 8, 5e-5)
agent.base_to_model()

In [None]:
agent.save_weights('untracked_files/finetuned_model.pt')

# Reinforcement Learning

Essa é a parte de Reinforcement Learning

### Perda

Foi usado `PPO` como a política de perda de gradiente

In [None]:
pg = PPO(0.99,
        0.5,
        lam=0.95,
        v_coef=0.5,
        cliprange=0.3,
        v_cliprange=0.3,
        ent_coef=0.01,
        kl_target=0.03,
        kl_horizon=3000,
        scale_rewards=True)

loss = PolicyLoss(pg, 'PPO',
                   value_head=ValueHead(256),
                   v_update_iter=2,
                   vopt_kwargs={'lr':1e-3})

### Recompensa

O agente de recompensa treinado anteriormente foi passado para um callback.  

Como o modelo é de classificação, foi necessário definir qual valor seria utilizado na função de pontuação. Duas abordagens foram consideradas:  

1. **Saída escalada pela função sigmoide** – Gera muitas amostras com pontuação próxima de `0.999`, dificultando a diferenciação entre as melhores.  
2. **Saída do logit bruto** – Permite melhor distinção das amostras no topo, mas pode gerar valores extremos.  

A abordagem escolhida foi a saída do logit bruto, com valores limitados ao intervalo `[-10, 10]`.  

In [None]:
d_vocab = len(aa_vocab.itos)
d_embedding = 256
d_latent = 512
filters = [128, 256]
kernel_sizes = [5, 5]
strides = [1, 1]
dropouts = [0.2, 0.2, 0.2]
mlp_dims = [512, 256, 128]
mlp_drops = [0.2, 0.2, 0.2]
d_out = 1


reward_model = Predictive_CNN(
                    d_vocab,
                    d_embedding,
                    d_latent,
                    filters,
                    kernel_sizes,
                    strides,
                    dropouts,
                    mlp_dims,
                    mlp_drops,
                    d_out
                )


r_ds = Text_Prediction_Dataset(['M'], [0.], aa_vocab)

r_agent = PredictiveAgent(reward_model, BinaryCrossEntropy(), r_ds, opt_kwargs={'lr':1e-3})

r_agent.load_weights('untracked_files/amp_predictor.pt')
# r_agent.load_state_dict(model_from_url('amp_predictor.pt')) # optional - load exact weights

reward_model.eval();

freeze(reward_model)

class ClippedModelReward():
    def __init__(self, agent, minclip, maxclip):
        self.agent = agent
        self.minclip = minclip
        self.maxclip = maxclip

    def __call__(self, sequences):
        preds = self.agent.predict_data(sequences)
        preds = torch.clamp(preds, self.minclip, self.maxclip)
        return preds

reward_function = Reward(ClippedModelReward(r_agent, -10, 10), weight=1)

amp_reward = RewardCallback(reward_function, 'amp')

### Métrica de Estabilidade  

Modelos de linguagem baseados em transformadores têm sido utilizados para aprendizado não supervisionado de estruturas de proteínas. Estudos recentes indicam uma relação entre a probabilidade logarítmica da sequência de uma proteína, gerada por um modelo generativo, e sua estabilidade.  

A probabilidade logarítmica fornecida por um modelo transformer pré-treinado foi utilizada como um indicativo de estabilidade. Incluir essa métrica como função de recompensa auxilia na geração de peptídeos mais realistas.  

Para essa etapa, foi utilizado o modelo ESM de grande escala, com **630M parâmetros**. Esse recurso melhora a qualidade dos resultados, mas aumenta significativamente o tempo de treinamento.  

In [None]:
! pip install fair-esm

In [None]:
import esm

In [None]:
protein_model, alphabet = esm.pretrained.esm2_t33_650M_UR50D()
batch_converter = alphabet.get_batch_converter()

In [None]:
class PeptideStability():
    def __init__(self, model, alphabet, batch_converter):
        self.model = model
        to_device(self.model)
        self.alphabet = alphabet
        self.batch_converter = batch_converter

    def __call__(self, samples):

        data = [
            (f'protein{i}', samples[i]) for i in range(len(samples))
        ]

        batch_labels, batch_strs, batch_tokens = self.batch_converter(data)

        with torch.no_grad():
            results = self.model(to_device(batch_tokens))

        lps = F.log_softmax(results['logits'], -1)

        mean_lps = lps.gather(2, to_device(batch_tokens).unsqueeze(-1)).squeeze(-1).mean(-1)

        return mean_lps

In [None]:
ps = PeptideStability(protein_model, alphabet, batch_converter)

In [None]:
stability_reward = Reward(ps, weight=0.1, bs=300)
stability_cb = RewardCallback(stability_reward, name='stability')

In [None]:
stability_reward(df.sequence.values[:10])

### Amostradores  

Foram utilizados os seguintes amostradores:  
- **`sampler1 ModelSampler`**: amostra do modelo principal, adicionando 1000 compostos ao buffer a cada atualização e extraindo 40% das amostras de cada lote diretamente do modelo.  
- **`sampler2 ModelSampler`**: amostra do modelo base, sem amostragem dinâmica em cada lote.  
- **`sampler3 LogSampler`**: seleciona amostras de alta pontuação do registro (`log`).  
- **`sampler4 TokenSwapSampler`**: utiliza a técnica de troca de tokens (`combi-chem`) para gerar novas amostras a partir das de maior pontuação.  
- **`sampler5 DatasetSampler`**: adiciona uma pequena quantidade de compostos ativos conhecidos em cada atualização do buffer, garantindo alinhamento com o comprimento gerado (75 aminoácidos).  

In [None]:
gen_bs = 1500

sampler1 = ModelSampler(agent.vocab, agent.model, 'live', 1000, 0., gen_bs)
sampler2 = ModelSampler(agent.vocab, agent.base_model, 'base', 1000, 0., gen_bs)
sampler3 = LogSampler('samples', 'rewards', 10, 98, 200)
sampler4 = TokenSwapSampler('samples', 'rewards', 10, 98, 200, aa_vocab, 0.2)
sampler5 = DatasetSampler(df[(df.label==1) & (df.sequence.map(lambda x: len(x)<=75))].sequence.values,
                          'data', buffer_size=4)

samplers = [sampler1, sampler2, sampler3, sampler4, sampler5]

### Callbacks  

- **`SupervisedCB`**: realiza treinamento supervisionado com os 3% melhores exemplos a cada 400 lotes.  
- **`MaxCallback`**: imprime a recompensa máxima de cada lote.  
- **`PercentileCallback`**: imprime a pontuação do percentil 90 a cada lote.  

In [None]:
supervised_cb = SupervisedCB(agent, 20, 0.5, 98, 1e-4, 64)
live_max = MaxCallback('rewards', 'live')
live_p90 = PercentileCallback('rewards', 'live', 90)

cbs = [supervised_cb, live_p90, live_max]

## Ambiente e treinamento

Aqui foi organizado o ambiente e rodado o treinamento em fim.

In [None]:
env = Environment(agent, template_cb, samplers=samplers, rewards=[amp_reward, stability_cb], losses=[loss],
                 cbs=cbs)

In [None]:
set_global_pool(min(12, os.cpu_count()))

In [None]:
env.fit(128, 75, 300, 20)

In [None]:
env.log.plot_metrics()