In [None]:
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import numpy as np
import matplotlib.pyplot as plt
import time
import math
from torch.nn.utils.rnn import pack_padded_sequence
from rdkit import Chem
import numpy as np
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error as mse
import pandas as pd
import os

In [2]:
# k_flod
def data_train_kf(data, hidden_size=200, n_layers=2, n_epoch=100, batch_size=16, USE_GPU = False, set_cv=5):
    def createTensor(tensor):
        if USE_GPU:
            device = torch.device("cuda:0")
            tensor = tensor.to(device)
        return tensor

    class RNNClassifier(torch.nn.Module):
        def __init__(self, input_size, hidden_size, output_size, num_layers = 1, bidirectional = True):
            super(RNNClassifier, self).__init__()
            self.hidden_size = hidden_size
            self.num_layers = num_layers
            self.num_directions = 2 if bidirectional else 1

            self.embedding = torch.nn.Embedding(input_size, hidden_size)
            self.gru = torch.nn.GRU(hidden_size, hidden_size, num_layers, bidirectional = bidirectional)

            self.fc = torch.nn.Linear(hidden_size * self.num_directions, output_size)

        def initHidden(self, batch_size):
            hidden = torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size)
            return createTensor(hidden)

        def forward(self, input, seq_lengths):
            input = input.t()
            batch_size = input.size(1)
            hidden = self.initHidden(batch_size)


            embedding = self.embedding(input)
            gru_input = pack_padded_sequence(embedding, seq_lengths)
            output, hidden = self.gru(gru_input, hidden)
            if self.num_directions == 2:
                hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim = 1)
            else:
                hidden_cat = hidden[-1]

            fc_output = self.fc(hidden_cat)
            return fc_output

    # smiles to ASCIIlist
    def smiles_to_ASCIIlist(smiles_list):
        ASCIIlist = [ord(smiles) for smiles in smiles_list]
        return ASCIIlist


    def makeTensors(smiles_list, tox_list):
        smiles_sequences = [smiles_to_ASCIIlist(smiles) for smiles in smiles_list]
        smiles_seq_lens = torch.LongTensor([len(name_ASCII) for name_ASCII in smiles_sequences])

        smiles_tensor = torch.zeros(len(smiles_sequences), smiles_seq_lens.max()).long()
        for index, (smiles_sequence, smiles_seq_len) in enumerate(zip(smiles_sequences, smiles_seq_lens), 0):
            smiles_tensor[index, 0:smiles_seq_len] = torch.LongTensor(smiles_sequence)

        ordered_smiles_seq_lens, len_indexes = smiles_seq_lens.sort(dim = 0, descending = True)
        ordered_smiles_tensor = smiles_tensor[len_indexes]
        ordered_tox_list = tox_list[len_indexes]

        return createTensor(ordered_smiles_tensor), createTensor(ordered_smiles_seq_lens), createTensor(ordered_tox_list) 

    def train():
        loss = 0.0
        for batch_index, (smiles, tox) in enumerate(train_loader):

            inputs, seq_lens, targets = makeTensors(smiles, tox) 
            outputs = classifier_model(inputs, seq_lens)
            targets = targets.to(torch.float32).reshape(-1,1)

            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss += loss.item()

            if batch_index % 10 == 9:
                print('time_elapsed: {:.1f}m {:.1f}s, Epoch {}, '.format(timePassed(start_time)[0], timePassed(start_time)[1], epoch), end = '')
                print(f'[{(batch_index+1) * len(inputs)} / {len(training_set)}] ', end = '')
                print(f'loss = {loss / ((batch_index+1) * len(inputs))}')

        return loss/len(train_loader)

    def test():
        correct = 0
        total_samples = len(test_set)
        with torch.no_grad():
            pred = []
            obs = []
            for i, (smiles, tox) in enumerate(test_loader):
                inputs, seq_lens, targets = makeTensors(smiles, tox)
                outputs = classifier_model(inputs, seq_lens)
                pred.append(outputs.data)
                obs.append(targets.data.reshape(-1,1))
            pred = np.vstack(pred).ravel()
            obs = np.vstack(obs).ravel()
            r2 = r2_score(obs, pred)
            Mse = mse(obs, pred)
            print('r2 on test: %.3f %%' % (100 * r2), 'mse on test: %.5f \n' % (Mse))
        return r2, Mse, pred, obs

    def timePassed(start_time):
        time_passed = time.time() - start_time
        minute = math.floor(time_passed / 60)
        second = time_passed - minute * 60
        return [minute, second]

    seed = 1
    os.makedirs("./results/{}/DL/{}/RNN/test/".format(data, seed), exist_ok=True)
    os.makedirs("./results/{}/DL/{}/RNN/train/".format(data, seed), exist_ok=True)
    
    # data load
    train_mols = Chem.SDMolSupplier("./data/{}/{}_training.sdf".format(data, data))
    train_smiles = [Chem.MolToSmiles(mol) for mol in train_mols]
    train_smiles_len = [len(smiles) for smiles in train_smiles]
    train_tox = [float(mol.GetProp("Tox")) for mol in train_mols]
    training_set = [(train_smiles[i], train_tox[i]) for i in range(len(train_smiles))]

    test_mols = Chem.SDMolSupplier("./data/{}/{}_prediction.sdf".format(data, data))
    test_smiles = [Chem.MolToSmiles(mol) for mol in test_mols]
    test_smiles_len = [len(smiles) for smiles in test_smiles]
    test_tox = [float(mol.GetProp("Tox")) for mol in test_mols]
    test_set = [(test_smiles[i], test_tox[i]) for i in range(len(test_smiles))]

    base_indices = np.arange(0,len(train_tox))
    np.random.seed(seed)
    np.random.shuffle(base_indices)
    np.random.seed(seed)
    np.random.shuffle(base_indices)
    
    step = int(len(train_tox)/set_cv)
    pred_kflod = pd.DataFrame(index=range(len(train_tox)), columns=["cv1~{}".format(set_cv), "True"])

    for cv in range(set_cv):
        print("*"*20, "Kflod", cv ,"*"*20)
        index = base_indices
        if cv < set_cv-1:
            index_train = np.concatenate([index[:cv*step],index[(cv+1)*step:]], axis=0)
            index_val = index[cv*step:(cv+1)*step]
        else: 
            index_train = index[0:cv*step]
            index_val = index[cv*step:]

        n_chars = 0
        for smiles in train_smiles:
            for char in smiles:
                if n_chars < ord(char):
                    n_chars = ord(char)
        for smiles in test_smiles:
            for char in smiles:
                if n_chars < ord(char):
                    n_chars = ord(char)
        n_chars = n_chars+1
        output_size = 1
        batch_size = 16
        train_loader = DataLoader(dataset = [training_set[index] for index in index_train], batch_size = batch_size, shuffle = True)
        test_loader = DataLoader(dataset = [training_set[index] for index in index_val], batch_size = batch_size, shuffle = False)

        classifier_model = RNNClassifier(n_chars, hidden_size, output_size, n_layers) # input_size, hidden_size, output_size, num_layers
        if USE_GPU:
            device = torch.device("cuda:0")
            classifier_model.to(device)

        criterion = torch.nn.MSELoss()
        optimizer = torch.optim.Adam(classifier_model.parameters(), lr = 0.001, weight_decay=10 ** (-5.0))

        # training and test
        start_time = time.time()
        print("The num of total training epochs is %d. " % n_epoch)
        r2_list = []
        Mse_list = []
        best_r2 = 0
        for epoch in range(n_epoch):
            train()
            r2, Mse, pred, obs = test()
            r2_list.append(r2)
            Mse_list.append(Mse)
            if best_r2 < r2:
                best_r2 = r2
                pred_kflod.iloc[index_val,0] = pred.reshape(1,-1)
                pred_kflod.iloc[index_val,1] = obs.reshape(1,-1)

        plt.figure()
        plt.plot(np.arange(1, n_epoch+1), r2_list)
        plt.plot(np.arange(1, n_epoch+1), Mse_list)
        plt.xlabel("epoch")
        plt.ylabel("r2 & MSE")
        plt.grid()
        plt.show()

    display(pred_kflod)
    pred_kflod.to_csv("./results/{}/DL/{}/RNN/train//kf_pred_all.csv".format(data, seed), index=0)
    print(r2_score(pred_kflod.iloc[:,1], pred_kflod.iloc[:,0]))

data_train_kf("IGC50", hidden_size=200, n_layers=2, n_epoch=100, batch_size=16, USE_GPU = False, set_cv=5)

In [None]:
# test
def data_train_test(data, hidden_size=200, n_layers=2, n_epoch=100, batch_size=16, USE_GPU = False):
    """
    data: tox data name
    """
    
    def createTensor(tensor):
        if USE_GPU:
            device = torch.device("cuda:0")
            tensor = tensor.to(device)
        return tensor

    class RNNRegress(torch.nn.Module):
        def __init__(self, input_size, hidden_size, output_size, num_layers = 1, bidirectional = True):
            super(RNNRegress, self).__init__()
            self.hidden_size = hidden_size
            self.num_layers = num_layers
            self.num_directions = 2 if bidirectional else 1

            self.embedding = torch.nn.Embedding(input_size, hidden_size)

            self.gru = torch.nn.GRU(hidden_size, hidden_size, num_layers, bidirectional = bidirectional)

            self.fc = torch.nn.Linear(hidden_size * self.num_directions, output_size)

        def initHidden(self, batch_size):
            hidden = torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size)
            return createTensor(hidden)

        def forward(self, input, seq_lengths):
            input = input.t()
            batch_size = input.size(1)
            hidden = self.initHidden(batch_size)

            embedding = self.embedding(input)
            gru_input = pack_padded_sequence(embedding, seq_lengths)
            output, hidden = self.gru(gru_input, hidden)
            if self.num_directions == 2:
                hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim = 1)
            else:
                hidden_cat = hidden[-1]

            fc_output = self.fc(hidden_cat)
            return fc_output

    # smiles to ASCIIlist
    def smiles_to_ASCIIlist(smiles_list):
        ASCIIlist = [ord(smiles) for smiles in smiles_list]
        return ASCIIlist

    def makeTensors(smiles_list, tox_list):
        smiles_sequences = [smiles_to_ASCIIlist(smiles) for smiles in smiles_list]
        smiles_seq_lens = torch.LongTensor([len(name_ASCII) for name_ASCII in smiles_sequences])

        smiles_tensor = torch.zeros(len(smiles_sequences), smiles_seq_lens.max()).long()
        for index, (smiles_sequence, smiles_seq_len) in enumerate(zip(smiles_sequences, smiles_seq_lens), 0):
            smiles_tensor[index, 0:smiles_seq_len] = torch.LongTensor(smiles_sequence)

        ordered_smiles_seq_lens, len_indexes = smiles_seq_lens.sort(dim = 0, descending = True)
        ordered_smiles_tensor = smiles_tensor[len_indexes]
        ordered_tox_list = tox_list[len_indexes]

        return createTensor(ordered_smiles_tensor), createTensor(ordered_smiles_seq_lens), createTensor(ordered_tox_list) 

    def train():
        loss = 0.0
        for batch_index, (smiles, tox) in enumerate(train_loader):

            inputs, seq_lens, targets = makeTensors(smiles, tox) 
            outputs = classifier_model(inputs, seq_lens)
            targets = targets.to(torch.float32).reshape(-1,1)

            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss += loss.item()

            if batch_index % 10 == 9:
                print('time_elapsed: {:.1f}m {:.1f}s, Epoch {}, '.format(timePassed(start_time)[0], timePassed(start_time)[1], epoch), end = '')
                print(f'[{(batch_index+1) * len(inputs)} / {len(training_set)}] ', end = '')
                print(f'loss = {loss / ((batch_index+1) * len(inputs))}')

        return loss/len(train_loader)

    def test():
        correct = 0
        total_samples = len(test_set)
        with torch.no_grad():
            pred = []
            obs = []
            for i, (smiles, tox) in enumerate(test_loader):
                inputs, seq_lens, targets = makeTensors(smiles, tox)
                outputs = classifier_model(inputs, seq_lens)
                pred.append(outputs.data)
                obs.append(targets.data.reshape(-1,1))
            pred = np.vstack(pred).ravel()
            obs = np.vstack(obs).ravel()
            r2 = r2_score(obs, pred)
            Mse = mse(obs, pred)
            print('r2 on test: %.3f %%' % (100 * r2), 'mse on test: %.5f \n' % (Mse))
        return r2, Mse, pred, obs

    def timePassed(start_time):
        time_passed = time.time() - start_time
        minute = math.floor(time_passed / 60)
        second = time_passed - minute * 60
        return [minute, second]

    seed = 1
    os.makedirs("./results/{}/DL/{}/RNN/test/".format(data, seed), exist_ok=True)
    os.makedirs("./results/{}/DL/{}/RNN/train/".format(data, seed), exist_ok=True)
    
    # data load
    train_mols = Chem.SDMolSupplier("./data/{}/{}_training.sdf".format(data, data))
    train_smiles = [Chem.MolToSmiles(mol) for mol in train_mols]
    train_smiles_len = [len(smiles) for smiles in train_smiles]
    train_tox = [float(mol.GetProp("Tox")) for mol in train_mols]
    training_set = [(train_smiles[i], train_tox[i]) for i in range(len(train_smiles))]

    test_mols = Chem.SDMolSupplier("./data/{}/{}_prediction.sdf".format(data, data))
    test_smiles = [Chem.MolToSmiles(mol) for mol in test_mols]
    test_smiles_len = [len(smiles) for smiles in test_smiles]
    test_tox = [float(mol.GetProp("Tox")) for mol in test_mols]
    test_set = [(test_smiles[i], test_tox[i]) for i in range(len(test_smiles))]

    n_chars = 0
    for smiles in train_smiles:
        for char in smiles:
            if n_chars < ord(char):
                n_chars = ord(char)
    for smiles in test_smiles:
        for char in smiles:
            if n_chars < ord(char):
                n_chars = ord(char)
    n_chars = n_chars+1
    output_size = 1
    
    train_loader = DataLoader(dataset = training_set, batch_size = batch_size, shuffle = True)
    test_loader = DataLoader(dataset = test_set, batch_size = batch_size, shuffle = False)
    
    classifier_model = RNNRegress(n_chars, hidden_size, output_size, n_layers) # input_size, hidden_size, output_size, num_layers
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier_model.to(device)
    
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(classifier_model.parameters(), lr = 0.001, weight_decay=10 ** (-5.0))
    
    # training and test
    start_time = time.time()
    print("The num of total training epochs is %d. " % n_epoch)
    r2_list = []
    Mse_list = []
    best_r2 = 0
    train_loss = []
    for epoch in range(n_epoch):
        train_loss.append(train())
        r2, Mse, pred, obs = test()
        r2_list.append(r2)
        Mse_list.append(Mse)
        if best_r2 < r2:
            best_r2 = r2
            pd.DataFrame([pred, obs], index=["pred", "true"]).T.to_csv('./results/{}/DL/{}/RNN/test/test_pred_all.csv'.format(data, seed), index=0)
    plt.plot(np.arange(1, n_epoch+1), r2_list)
    plt.plot(np.arange(1, n_epoch+1), Mse_list)
    plt.xlabel("epoch")
    plt.ylabel("r2 & MSE")
    plt.grid()
    plt.show()
    
    pd.DataFrame(r2_list).to_csv("./results/{}/DL/{}/RNN/test/scores_r2.csv".format(data, seed), index=0)
    pd.DataFrame(Mse_list).to_csv("./results/{}/DL/{}/RNN/test/scores_mse.csv".format(data, seed), index=0)
    pd.DataFrame(train_loss).to_csv("./results/{}/DL/{}/RNN/test/train_loss.csv".format(data, seed), index=0)
    
    print(r2_list.index(max(r2_list)))
    print(max(r2_list))
    
data_train_test("IGC50", hidden_size=200, n_layers=2, n_epoch=100, batch_size=16, USE_GPU = False)