# Preprocessing

In [6]:
import pandas as pd

import os
import ast
import numpy as np

from typing import List, Union


from rdkit import Chem
import torch
from torch_geometric.data import Data, Dataset
from tqdm import tqdm
from joblib import Parallel, delayed

from utils.prepare import MoleculeDataset, convert_string_to_list, save_dataset, load_dataset

In [7]:
data = pd.read_csv('../data/QM_100.csv', converters={'CDD': convert_string_to_list})

In [8]:
from skipatom import SkipAtomInducedModel

skipatom_model = SkipAtomInducedModel.load(
    "../skipatom/data/mp_2020_10_09.dim30.model", 
    "../skipatom/data/mp_2020_10_09.training.data", 
    min_count=2e7, top_n=5
)

In [9]:
class FeaturizationParameters:
    def __init__(self):
        self.max_atomic_num = 100
        self.atom_features = {
            'atomic_num': list(range(self.max_atomic_num)),
            'degree': [0, 1, 2, 3, 4, 5],
            'formal_charge': [-1, -2, 1, 2, 0],
            'chiral_tag': [0, 1, 2, 3],
            'num_Hs': [0, 1, 2, 3, 4],
            'hybridization': [
                Chem.rdchem.HybridizationType.SP,
                Chem.rdchem.HybridizationType.SP2,
                Chem.rdchem.HybridizationType.SP3,
                Chem.rdchem.HybridizationType.SP3D,
                Chem.rdchem.HybridizationType.SP3D2
            ],
        }
        self.atom_fdim = sum(len(choices) + 1 for choices in self.atom_features.values()) + 2

def onek_encoding_unk(value, choices):
    encoding = [0] * (len(choices) + 1)
    index = choices.index(value) if value in choices else -1
    encoding[index] = 1
    return encoding

def atom_features(atom, params):

    features = onek_encoding_unk(atom.GetAtomicNum() - 1, params.atom_features['atomic_num']) + \
               onek_encoding_unk(atom.GetTotalDegree(), params.atom_features['degree']) + \
               onek_encoding_unk(atom.GetFormalCharge(), params.atom_features['formal_charge']) + \
               onek_encoding_unk(int(atom.GetChiralTag()), params.atom_features['chiral_tag']) + \
               onek_encoding_unk(int(atom.GetTotalNumHs()), params.atom_features['num_Hs']) + \
               onek_encoding_unk(int(atom.GetHybridization()), params.atom_features['hybridization']) + \
               [1 if atom.GetIsAromatic() else 0] + \
               [atom.GetMass() * 0.01]  # scaled to about the same range as other features
    return features

PARAMS = {
    'BOND_FDIM': 10
}

def bond_features(bond: Chem.rdchem.Bond, skipatom_model) -> List[Union[bool, int, float, np.ndarray]]:
    if bond is None:
        fbond = [1] + [0] * (PARAMS['BOND_FDIM'] - 1)
    else:
        bt = bond.GetBondType()
        fbond = [
            0,  # bond is not None
            bt == Chem.rdchem.BondType.SINGLE,
            bt == Chem.rdchem.BondType.DOUBLE,
            bt == Chem.rdchem.BondType.TRIPLE,
            bt == Chem.rdchem.BondType.AROMATIC,
            bond.GetIsConjugated() if bt is not None else 0,
            bond.IsInRing() if bt is not None else 0
        ]
        fbond += onek_encoding_unk(int(bond.GetStereo()), list(range(6)))

    
    # Добавили к началу и концу векторов связей 
    start_atom_vector = skipatom_model.vectors[skipatom_model.dictionary[bond.GetBeginAtom().GetSymbol()]]
    end_atom_vector = skipatom_model.vectors[skipatom_model.dictionary[bond.GetEndAtom().GetSymbol()]]

    fbond += list(start_atom_vector) + list(end_atom_vector)
    return fbond


class MoleculeData:
    def __init__(self, smiles, target, addHs=True, skipatom_model=None):
        self.smiles = smiles
        self.skipatom_model = skipatom_model
        self.target = torch.tensor(target, dtype=torch.float)
        self.mol = Chem.MolFromSmiles(smiles)
        if addHs:
            self.mol = Chem.AddHs(self.mol)
        self.params = FeaturizationParameters()
        self.edge_index, self.edge_attr = self.construct_graph()

    def construct_graph(self):
        edge_index = []
        edge_attr = []
        for bond in self.mol.GetBonds():
            start, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
            edge_index.extend([[start, end], [end, start]])
            edge_attr.extend([bond_features(bond, self.skipatom_model), bond_features(bond, self.skipatom_model)])
        return torch.tensor(edge_index).t().contiguous(), torch.tensor(edge_attr, dtype=torch.float)

    def generate_atom_features(self):
        features = []
        for atom in self.mol.GetAtoms():
            features.append(atom_features(atom, self.params))
        return torch.tensor(features, dtype=torch.float)

class MoleculeDataset(Dataset):
    def __init__(self, dataframe, smiles_column='smiles', target_column='target', addHs=True, n_jobs=-1):
        super(MoleculeDataset, self).__init__()
        
        self.data_list = Parallel(n_jobs=n_jobs)(
            delayed(lambda row: MoleculeData(row[smiles_column], row[target_column], addHs))(
                row) for _, row in tqdm(dataframe.iterrows(), total=dataframe.shape[0]))

    def len(self): 
        return len(self.data_list)

    def get(self, idx):
        molecule_data = self.data_list[idx]
        x = molecule_data.generate_atom_features()
        edge_index = molecule_data.edge_index
        edge_attr = molecule_data.edge_attr
        y = molecule_data.target
        
        data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)
        data.smiles = molecule_data.smiles
        
        return data





def save_dataset(dataset, file_path):
    torch.save(dataset, file_path)
    print(f"Датасет успешно сохранен в {file_path}")



def load_dataset(file_path):
    dataset = torch.load(file_path)

    print(dataset)
    print(dataset[0])
    
    print(f"Shape of atom features (x): {dataset[0].x.shape}")
    print(f"Shape of edge index: {dataset[0].edge_index.shape}")
    print(f"Shape of edge attr: {dataset[0].edge_attr.shape}")
    print(f"Target value (y): {dataset[0].y}")
    print(f"Shape of target value: {dataset[0].y.shape}")
    print(f"Number of atoms in the molecule: {dataset[0].x.size(0)}")
    print(f"Number of bonds in the molecule: {dataset[0].edge_index.size(1) // 2}") 

    return dataset

def convert_string_to_list(string):
    try:
        return ast.literal_eval(string)
    except ValueError:
        return []


In [10]:
dataset = MoleculeDataset(data, smiles_column='smiles', target_column='CDD')

100%|██████████| 100/100 [00:04<00:00, 22.01it/s]


AttributeError: 'NoneType' object has no attribute 'vectors'

In [None]:
datapoint = dataset[0]
datapoint

Data(x=[31, 133], edge_index=[2, 64], edge_attr=[64, 14], y=[31], smiles='CNC(=S)N/N=C/c1c(O)ccc2ccccc12')

In [None]:
print(f"Shape of atom features (x): {datapoint.x.shape}")
print(f"Shape of edge index: {datapoint.edge_index.shape}")
print(f"Shape of edge attr: {datapoint.edge_attr.shape}")
print(f"Target value (y): {datapoint.y}")
print(f"Shape of target value: {datapoint.y.shape}")
print(f"Number of atoms in the molecule: {datapoint.x.size(0)}")
print(f"Number of bonds in the molecule: {datapoint.edge_index.size(1) // 2}")

Shape of atom features (x): torch.Size([31, 133])
Shape of edge index: torch.Size([2, 64])
Shape of edge attr: torch.Size([64, 14])
Target value (y): tensor([-0.0756, -0.1880,  0.1527, -0.9194, -0.1204, -0.2333, -0.0461, -0.1620,
         0.1055, -0.4826, -0.1695, -0.1810, -0.0448, -0.1405, -0.1686, -0.1502,
        -0.1488, -0.0190,  0.0474,  0.0435,  0.0400,  0.2173,  0.2133,  0.0341,
         0.1984,  0.0413,  0.0254,  0.0345,  0.0283,  0.0312,  0.0369])
Shape of target value: torch.Size([31])
Number of atoms in the molecule: 31
Number of bonds in the molecule: 32


In [None]:
#save_dataset(dataset, "../data/QM_100.pt")

Датасет успешно сохранен в ../data/QM_100.pt
