**Задание:** Обучить рекуррентную нейронную сеть на SMILES и сравнить результаты с сетями на фингерпринтах.

In [1]:
import pickle

from rdkit import Chem
from rdkit.Chem.Crippen import MolLogP
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load at most the first 10,000 rows of the dataset.
df = pd.read_csv('dataset_v1.csv', nrows=10000)
# Parse each SMILES string with RDKit and compute its Crippen logP, in row order.
logP = [MolLogP(mol) for mol in map(Chem.MolFromSmiles, df.SMILES)]
# Alternative left in the notebook: load previously pickled values instead of recomputing.
#logP = pickle.load(open("logP",'rb'))

In [3]:
class Vocabulary:
    """
    Character-level vocabulary built from a collection of SMILES strings.

    Attributes:
        chars (set): Every distinct character found in the input strings.
        data: The collection of strings the vocabulary was built from,
            stored exactly as passed in.
        c2i (dict): Maps each character to an integer id. Ids are assigned
            in sorted character order starting at 1, so id 0 is never used
            for a real character.
    """
    def __init__(self, data):
        """
        Collect the character set of ``data`` and assign an id to each character.

        Args:
            data: An iterable of SMILES strings. It is iterated again by
                ``tokens()``, so it must be re-iterable.
        """
        self.chars = {symbol for smiles in data for symbol in smiles}
        self.data = data

        ordered_symbols = sorted(self.chars)
        # Numbering starts at 1; 0 stays free.
        self.c2i = {symbol: idx
                    for idx, symbol in enumerate(ordered_symbols, start=1)}

    def tokens(self):
        """
        Tokenize every stored string.

        Returns:
            list: One 1-D tensor of character ids per string in ``self.data``,
                in the same order.
        """
        encoded = []
        for smiles in self.data:
            encoded.append(torch.tensor(self.string2ids(smiles)))
        return encoded

    def string2ids(self, string):
        """
        Encode one string as a list of character ids.

        Args:
            string (str): A SMILES string.

        Returns:
            list: The id of each character, in order.

        Raises:
            KeyError: If the string contains a character that was not seen
                when the vocabulary was built.
        """
        return list(map(self.char2id, string))

    def char2id(self, char):
        """
        Look up the id of a single character.

        Args:
            char (str): A character from a SMILES string.

        Returns:
            int: The id assigned to ``char``.

        Raises:
            KeyError: If ``char`` is not in the vocabulary.
        """
        return self.c2i[char]

In [4]:
class Dataset_logP(Dataset):
    """
    Map-style PyTorch dataset of (input, logP label, length) samples.
    """
    def __init__(self, inputs, labels, lens):
        """
        Store the samples and convert the labels to a float32 tensor.

        Args:
            inputs: Indexable collection of per-molecule inputs; stored as given.
            labels: Sequence of numeric logP values, one per input.
            lens: Indexable collection of input lengths; stored as given.
        """
        label_array = np.asarray(labels, dtype=float)
        self.inputs = inputs
        # float64 numpy array -> float32 tensor.
        self.labels = torch.from_numpy(label_array).to(torch.float)
        self.lens = lens

    def __len__(self):
        """
        Return the number of samples, taken from the number of labels.

        Returns:
            int: The dataset size.
        """
        n_samples = len(self.labels)
        return n_samples

    def __getitem__(self, idx):
        """
        Fetch one sample.

        Args:
            idx (int): Position of the sample.

        Returns:
            tuple: ``(input, label, length)`` for position ``idx``.
        """
        sample_input = self.inputs[idx]
        sample_label = self.labels[idx]
        sample_len = self.lens[idx]
        return sample_input, sample_label, sample_len

In [5]:
# Build the character vocabulary from the SMILES column; the logP values are the labels.
data = Vocabulary(df["SMILES"])
labels = logP

In [6]:
# Number of tokens in each tokenized SMILES string.
lens = list(map(len, data.tokens()))

In [7]:
# Bundle token tensors, labels and lengths into a single indexable dataset.
data_list = Dataset_logP(inputs=data.tokens(), labels=labels, lens=lens)

In [8]:
train = []
test = []
test_scaffolds = []

# Route every sample to a partition according to the dataset's SPLIT column:
#   'train' -> train, 'test' -> test, any other value -> test_scaffolds.
# `sid` is the positional index of the row and is used to fetch the matching
# (tokens, label, length) sample from `data_list`.
for sid, split in enumerate(df.SPLIT):
    if split == 'train':
        train.append(data_list[sid])
    elif split == 'test':
        test.append(data_list[sid])
    else:
        test_scaffolds.append(data_list[sid])

In [9]:
def collate(data):
    """
    Turn a list of samples into one padded batch.

    Args:
        data (list): Samples, each indexable as ``(sequence tensor, target,
            length)``.

    Returns:
        tuple: ``(padded_inputs, targets, lens)`` where ``padded_inputs`` is
            batch-first and zero-padded to the longest sequence in the batch,
            and ``targets`` and ``lens`` are tensors with one entry per sample.
    """
    sequences, labels, lengths = [], [], []
    for sample in data:
        sequences.append(sample[0])
        labels.append(sample[1])
        lengths.append(sample[2])

    batch = pad_sequence(sequences, batch_first=True)
    return batch, torch.tensor(labels), torch.tensor(lengths)

In [10]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class RNN(nn.Module):
    """
    Embedding -> stacked LSTM -> linear head producing one value per sequence.

    Args:
        vocab_size (int): Number of rows in the embedding table (the caller
            is expected to include the padding id).
        hidden_size (int): Number of features in the LSTM hidden state.

    Attributes:
        vocab_size (int): As passed in.
        hidden_size (int): As passed in.
        num_layers (int): Number of stacked LSTM layers, fixed at 2.
        dropout (float): Dropout between LSTM layers, fixed at 0.3.
        embedding_size (int): Embedding dimension, fixed at 100.
        embedding_layer (nn.Embedding): Token-id embedding.
        lstm_layer (nn.LSTM): Batch-first LSTM.
        linear_layer (nn.Linear): Maps the hidden state to a single output.
    """
    def __init__(self, vocab_size, hidden_size):
        super().__init__()

        # Configurable sizes.
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        # Fixed architecture settings.
        self.num_layers = 2
        self.dropout = 0.3
        self.embedding_size = 100

        self.embedding_layer = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim=self.embedding_size,
        )
        self.lstm_layer = nn.LSTM(
            input_size=self.embedding_size,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            dropout=self.dropout,
            batch_first=True,
        )
        self.linear_layer = nn.Linear(in_features=self.hidden_size,
                                      out_features=1)

    def forward(self, x, lens, hiddens=None):
        """
        Predict one value per sequence from its last real (non-padded) step.

        Args:
            x (tensor): Padded batch of token ids, batch dimension first.
            lens (tensor): True length of each sequence in ``x``; also used
                (minus one) to index the last valid time step.
            hiddens: Optional initial LSTM state, passed straight to the LSTM.

        Returns:
            tensor: Output of the linear layer, one row per sequence.
        """
        embedded = self.embedding_layer(x)
        # Pack so the LSTM skips padded positions; the batch need not be sorted.
        packed = rnn_utils.pack_padded_sequence(
            embedded, lens, batch_first=True, enforce_sorted=False)
        packed_out, hiddens = self.lstm_layer(packed, hiddens)
        outputs, _ = rnn_utils.pad_packed_sequence(packed_out, batch_first=True)

        # Pick, for every row, the LSTM output at that row's final valid step.
        batch_rows = range(outputs.shape[0])
        final_steps = outputs[batch_rows, lens - 1, :]
        return self.linear_layer(final_steps)

In [11]:
def get_params(net=None):
    """
    Yield the trainable parameters of a network.

    Args:
        net: Module whose parameters are wanted. When omitted, the
            module-level ``model`` is used, which keeps existing
            zero-argument calls working.

    Returns:
        generator: The parameters with ``requires_grad`` set to True.
    """
    source = model if net is None else net
    return (p for p in source.parameters() if p.requires_grad)

In [12]:
def train_epoch(model, tqdm_data, criterion, optimizer=None):
    """
    Run one pass over the data, training when an optimizer is supplied.

    Args:
        model: Network called as ``model(data, lens)``.
        tqdm_data: Iterable of ``(data, targets, lens)`` batches that also
            provides ``set_postfix`` (e.g. a tqdm-wrapped loader).
        criterion: Loss called as ``criterion(outputs.flatten(), targets)``.
        optimizer: Optimizer to step after each batch. If None, the model is
            put in eval mode and no parameter update is performed.

    Returns:
        dict: ``'loss'`` (last batch loss), ``'running_loss'`` (running mean
            of the batch losses) and ``'mode'`` (``'Train'`` or ``'Eval'``).
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    postfix = {'loss': 0,
               'running_loss': 0}
    # Track gradients only when they will be used; the evaluation path
    # otherwise builds an autograd graph for nothing.
    with torch.set_grad_enabled(training):
        for i, (data, targets, lens) in enumerate(tqdm_data):
            # `lens` is deliberately left where the loader put it.
            data = data.to(device)
            targets = targets.to(device)
            outputs = model(data, lens)
            loss = criterion(outputs.flatten(), targets)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            batch_loss = loss.item()
            postfix['loss'] = batch_loss
            # Incremental mean: new_mean = old_mean + (x - old_mean) / count.
            postfix['running_loss'] += (batch_loss -
                                        postfix['running_loss']) / (i + 1)
            tqdm_data.set_postfix(postfix)

    postfix['mode'] = 'Train' if training else 'Eval'
    return postfix

In [13]:
def test_model(model, data, criterion):
    """
    Evaluate the model and return its average loss over all samples.

    Each batch loss is weighted by the number of targets in that batch, so a
    smaller final batch does not skew the result. This assumes ``criterion``
    returns the mean loss of a batch (the default for ``nn.MSELoss``).

    Args:
        model: Network called as ``model(inputs, lens)``.
        data: Iterable of ``(inputs, targets, lens)`` batches.
        criterion: Loss called as ``criterion(outputs.flatten(), targets)``.

    Returns:
        tensor: Scalar tensor holding the sample-weighted mean loss.

    Raises:
        ZeroDivisionError: If ``data`` yields no batches.
    """
    model.eval()
    weighted_loss = 0.0
    n_samples = 0
    with torch.no_grad():
        for inputs, targets, lens in data:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs, lens)
            batch_n = targets.numel()
            batch_loss = criterion(outputs.flatten(), targets)
            weighted_loss = weighted_loss + batch_loss * batch_n
            n_samples += batch_n
    return weighted_loss / n_samples

In [14]:
# Candidate values for the grid search; every combination is trained and evaluated.
batch_size = [5, 10, 20]  # DataLoader batch sizes
hidden_size = [10, 50, 100]  # LSTM hidden-state sizes
learning_rate = [0.1, 0.01, 0.001]  # Adam learning rates

In [15]:
# One row per hyperparameter combination with its losses on both test sets.
df_result = pd.DataFrame(columns=['batch_size', 'hidden_size', 'learning_rate', 'losses_test', 'losses_test_scaf'])

# Exhaustive grid search: train a fresh model for every (bs, hs, lr) combination.
for bs in batch_size:
    for hs in hidden_size:
        for lr in learning_rate:
            
            # Vocabulary ids start at 1, so the embedding needs one extra row for id 0 (padding).
            model = RNN(len(data.chars) + 1, hs).to(device)

            # NOTE(review): the training loader is created without shuffle=True — confirm this is intended.
            train_loader = DataLoader(train, batch_size=bs, collate_fn = collate)
            test_loader = DataLoader(test, batch_size=bs, collate_fn = collate)
            test_scaffolds_loader = DataLoader(test_scaffolds, batch_size=bs, collate_fn = collate)

            criterion = nn.MSELoss()
            # get_params() reads the module-level `model` assigned just above.
            optimizer = optim.Adam(get_params(), lr=lr)

            model.zero_grad()
            # Fixed budget of 5 training epochs per combination.
            for epoch in range(5):
                tqdm_data = tqdm(train_loader, desc='Training (epoch #{})'.format(epoch))
                # The returned statistics are not used further in this cell.
                postfix = train_epoch(model, tqdm_data, criterion, optimizer)

            # Evaluate once after training, on the plain and the scaffold test sets.
            test_loss = test_model(model, test_loader, criterion)
            test_scaffolds_loss = test_model(model, test_scaffolds_loader, criterion)

            # Append the result as a new row at the end of the table.
            df_result.loc[len(df_result.index)] = [bs, hs, lr, float(test_loss), float(test_scaffolds_loss)]

Training (epoch #0): 100%|██████████| 1660/1660 [00:22<00:00, 74.86it/s, loss=0.376, running_loss=1.06] 
Training (epoch #1): 100%|██████████| 1660/1660 [00:22<00:00, 74.31it/s, loss=0.341, running_loss=1.02] 
Training (epoch #2): 100%|██████████| 1660/1660 [00:22<00:00, 75.29it/s, loss=0.337, running_loss=1.01] 
Training (epoch #3): 100%|██████████| 1660/1660 [00:22<00:00, 73.22it/s, loss=0.342, running_loss=1.01] 
Training (epoch #4): 100%|██████████| 1660/1660 [00:24<00:00, 66.41it/s, loss=0.345, running_loss=1.01] 
Training (epoch #0): 100%|██████████| 1660/1660 [00:26<00:00, 62.84it/s, loss=0.205, running_loss=0.598] 
Training (epoch #1): 100%|██████████| 1660/1660 [00:26<00:00, 62.34it/s, loss=0.202, running_loss=0.44]  
Training (epoch #2): 100%|██████████| 1660/1660 [00:26<00:00, 62.86it/s, loss=0.119, running_loss=0.382]  
Training (epoch #3): 100%|██████████| 1660/1660 [00:27<00:00, 61.23it/s, loss=0.0572, running_loss=0.38] 
Training (epoch #4): 100%|██████████| 1660/1660 [0

#### Результаты

Видно, что явной тенденции по оптимизируемым параметрам не наблюдается. При этом наилучшим сочетанием оказались параметры batch_size = 20, hidden_size = 100, learning_rate = 0.001, что соответствует ошибке модели (MSE на тестовой выборке), равной 0.075.

In [17]:
df_result.sort_values(by=['losses_test']).head(5)

Unnamed: 0,batch_size,hidden_size,learning_rate,losses_test,losses_test_scaf
26,20.0,100.0,0.001,0.075192,0.069648
17,10.0,100.0,0.001,0.090875,0.091055
23,20.0,50.0,0.001,0.099885,0.093177
8,5.0,100.0,0.001,0.10625,0.114236
14,10.0,50.0,0.001,0.112716,0.101571


RNN показала лучшие результаты по сравнению с сетями на фингерпринтах: её минимальная ошибка составила 0.075, тогда как минимальная ошибка сетей на фингерпринтах составляла 0.15.