- Rdkit : 분자를 Graph 형태로 만들고 분자의 logP 특성을 불러올 수 있는 Library

In [1]:
#!pip install rdkit

- 분자를 텍스트 형태로 표현한 smiles와 molecular graph로 표현하기 위한 vocab.npy 파일.

In [2]:
!curl -o ZINC.smiles https://raw.githubusercontent.com/heartcored98/Standalone-DeepLearning/master/Lec9/ZINC.smiles
!curl -o vocab.npy https://raw.githubusercontent.com/heartcored98/Standalone-DeepLearning/master/Lec9/vocab.npy

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5374k  100 5374k    0     0  5394k      0 --:--:-- --:--:-- --:--:-- 5418k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   256  100   256    0     0    504      0 --:--:-- --:--:-- --:--:--   509


# Import Library

In [3]:
import argparse
import sys
import time
import copy

import numpy as np

from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from rdkit.Chem.Crippen import MolLogP

from sklearn.metrics import mean_absolute_error

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable

from tqdm import tnrange, tqdm_notebook
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
paser = argparse.ArgumentParser()
# Set parser
args = paser.parse_args("")
args.seed = 123
args.val_size = 0.1
args.test_size = 0.1
args.shuffle = True
# Set Parser Parameter

In [5]:
np.random.seed(args.seed)
torch.manual_seed(args.seed)

if torch.cuda.is_available():
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
else:
    torch.set_default_tensor_type('torch.FloatTensor')

# Pre-processing

In [6]:
def read_ZINC_smiles(file_name, num_mol):
    f = open(file_name, 'r')
    contents = f.readlines()
    
    smi_list = []
    logP_list = []
    for i in tqdm_notebook(range(num_mol), desc = 'Reading Data'):
        smi = contents[i].strip()
        # Read One line from contents
        m=Chem.MolFromSmiles(smi)
        smi_list.append(smi)
        logP_list.append(MolLogP(m))
        
    logP_list=np.asarray(logP_list).astype(float)
    
    # 1. smi_list : read string
    # 2. logP_list : MolFromSmiles and MolLogP
    return smi_list, logP_list

def smiles_to_onehot(smi_list):
    def smiles_to_vector(smiles, vocab, max_length):
        while len(smiles) < max_length:
            smiles +=" "
        vector = [vocab.index(str(x)) for x in smiles]
        one_hot = np.zeros((len(vocab), max_length),dtype=int)
        for i,elm in enumerate(vector):
            one_hot[elm][i] = 1
        return one_hot
    
    vocab = np.load('./vocab.npy')
    smi_total=[]
    
    for i,smi in tqdm_notebook(enumerate(smi_list), desc='Converting to One Hot'):
        smi_onehot = smiles_to_vector(smi, list(vocab),120)
        smi_total.append(smi_onehot)
    
    return np.asarray(smi_total)

def convert_to_graph(smiles_list):
    adj=[]
    adj_norm=[]
    features=[]
    maxNumAtoms=50
    
    for i in tqdm_notebook(smiles_list, desc='Converting to Graph'):
        # Mol
        iMol = Chem.MolFromSmiles(i.strip())
        # Adj
        iAdjTmp = Chem.rdmolops.GetAdjacencyMatrix(iMol)
        # Feature
        if( iAdjTmp.shape[0] <= maxNumAtoms):
            #Feature-preprocessing
            iFeature = np.zeros((maxNumAtoms,58))
            iFeatureTmp = []
            for atom in iMol.GetAtoms():
                #atom_result = atom_feature(atom)
                iFeatureTmp.append(atom_feature(atom)) #Atom Feature only
                #print(atom," is ",atom_result)
                #iFeatureTmp.append(atom_result) #Atom Feature only
                
            iFeature[0:len(iFeatureTmp), 0:58] = iFeatureTmp
            # Apply 0 padding for feature set.
            
            features.append(iFeature)
            
            # Adj- preprocessing
            iAdj = np.zeros((maxNumAtoms, maxNumAtoms))
            iAdj[0:len(iFeatureTmp), 0:len(iFeatureTmp)] = iAdjTmp + np.eye(len(iFeatureTmp))
            adj.append(np.asarray(iAdj))
    features = np.asarray(features)
    
    return features, adj
    
def atom_feature(atom):
    return np.array(one_of_k_encoding_unk(atom.GetSymbol(),
                                      ['C', 'N', 'O', 'S', 'F', 'H', 'Si', 'P', 'Cl', 'Br',
                                       'Li', 'Na', 'K', 'Mg', 'Ca', 'Fe', 'As', 'Al', 'I', 'B',
                                       'V', 'Tl', 'Sb', 'Sn', 'Ag', 'Pd', 'Co', 'Se', 'Ti', 'Zn',
                                       'Ge', 'Cu', 'Au', 'Ni', 'Cd', 'Mn', 'Cr', 'Pt', 'Hg', 'Pb']) +
                    one_of_k_encoding(atom.GetDegree(), [0, 1, 2, 3, 4, 5]) +
                    one_of_k_encoding_unk(atom.GetTotalNumHs(), [0, 1, 2, 3, 4]) +
                    one_of_k_encoding_unk(atom.GetImplicitValence(), [0, 1, 2, 3, 4, 5]) +
                    [atom.GetIsAromatic()])    # (40, 6, 5, 6, 1)
# one_of_k_encoding:
# 
# included value only

# one of k encoding unk:
# 
# excluded value is also.

def one_of_k_encoding(x,allowable_set):
    if x not in allowable_set:
        raise Exception("input {0} not in allowable set{1}:".format(x,allowable_set))
    return list(map(lambda s: x==s, allowable_set))

def one_of_k_encoding_unk(x,allowable_set):
    if x not in allowable_set:
        x=allowable_set[-1]
    return list(map(lambda s:x==s,allowable_set))



In [7]:
list_smi, list_logP = read_ZINC_smiles('ZINC.smiles',10000)
list_feature, list_adj = convert_to_graph(list_smi)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for i in tqdm_notebook(range(num_mol), desc = 'Reading Data'):


Reading Data:   0%|          | 0/10000 [00:00<?, ?it/s]

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for i in tqdm_notebook(smiles_list, desc='Converting to Graph'):


Converting to Graph:   0%|          | 0/10000 [00:00<?, ?it/s]

> Until Here, We could get

> - Adjecency Matrix : list_adj
> - Node Feature Matrix : list_feature
> - logP Value Matrix : list_logP

- The Difference of Graph Neural Network is the Calculation that we extract information from the Node Feature Matrix and Adjacency Matrix and predict logP Value.
- but In CNN and RNN we did this with only One Matrix.

In [19]:
list_feature[0][0]

array([0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
       0., 0., 1., 0., 0., 0., 0.])

In [29]:
class GCNDataset(Dataset):
    def __init__(self,list_feature,list_adj, list_logP):
        self.list_feature = list_feature
        self.list_adj = list_adj
        self.list_logP = list_logP
        
    def __len__(self):
        return len(self.list_feature)
    
    def __getitem__(self, index):
        return self.list_feature[index], self.list_adj[index], self.list_logP[index]
    
def partition(list_feature, list_adj, list_logP, args):
    num_total = list_feature.shape[0] # length
    num_train = int(num_total * (1-args.test_size - args.val_size))
    num_val = int(num_total * args.val_size)
    num_test = int(num_total*args.test_size)
    
    ##
    feature_train = list_feature[:num_train]
    adj_train = list_adj[:num_train]
    logP_train = list_logP[:num_train]
    
    feature_val = list_feature[num_train:num_train+num_val]
    adj_val = list_adj[num_train:num_train+num_val]
    logP_val = list_logP[num_train:num_train+num_val]
    
    feature_test = list_feature[:num_train]
    adj_test = list_adj[:num_train]
    logP_test = list_logP[:num_train]
    ##
    
    train_set = GCNDataset(feature_train, adj_train, logP_train)
    val_set = GCNDataset(feature_val, adj_val, logP_val)
    test_set = GCNDataset(feature_test, adj_test, logP_test)
    
    partition={
        'train':train_set,
        'val':val_set,
        'test':test_set
    }
    
    return partition

In [30]:
dict_partition = partition(list_feature, list_adj,list_logP,args)

# Model Construction

- **GCNLayer** : node feature matrix와 adjacency matrix의 list를 받아 Graph Convolution 연산을 수행하는 Module이다.
- (Gate)SkipConnection : ResNet에서 사용되었던 skip connection technique을 구현한 Module이다.
- GCNBlock : node feature matrix와 adjacency matrix의 List를받아 원하는 갯수의 GCNLayer를 통과시킨 후, (gated) skip connection을 적용하는 모듈임.
- ReadOut : Graph structure에 permutation invariance를 주기 위하여 linear layer를 거친 뒤 batch별로 summation하는 모듈이다.
- Predictor : Readout Layer로부터, Graph feature vector로부터 logP value를 예측하기 위한 linear layer module이다.

In [31]:
class GCNLayer(nn.Module):
    def __init__(self, in_dim, out_dim, n_atom, act=None, bn=False):
        super(GCNLayer, self).__init__()
        
        self.use_bn = bn
        self.linear = nn.Linear(in_dim, out_dim)
        nn.init.xavier_uniform_(self.linear.weight)
        self.bn=nn.BatchNorm1d(n_atom)
        self.activation = act
        
    def forward(self, x, adj):
        out = self.linear(x)
        out = torch.matmul(adj, out)
        if self.use_bn:
            out = self.bn(out)
        if self.activation != None:
            out = self.activation(out)
        return out, adj

In [32]:
class SkipConnection(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(SkipConnection, self).__init__()
        
        self.in_dim = in_dim
        self.out_dim = out_dim
        
        self.linear = nn.Linear(in_dim, out_dim, bias=False)
        
    def forward(self, in_x, out_x):
        if(self.in_dim != self.out_dim):
            in_x = self.linear(in_x)
        out= in_x + out_x
        return out

In [33]:
class GateSkipConnection(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(GateSkipConnection, self).__init__()
        
        self.in_dim = in_dim
        self.out_dim = out_dim
        
        self.linear = nn.Linear(in_dim, out_dim, bias=False)
        self.linear_coef_in = nn.Linear(out_dim, out_dim)
        self.linear_coef_out = nn.Linear(out_dim, out_dim)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, in_x, out_x):
        if(self.in_dim != self.out_dim):
            in_x = self.linear(in_x)
        z = self.gate_coefficient(in_x,out_x)
        out = torch.mul(z, out_x) + torch.mul(1.0-z, in_x)
        return out
    
    def gate_coefficient(self, in_x, out_x):
        x1 = self.linear_coef_in(in_x)
        x2 = self.linear_coef_out(out_x)
        return self.sigmoid(x1+x2)
    

In [35]:
class GCNBlock(nn.Module):
    def __init__(self, n_layer, in_dim, hidden_dim, out_dim,n_atom, bn=True, sc='gsc'):
        super(GCNBlock, self).__init__()
        
        self.layers = nn.ModuleList()
        
        for i in range(n_layer):
            self.layers.append(GCNLayer(in_dim if i==0 else hidden_dim,
                                       out_dim if i==n_layer-1 else hidden_dim,
                                       n_atom,
                                       nn.ReLU() if i!=n_layer-1 else None,
                                       bn))
        self.relu = nn.ReLU()
        if sc=='gsc':
            self.sc = GatedSkipConnection(in_dim, out_dim)
        elif sc=='sc':
            self.sc = SKipConnection(in_dim, out_dim)
        elif sc=='no':
            self.sc=None
        else:
            assert False, "Wrong sc type."
    def forward(self, x, adj):
        residual = x
        for i, layer in enumerate(self.layers):
            out, adj = layer((x if i==0 else out), adj)
        if self.sc != None:
            out=self.sc(residual, out)
        out = self.relu(out)
        return out, adj

In [36]:
class ReadOut(nn.Module):
    def __init__(self, in_dim, out_dim, act=None):
        super(ReadOut, self).__init__()
        
        self.in_dim = in_dim
        self.out_dim = out_dim
        
        self.linear=nn.Linear(self.in_dim,
                             self.out_dim)
        nn.init.xavier_uniform_(self.linear.weight)
        self.activation = act
    def forward(self, x):
        out = self.linear(x)
        out = torch.sum(out, 1)
        if self.activation != None:
            out = self.activation(out)
        return out

In [37]:
class Predictor(nn.Module):
    def __init__(self, in_dim, out_dim, act=None):
        super(Predictor, self).__init__()
        
        self.in_dim = in_dim
        self.out_dim = out_dim
        
        self.linear = nn.Linear(self.in_dim,
                               self.out_dim)
        nn.init.xavier_uniform_(self.linear.weight)
        self.activation=act
    
    def forward(self,x):
        out = self.linear(x)
        if self.activation != None:
            out = self.activation(out)
        return out

In [43]:
class GCNNet(nn.Module):
    def __init__(self, args):
        super(GCNNet, self).__init__()
        
        self.blocks = nn.ModuleList()
        for i in range(args.n_block):
            self.blocks.append(GCNBlock(args.n_layer,
                                       args.in_dim if i==0 else args.hidden_dim,
                                       args.hidden_dim,
                                       args.hidden_dim,
                                       args.n_atom,
                                       args.bn,
                                       args.sc)
                              )
        self.readout = ReadOut(args.hidden_dim,
                              args.pred_dim1,
                              act=nn.ReLU())
        self.pred1=Predictor(args.pred_dim1,
                            args.pred_dim2,
                            act=nn.ReLU())
        self.pred1=Predictor(args.pred_dim2,
                            args.pred_dim3,
                            act=nn.ReLU())
        self.pred1=Predictor(args.pred_dim3,
                            args.out_dim)
    def forward(self, x, adj):
        for i, block in enumerate(self.blocks):
            out, adj = block((x if i==0 else out), adj)
        out = self.readout(out)
        out = self.pred1(out)
        out = self.pred2(out)
        out = self.pred3(out)
        return out

# Train, Validate, Test and Experiment

In [44]:
def train(net, partition, optimizer, criterion, args):
    trainloader = torch.utils.data.DataLoader(partition['train'],
                                             batch_size=args.train_batch_size,
                                             shuffle=True, num_workers=2)
    net.train()
    
    train_loss=0.0
    
    for i, data in enumerate(trainloader, 0):
        optimizer.zero_grad() # 매 Epoch마다. zero_grad()가 실행되는 것을 매 iteration마다 실행되도록
        
        # get the inputs
        list_feature, list_adj, list_logP=data
        
        list_feature = list_feature.cuda().float()
        list_adj = list_adj.cuda().float()
        list_logP = list_logP.cuda().float().view(-1,1)
        outputs = net(list_feature, list_adj)
        
        loss = criterion(outputs, list_logP)
        train_loss +=loss.item()
        loss.backward()
        optimizer.step() # for the next step
        
    train_loss = train_loss / len(trainloader)
    return net, train_loss

In [45]:
def validate(net, partition, criterion, args):
    valloader = torch.utils.data.DataLoader(partition['val'],
                                           batch_size= args.test_batch_size,
                                           shuffle=False, num_workers=2)
    net.eval()
    val_loss =0
    with torch.no_grad():
        for data in valloader:
            list_feature, list_adj, list_logP = data
            
            list_feature = list_feature.cuda().float()
            list_adj = list_adj.cuda().float()
            list_logP = list_logP.cuda().float().view(-1,1)
            
            outputs = net(list_feature, list_adj)
            
            loss = criterion(outputs,labels)
            val_loss+=loss.item()
            
        val_loss = val_loss / len(valloader)
    return val_loss

In [46]:
def test(net, partition, args):
    testloader = torch.utils.data.DataLoader(partition['test'],
                                            batch_size=args.test_batch_size,
                                            shuffle=False, num_workers=2)
    net.eval()
    with torch.no_grad():
        logP_total = list()
        pred_logP_total = list()
        for data in testloader:
            list_feature, list_adj, list_logP = data
            
            list_feature = list_feature.cuda().float()
            list_adj = list_adj.cuda().float()
            list_logP = list_logP.cuda().float().view(-1,1)
            
            logP_total += list_logP.tolist()
            list_logP = list_logP.view(-1,1)
            
            outputs = net(list_feature, list_adj)
            pred_logP_total += outputs.view(-1).tolist()
            
        mae = mean_absolute_error(logP_total,pred_logP_total)
        std = np.std(np.array(logP_total)-np.array(pred_logP_total))
        
    return mae, std, logP_total, pred_logP_total

In [47]:
def experiment(partition, args):
    net = GCNNet()
    net.cuda()
    
    criterion = nn.MSELoss()
    
    if args.optim == 'SGD':
        optimizer = optim.SGD(net.parameters(), lr = args.lr, weight_decay=args.l2)
    elif args.optim == 'RMSprop':
        optimizer = optim.RMSprop(net.parameters(), lr = args.lr, weight_decay=args.l2)
    elif args.optim == 'Adam':
        optimizer = optim.Adam(net.parameters(), lr=args.lr, weight_decay=args.l2)
    else:
        raise ValueError('In-valid optimizer choice')
        
    train_losses=[]
    val_losses=[]
    
    for epoch in range(args.epoch):
        ts = time.time()
        net, train_loss = train(net, partition, optimizer, criterion, args)
        val_loss = validate(net, partition, criterion, args)
        te = time.time()
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        print('Epoch {}, Acc(train/val): {:2.2f}/{:2.2f}, Loss(train/val) {:2.2f}/{:2.2f}. Took {:2.2f} sec'.format(epoch, train_acc, val_acc, train_loss, val_loss, te-ts))
        
    mae, std, logP_total, pred_logP_total = test(net, partition, args)
    
    result = {}
    result['train_losses'] = train_losses
    result['val_losses'] = val_losses
    result['mae']= mae
    result['std']=std
    result['logP_total']=logP_total
    result['pred_logP_total'] = pred_logP_total
    
    return vars(args), result