In [2]:
import csv
import os
from subprocess import run
from typing import Optional, Union, List, Any

import ray
from CGRtools.containers import MoleculeContainer
from CGRtools.exceptions import InvalidAromaticRing
from CGRtools.files import SDFRead, SMILESRead
from tqdm import tqdm

# Module-level SMILES parser; convert_molecule calls it to turn SMILES strings into molecule objects.
smiles_parser = SMILESRead.create_parser(ignore=True, remap=False)

In [None]:
def standardize_molecule(molecule: MoleculeContainer) -> Optional[MoleculeContainer]:
    """
    Filter and normalise a single molecule, working on the passed object in place.

    A molecule is rejected (``None`` is returned) when any of the following holds:

    * ``len(molecule)`` is below 2 or above 150;
    * it contains an atom whose atomic number is 10, 18, 36, 54 (Ne, Ar, Kr, Xe),
      lies in 58..71 (Ce..Lu) or is greater than 84;
    * it has more than one connected component;
    * it is flagged as a radical;
    * ``check_valence()`` reports something after standardization;
    * any step of the normalisation pipeline raises ``InvalidAromaticRing``.

    :param molecule: molecule to process; it is modified by the pipeline
    :return: the same molecule object after normalisation, or None if it was rejected
    """
    atom_count = len(molecule)
    if not 2 <= atom_count <= 150:
        return None

    # Noble gases Ne..Xe plus atomic numbers 58..71; everything above 84 is handled below.
    forbidden_numbers = {10, 18, 36, 54} | set(range(58, 72))
    has_forbidden_atom = any(
        atom.atomic_number in forbidden_numbers or atom.atomic_number > 84
        for _, atom in molecule.atoms()
    )
    if has_forbidden_atom:
        return None

    if molecule.connected_components_count > 1:
        return None

    if molecule.is_radical:
        return None

    try:
        # The calls below mutate the molecule; their order is kept exactly as it was.
        molecule.kekule()
        molecule.standardize(fix_stereo=False)
        molecule.clean_isotopes()

        # A truthy check_valence() result means the molecule is discarded.
        valence_problems = molecule.check_valence()
        if valence_problems:
            return None

        molecule.implicify_hydrogens(fix_stereo=False)
        molecule.remove_hydrogen_bonds(fix_stereo=False)
        molecule.clean_stereo()
        molecule.thiele()
    except InvalidAromaticRing:
        return None
    return molecule

In [None]:
@ray.remote
def convert_molecule(molecule: Union[MoleculeContainer, str]) -> Optional[MoleculeContainer]:
    """
    Ray remote task: standardize one molecule, parsing it from SMILES first if needed.

    :param molecule: a molecule object, or a SMILES string that is parsed with the
        module-level ``smiles_parser``
    :return: the result of ``standardize_molecule`` for that molecule
    """
    # isinstance instead of an exact type comparison, so str subclasses (for example
    # numpy.str_ values coming out of an array) are parsed as SMILES too instead of
    # being handed to standardize_molecule as raw strings.
    if isinstance(molecule, str):
        molecule = smiles_parser(molecule)
    return standardize_molecule(molecule)

In [None]:
def process_batch(molecules: List[Union[MoleculeContainer, str]]) -> List[Optional[MoleculeContainer]]:
    """
    Run ``convert_molecule`` over a batch of molecules through Ray.

    Each input is first placed in the Ray object store with ``ray.put``; one
    ``convert_molecule`` task is then submitted per stored object and the results
    are collected with a single ``ray.get`` call.

    :param molecules: molecule objects and/or SMILES strings
    :return: one result per input, as returned by ``ray.get`` for the submitted tasks
    """
    stored_refs = []
    for item in molecules:
        stored_refs.append(ray.put(item))
    # Drop the local name for the input list once every item has been stored.
    del molecules
    pending_tasks = [convert_molecule.remote(ref) for ref in stored_refs]
    return ray.get(pending_tasks)

In [None]:
def standardize_pubchem_batch(molecules: List[MoleculeContainer], pubchem_ids: List[int],
                              std_writer: Any, mistakes_writer: Any, pubchem_meta: List[str]) -> None:
    """
    Standardize one batch of molecules and write the outcome of each to a writer.

    For a molecule that survives ``process_batch`` a row is written to ``std_writer``:
    ``str(molecule)``, then the value of every key in ``pubchem_meta`` taken from the
    molecule's ``meta`` (the string ``'nan'`` when the key is absent), then
    ``rings_count``, ``molecular_mass`` and ``bonds_count``. For a molecule that did
    not survive, its PubChem id is written to ``mistakes_writer``.

    :param molecules: molecules of the batch
    :param pubchem_ids: PubChem ids, paired positionally with ``molecules``
    :param std_writer: object with a ``writerow`` method for accepted molecules
    :param mistakes_writer: object with a ``writerow`` method for rejected ids
    :param pubchem_meta: metadata keys to copy into each accepted row
    """
    for cid, std_mol in zip(pubchem_ids, process_batch(molecules)):
        if not std_mol:
            # Rejected (or otherwise falsy) result: only the id is recorded.
            mistakes_writer.writerow([cid])
            continue

        row = [str(std_mol)]
        meta = std_mol.meta
        row.extend(meta[key] if key in meta else 'nan' for key in pubchem_meta)
        row.append(std_mol.rings_count)
        row.append(std_mol.molecular_mass)
        row.append(std_mol.bonds_count)
        std_writer.writerow(row)

In [None]:
def standardize_sdf_pubchem(name: str, std_writer: Any, mistakes_writer: Any,
                            batch_size: int, pubchem_meta: List[str]) -> None:
    """
    Standardize every molecule of one decompressed PubChem SDF file, then delete the file.

    The file is read molecule by molecule. Molecules and their PubChem CIDs are
    collected into batches of ``batch_size`` and each batch (plus the final partial
    one) is passed to ``standardize_pubchem_batch``, which does the writing.
    Decompressing the archive is left to the caller.

    :param name: path to the already decompressed SDF file; it is removed after the
        whole file has been processed
    :param std_writer: writer passed through for rows of standardized molecules
    :param mistakes_writer: writer passed through for ids of rejected molecules
    :param batch_size: number of molecules per batch
    :param pubchem_meta: metadata keys passed through to ``standardize_pubchem_batch``
    :raises KeyError: presumably, when a record has no ``PUBCHEM_COMPOUND_CID`` entry
        in its ``meta`` (assuming ``meta`` behaves like a dict)
    """
    batch_molecules, batch_ids = [], []
    with SDFRead(name, indexable=True) as inp:
        inp.reset_index()
        for molecule in inp:
            batch_molecules.append(molecule)
            batch_ids.append(int(molecule.meta['PUBCHEM_COMPOUND_CID']))
            if len(batch_molecules) == batch_size:
                standardize_pubchem_batch(batch_molecules, batch_ids, std_writer, mistakes_writer, pubchem_meta)
                # Start fresh lists instead of clearing, so the finished batch can be freed.
                batch_molecules, batch_ids = [], []
        if batch_molecules:
            # Flush the last, incomplete batch.
            standardize_pubchem_batch(batch_molecules, batch_ids, std_writer, mistakes_writer, pubchem_meta)
    # Delete only after the reader has been closed: removing a file that is still
    # open fails on some platforms and was previously done inside the with-block.
    os.remove(name)

In [None]:
def standardize_pubchem(input_files_path: str, output_file: str, mistake_file: str, batch_size: int,
                        first_file_index: int = 217) -> None:
    """
    Standardize a directory of gzipped PubChem SDF archives into two CSV files.

    The directory listing is sorted, the first ``first_file_index`` entries are
    skipped, and every remaining ``*.gz`` entry is decompressed with ``gunzip``
    (keeping the archive) and passed to ``standardize_sdf_pubchem``. Both output
    files are opened in write mode, so existing content is overwritten.

    :param input_files_path: directory containing the ``.gz`` archives
    :param output_file: CSV file for standardized molecules; gets a header row of
        ``smiles``, the PubChem columns and the CGRtools-derived columns
    :param mistake_file: CSV file for ids of rejected molecules (header ``pubchem_id``)
    :param batch_size: number of molecules processed per batch
    :param first_file_index: position in the sorted directory listing at which
        processing starts; the default keeps the offset that used to be hard-coded,
        pass 0 to process every file
    :raises subprocess.CalledProcessError: if ``gunzip`` exits with a non-zero status
    """
    pubchem_meta = ['PUBCHEM_COMPOUND_CID', 'PUBCHEM_OPENEYE_CAN_SMILES', 'PUBCHEM_IUPAC_OPENEYE_NAME',
                    'PUBCHEM_CACTVS_ROTATABLE_BOND', 'PUBCHEM_XLOGP3_AA', 'PUBCHEM_CACTVS_HBOND_ACCEPTOR',
                    'PUBCHEM_CACTVS_HBOND_DONOR', 'PUBCHEM_HEAVY_ATOM_COUNT']
    pubchem_headers = ['pubchem_id', 'pubchem_smiles', 'iupac_name', 'rot_bonds',
                       'logp', 'h_acc', 'h_don', 'heavy_atoms']
    cgrtools_headers = ['ring_count', 'mol_mass', 'bonds_count']
    files = sorted(os.listdir(input_files_path))
    with open(output_file, 'w', newline='\n') as standardized_out, \
            open(mistake_file, 'w', newline='\n') as mistakes_out:
        std_mols_writer = csv.writer(standardized_out, delimiter=',')
        mistakes_writer = csv.writer(mistakes_out, delimiter=',')
        fields_names = ['smiles'] + pubchem_headers + cgrtools_headers
        std_mols_writer.writerow(fields_names)
        mistakes_writer.writerow(['pubchem_id'])
        for file_name in tqdm(files[first_file_index:]):
            # Match the real extension; a bare "ends with gz" test also hits names like x.tgz.
            if not file_name.endswith('.gz'):
                continue
            # os.path.join works whether or not input_files_path ends with a separator.
            archive_path = os.path.join(input_files_path, file_name)
            sdf_path = archive_path[:-len('.gz')]
            # -k keeps the archive, -f overwrites a decompressed file left by an interrupted
            # run; check=True surfaces a failed decompression here instead of later.
            run(["gunzip", "-k", "-f", archive_path], check=True)
            standardize_sdf_pubchem(sdf_path, std_mols_writer, mistakes_writer, batch_size, pubchem_meta)

In [3]:
# The data directory sits next to the folder the notebook runs from. Take the parent of
# the working directory explicitly instead of slicing a fixed number of characters off
# the cwd string, which only works for one particular folder-name length.
path = os.path.join(os.path.dirname(os.getcwd()), 'data/')
output_file_path = os.path.join(path, 'processed/valid/pubchem_test.csv')
mistakes_file_path = os.path.join(path, 'processed/mis/pubchem_test_mistakes.csv')
# From here on `path` is the directory holding the raw .gz archives.
path = os.path.join(path, 'raw/')
batch_size = 10000
standardize_pubchem(path, output_file_path, mistakes_file_path, batch_size)

/home/almaz/Documents/project/chem_dataset/chem_dataset/data/raw/
