In [None]:
from rdkit import Chem
import pandas as pd

In [None]:
# load dataset
path = "/home/jovyan/PharmaBench/data/final_datasets/bbb_cls_final_data.csv"
# path = "/home/jovyan/PharmaBench/data/final_datasets/logd_reg_final_data.csv"
data = pd.read_csv(path)

scaffold_training = data[data['scaffold_train_test_label'] == 'train']
scaffold_test = data[data['scaffold_train_test_label'] == 'test']

random_training = data[data['random_train_test_label'] == 'train']
random_test = data[data['scaffold_train_test_label'] == 'test']

In [None]:
Chem.MolFromSmiles(scaffold_training['Smiles_unify'][0])

In [None]:
for i in range(num_atoms):
    for j in range(num_atoms):
        if i != j:
            distances[i, j] = np.linalg.norm(positions[i] - positions[j])

In [None]:
scaffold_training = scaffold_training.reset_index()
scaffold_test = scaffold_test.reset_index()

In [None]:
print(len(scaffold_training['Smiles_unify']))
max_shape = 0 
num = 0
smi_list = []
for i in range(len(scaffold_training['Smiles_unify'])):
    shape = len(scaffold_training['Smiles_unify'][i])
    smi_list.append(scaffold_training['Smiles_unify'][i])
    num += 1
print(f"number of the valuable data is {num}.")

In [None]:
print(len(scaffold_test['Smiles_unify']))
max_shape = 0 
num = 0
smi_list = []
for i in range(len(scaffold_test['Smiles_unify'])):
    shape = len(scaffold_test['Smiles_unify'][i])
    smi_list.append(scaffold_test['Smiles_unify'][i])
    num += 1
print(f"number of the valuable data is {num}.")

In [None]:
import os
import numpy as np
import pandas as pd
import lmdb
from rdkit import Chem
from rdkit.Chem import AllChem
from tqdm import tqdm
import pickle
import glob
from multiprocessing import Pool
from collections import defaultdict
def smi2_2Dcoords(smi):
    mol = Chem.MolFromSmiles(smi)
    mol = AllChem.AddHs(mol)
    AllChem.Compute2DCoords(mol)
    coordinates = mol.GetConformer().GetPositions().astype(np.float32)
    len(mol.GetAtoms()) == len(coordinates), "2D coordinates shape is not align with {}".format(smi)
    return coordinates


def smi2_3Dcoords(smi,cnt):
    mol = Chem.MolFromSmiles(smi)
    mol = AllChem.AddHs(mol)
    coordinate_list=[]
    for seed in range(cnt):
        try:
            res = AllChem.EmbedMolecule(mol, randomSeed=seed)  # will random generate conformer with seed equal to -1. else fixed random seed.
            if res == 0:
                try:
                    AllChem.MMFFOptimizeMolecule(mol)       # some conformer can not use MMFF optimize
                    coordinates = mol.GetConformer().GetPositions()
                except:
                    print("Failed to generate 3D, replace with 2D")
                    coordinates = smi2_2Dcoords(smi)            
                    
            elif res == -1:
                mol_tmp = Chem.MolFromSmiles(smi)
                AllChem.EmbedMolecule(mol_tmp, maxAttempts=5000, randomSeed=seed)
                mol_tmp = AllChem.AddHs(mol_tmp, addCoords=True)
                try:
                    AllChem.MMFFOptimizeMolecule(mol_tmp)       # some conformer can not use MMFF optimize
                    coordinates = mol_tmp.GetConformer().GetPositions()
                except:
                    print("Failed to generate 3D, replace with 2D")
                    coordinates = smi2_2Dcoords(smi) 
        except:
            print("Failed to generate 3D, replace with 2D")
            coordinates = smi2_2Dcoords(smi) 

        assert len(mol.GetAtoms()) == len(coordinates), "3D coordinates shape is not align with {}".format(smi)
        coordinate_list.append(coordinates.astype(np.float32))
    return coordinate_list


def inner_smi2coords(content):
    smi = content
    cnt = 10 # conformer num,all==11, 10 3d + 1 2d

    mol = Chem.MolFromSmiles(smi)
    if len(mol.GetAtoms()) > 400:
        coordinate_list =  [smi2_2Dcoords(smi)] * (cnt+1)
        print("atom num >400,use 2D coords",smi)
    else:
        coordinate_list = smi2_3Dcoords(smi,cnt)
        # add 2d conf
        coordinate_list.append(smi2_2Dcoords(smi).astype(np.float32))
    mol = AllChem.AddHs(mol)
    atoms = [atom.GetSymbol() for atom in mol.GetAtoms()]  # after add H 
    return pickle.dumps({'atoms': atoms, 'coordinates': coordinate_list, 'smi': smi }, protocol=-1)


def smi2coords(content):
    try:
        return inner_smi2coords(content)
    except:
        print("failed smiles: {}".format(content[0]))
        return None


def write_lmdb(smiles_list, job_name, seed=42, outpath='./results', nthreads=8):
    os.makedirs(outpath, exist_ok=True)
    output_name = os.path.join(outpath,'{}.lmdb'.format(job_name))
    try:
        os.remove(output_name)
    except:
        pass
    env_new = lmdb.open(
        output_name,
        subdir=False,
        readonly=False,
        lock=False,
        readahead=False,
        meminit=False,
        max_readers=1,
        map_size=int(100e9),
    )
    txn_write = env_new.begin(write=True)
    with Pool(nthreads) as pool:
        i = 0
        for inner_output in tqdm(pool.imap(smi2coords, smiles_list)):
            if inner_output is not None:
                txn_write.put(f'{i}'.encode("ascii"), inner_output)
                i += 1
        print('{} process {} lines'.format(job_name, i))
        txn_write.commit()
        env_new.close()

seed = 42
# job_name = 'get_mol_repr'   # replace to your custom name
# job_name = 'get_mol_repr_test'   # replace to your custom name
job_name = 'bbb_test'
data_path = './results'  # replace to your data path
weight_path='../ckp/mol_pre_no_h_220816.pt'  # replace to your ckpt path
only_polar=0  # no h
dict_name='dict.txt'
batch_size=16
conf_size=11  # default 10 3d + 1 2d
results_path=data_path   # replace to your save path
write_lmdb(smi_list, job_name=job_name, seed=seed, outpath=data_path)

In [2]:
import torch
coor = torch.randn([11, 80, 3], dtype=torch.float32)
con, num, loc = coor.shape
# distance = torch.zeros([11, 80, 80, 3], dtype=torch.float32)


# for n_cur in range(num):
#     atom_cur = coor[:, n_cur:n_cur+1, :].repeat(1, num, 1)
#     distance_cur = atom_cur-coor
#     distance[:, n_cur, :, :] = distance_cur
# print(distance[0, 0, :, 0])
# del distance

# 计算距离
atom_expanded = coor.unsqueeze(2)  # shape (11, 80, 1, 3)
coor_expanded = coor.unsqueeze(1)   # shape (11, 1, 80, 3)
# distance = atom_expanded - coor_expanded
distance = torch.sqrt((atom_expanded - coor_expanded).pow(2).sum(dim=-1))
print(distance.shape)
                

torch.Size([11, 80, 80])


In [4]:
from rdkit import Chem
from rdkit.Chem import AllChem
import numpy as np
 
# 加载分子
mol = Chem.MolFromSmiles('CCC')
 
# 计算2D坐标
AllChem.Compute2DCoords(mol)
 
# 获取最短路径矩阵 SPD
distance_matrix = AllChem.GetDistanceMatrix(mol) 
 
# 获取边矩阵 Edge
adjacency_matrix = np.array(AllChem.GetAdjacencyMatrix(mol)) # 可以进一步获得度 Degree
adj = np.eye(3, 3)
adjacency_matrix = adj + adjacency_matrix 
 
# 打印结果
print("最短路径矩阵:\n", distance_matrix)
print("边矩阵:\n", adjacency_matrix)

最短路径矩阵:
 [[0. 1. 2.]
 [1. 0. 1.]
 [2. 1. 0.]]
边矩阵:
 [[1. 1. 0.]
 [1. 1. 1.]
 [0. 1. 1.]]
