# Molecular generation when the pharmacophore is given in coordinate

# load model

In [2]:
from model.pgmg import PGMG
from pathlib import Path
import os
from tqdm.auto import tqdm
import math
import random
import numpy as np
from collections import Counter
import torch
import pickle
import dgl
import einops
import pandas as pd
MODEL_PATH = "./output/chembl_test/rs_mapping_2/rs_mapping/fold0_epoch32.pth"
TOKENIZER_PATH = "./output/chembl_test/rs_mapping_2/rs_mapping/tokenizer_r_iso.pkl" 
OUTPUT_DIR = Path(f'./output/')
path='./phar_example/'

class CFG:
    seed = 42
    
    n_mols = -1  # number of molecules used to build pharmacophore models
    n_pp_per_mol = 1  # number of pharmacophores 
    n_repeat = 10
    
    reload_ignore=['pos_encoding']
    
    gen_batch_size = 256
    n_workers = 12

def init_logger(log_path=OUTPUT_DIR, logger_name='main'):
    from logging import getLogger, INFO, FileHandler,  Formatter,  StreamHandler
    logger = getLogger(logger_name)
    logger.setLevel(INFO)
    logger.handlers.clear()
    handler1 = StreamHandler()
    handler1.setFormatter(Formatter("%(message)s"))
    handler2 = FileHandler(filename=log_path / f'log')
    handler2.setFormatter(Formatter("%(message)s"))
    logger.addHandler(handler1)
    logger.addHandler(handler2)
    return logger
LOGGER = init_logger(OUTPUT_DIR)

def seed_torch(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
seed_torch(seed=CFG.seed)    
    
with open(TOKENIZER_PATH,'rb') as f:
    tokenizer = pickle.load(f)

model_setting = {
    "max_len": 128,
    "pp_v_dim": 7 + 1,
    "pp_e_dim": 1,
    "pp_encoder_n_layer": 4,
    "hidden_dim": 384,
    "latent_heads": 8,  # 试试看
    "latent_dim": 384,
    "n_layers": 8,
    "ff_dim": 1024,
    "n_head": 8,
    "init_token_num": 8,
    "kernel_size": 3,  # used only in logging
    "non_expand": True,
    'in': 'rs',
    'out': 'rs',
}
model_path = MODEL_PATH
model_params = dict(model_setting)

model = PGMG(model_params, tokenizer)
model.to('cuda:0')
LOGGER.info(f'reloading model weights from {model_path}...')
states = torch.load(model_path, map_location='cuda:0')
states['model'].update({k:model.state_dict()[k] for k in model.state_dict().keys() 
                    if k.startswith(tuple(CFG.reload_ignore))})
LOGGER.info(model.load_state_dict(states['model'], strict=False))

reloading model weights from ./output/chembl_test/rs_mapping_2/rs_mapping/fold0_epoch32.pth...
<All keys matched successfully>


# preprocess .phar file

In [3]:

def _onek_encoding_unk(x, allowable_set):
    if x not in allowable_set:
        x = allowable_set[-1]
    return [x == s for s in allowable_set]

def _atom_features(atom):
    ELEM_LIST = [[1],[2],[3],[4],[5],[6],[7]]
    return (torch.HalfTensor(_onek_encoding_unk(atom, ELEM_LIST)))

def eucliDist(A,B):
    return math.sqrt(sum([(a - b)**2 for (a,b) in zip(A,B)]))

def mappingDist(eucli_Dist):
    mapping_dist=0.0018*(eucli_Dist**2)+0.9731*eucli_Dist-0.1179
    return mapping_dist


def sample_probability(elment_array,plist,N):
    Psample=[]
    n=len(plist)
    index=int(random.random()*n)
    mw=max(plist)
    beta=0.0
    for i in range(N):                  ##核心算法
        beta=beta+random.random()*2.0*mw
        while beta > plist[index]:
            beta=beta-plist[index]
            index=(index+1)%n
        Psample.append(elment_array[index])
    cresult=Counter(Psample)
    psam=[cresult[x] for x in plist]
    pe=[x*N for x in plist]
    return Psample    


def preprocess_phar(path):
    files= os.listdir(path)
    uni_phar=[]
    g_list=[]
    mol_phco_list=[]
    for i in (range(len(files))):#len(files)
        mol_phco=[]
        dist_index_u=[]
        dist_index_v=[]
        dist_mapping=[]
        file_name=files[i]
        f=open(path+file_name, encoding='gbk')
        data = f.readlines()
        for j in range(1,len(data)-1):
            line=(data)[j].split('\t')
            if line[0]!='EXCL':
                phar_type=[line[0],float(line[4]),float(line[1]),float(line[2]),float(line[3])]
                if line[0]=='AROM':  
                    phar_type[0]=1
                    num=[5,6]
                    num_p=[0.5,0.5]
                    num_=sample_probability(num,num_p,1)
                    if num_[0]==5:
                        phar_type[1]=5
                    if num_[0]==6:
                        phar_type[1]=6
                if line[0]=='LIPO':
                    num=['Hydrophobe','LumpedHydrophobe']
                    num_p=[0.5,0.5]
                    num_=sample_probability(num,num_p,1)
                    if num_[0]=='Hydrophobe':
                        phar_type[0]=2
                        phar_type[1]=3
                    if num_[0]=='LumpedHydrophobe':
                        phar_type[0]=6
                        phar_type[1]=6
                if line[0]=='POSC':
                    phar_type[0]=3
                    phar_type[1]=1
                if line[0]=='HACC':
                    phar_type[0]=4
                    phar_type[1]=1
                if line[0]=='HDON':
                    phar_type[0]=5##
                    phar_type[1]=1
                if line[0]=='NEGC':
                    phar_type[0]=7
                    phar_type[1]=1
                mol_phco.append(phar_type)
        type_list=[]
        size_=[]
        for elment in mol_phco:
            type_list.append(_atom_features([elment[0]]))
            size_.append(elment[1])
        for ii in range(len(mol_phco)):
            for jj in range(len(mol_phco)):
                if ii!=jj:
                    pos_i=mol_phco[ii][2:]
                    pos_j=mol_phco[jj][2:]
                    dist_ij=eucliDist(pos_i,pos_j)
                    dist_index_u.append(ii)
                    dist_index_v.append(jj)
                    map_dist=mappingDist(dist_ij)
                    dist_mapping.append(map_dist)

        u_list_tensor=torch.tensor(dist_index_u)
        v_list_tensor=torch.tensor(dist_index_v)
        g=dgl.graph((u_list_tensor,v_list_tensor))
        g.edata['dist']=torch.HalfTensor(dist_mapping)

        type_list_tensor=torch.stack(type_list)
        g.ndata['type']=type_list_tensor
        g.ndata['size']=torch.HalfTensor(size_)
        g_list.append(g)
    return g_list
    

g1=preprocess_phar(path)
gs=g1
for g in gs:
    a=g.ndata['type']
    g.ndata['h'] = torch.cat((a,g.ndata['size'].reshape(-1, 1)), dim=1).float()
    g.edata['h'] = g.edata['dist'].reshape(-1, 1).float()

# generation

In [4]:

gsx = gs*40
bgx = dgl.batch(gsx).to('cuda')

GEN_RESULT_DIR = OUTPUT_DIR
GEN_RESULT_DIR.mkdir(exist_ok=True)

LOGGER.info('[max sampling]')

LOGGER.info(f'resetting random seed...')
seed_torch(CFG.seed)

res = []
for _ in tqdm(range(5)):
    predictions = model.generate(bgx, random_sample=False)
    res.extend(tokenizer.get_text(predictions))

gen_result = einops.rearrange(np.array(res),'(a b)->a b',b=len(gs))
pd.DataFrame(gen_result).T.reset_index().to_csv(GEN_RESULT_DIR/f'result_ePharmaLib_example.csv',index=False)

del model
torch.cuda.empty_cache()

LOGGER.info('-----------------------------------------------')

[max sampling]
resetting random seed...


  0%|          | 0/5 [00:00<?, ?it/s]

-----------------------------------------------
