In [None]:
# %%capture --no-stderr
! pip install rdkit deepchem torch_geometric dgllife
! pip install -f https://download.pytorch.org/whl/cu118/torch_stable.html torch==2.2.1+cu118
! pip install  dgl -f https://data.dgl.ai/wheels/torch-2.2/cu121/repo.html

Collecting rdkit
  Downloading rdkit-2024.3.5-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.9 kB)
Collecting deepchem
  Downloading deepchem-2.8.0-py3-none-any.whl.metadata (2.0 kB)
Collecting torch_geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dgllife
  Downloading dgllife-0.3.2-py3-none-any.whl.metadata (667 bytes)
Downloading rdkit-2024.3.5-cp310-cp310-manylinux_2_28_x86_64.whl (33.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.1/33.1 MB[0m [31m54.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading deepchem-2.8.0-py3-none-any.whl (1.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m49.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 

In [None]:
import deepchem as dc

Instructions for updating:
experimental_relax_shapes is deprecated, use reduce_retracing instead
DGL backend not selected or invalid.  Assuming PyTorch for now.


Setting the default backend to "pytorch". You can change it in the ~/.dgl/config.json file or export the DGLBACKEND environment variable.  Valid options are: pytorch, mxnet, tensorflow (all lowercase)




In [None]:
# imports
from deepchem.feat.molecule_featurizers import MATFeaturizer, MolGraphConvFeaturizer, DMPNNFeaturizer
from deepchem.models.torch_models import GCNModel, MATModel, DMPNNModel
from deepchem.data import NumpyDataset, CSVLoader
import numpy
from sklearn.model_selection import train_test_split
from deepchem.metrics import mean_squared_error, Metric, accuracy_score
import pandas as pd
from deepchem.models.losses import SparseSoftmaxCrossEntropy
from rdkit.Chem import AllChem
from rdkit import Chem
import torch
import uuid
import os
import json

### Weighted Directed Graphs Featurization

In [None]:
def make_mol(s: str, keep_h: bool, add_h: bool):
    """
    Builds an RDKit molecule from a SMILES string.
    """
    if keep_h:
        mol = Chem.MolFromSmiles(s, sanitize = False)
        Chem.SanitizeMol(mol, sanitizeOps = Chem.SanitizeFlags.SANITIZE_ALL^Chem.SanitizeFlags.SANITIZE_ADJUSTHS)
    else:
        mol = Chem.MolFromSmiles(s)
    if add_h:
        mol = Chem.AddHs(mol)
    return mol


def make_polymer_mol(smiles: str, keep_h: bool, add_h: bool, fragment_weights: list):
    """
    Builds an RDKit molecule from a SMILES string.
    """

    # check input is correct, we need the same number of fragments and their weights
    num_frags = len(smiles.split('.'))
    if len(fragment_weights) != num_frags:
        raise ValueError(f'number of input monomers/fragments ({num_frags}) does not match number of '
                         f'input number of weights ({len(fragment_weights)})')

    # if it all looks good, we create one molecule object per fragment, add the weight as property
    # of each atom, and merge fragments into a single molecule object
    mols = []
    for s, w in zip(smiles.split('.'), fragment_weights):
        m = make_mol(s, keep_h, add_h)
        for a in m.GetAtoms():
            a.SetDoubleProp('w_frag', float(w)) # assinging metadata to store the weight fragment of the monomer
        mols.append(m)

    # combine all mols into single mol object
    mol = mols.pop(0)
    while len(mols) > 0:
        m2 = mols.pop(0)
        mol = Chem.CombineMols(mol, m2)

    return mol

In [None]:
from collections import Counter
import numpy as np
def parse_polymer_rules(rules):
    """
    converting probabilty weight distribution details from string to list of tuples to (start, end, weight_forward, weight_reverse)
    """
    polymer_info = []
    counter = Counter()  # used for validating the input

    # check if deg of polymerization is provided
    if '~' in rules[-1]:
        Xn = float(rules[-1].split('~')[1])
        rules[-1] = rules[-1].split('~')[0]
    else:
        Xn = 1.

    for rule in rules:
        # handle edge case where we have no rules, and rule is empty string
        if rule == "":
            continue
        # QC of input string
        if len(rule.split(':')) != 3:
            raise ValueError(f'incorrect format for input information "{rule}"')
        idx1, idx2 = rule.split(':')[0].split('-')
        w12 = float(rule.split(':')[1])  # weight for bond R_idx1 -> R_idx2
        w21 = float(rule.split(':')[2])  # weight for bond R_idx2 -> R_idx1
        polymer_info.append((idx1, idx2, w12, w21))
        counter[idx1] += float(w21)
        counter[idx2] += float(w12)

    # validate input: sum of incoming weights should be one for each vertex
    for k, v in counter.items():
        if np.isclose(v, 1.0) is False:
            raise ValueError(f'sum of weights of incoming stochastic edges should be 1 -- found {v} for [*:{k}]')
    return polymer_info, 1. + np.log10(Xn)

In [None]:
def tag_atoms_in_repeating_unit(mol):
    """
    Tags atoms that are part of the core units, as well as atoms serving to identify attachment points. In addition,
    create a map of bond types based on what bonds are connected to R groups in the input.
    """
    atoms = [a for a in mol.GetAtoms()]
    neighbor_map = {}  # map R group to index of atom it is attached to
    r_bond_types = {}  # map R group to bond type

    # go through each atoms and: (i) get index of attachment atoms, (ii) tag all non-R atoms
    for atom in atoms:
        # if R atom
        if '*' in atom.GetSmarts():
            # get index of atom it is attached to
            neighbors = atom.GetNeighbors()
            assert len(neighbors) == 1
            neighbor_idx = neighbors[0].GetIdx()
            r_tag = atom.GetSmarts().strip('[]').replace(':', '')  # *1, *2, ...
            neighbor_map[r_tag] = neighbor_idx
            # tag it as non-core atom
            atom.SetBoolProp('core', False)
            # create a map R --> bond type
            bond = mol.GetBondBetweenAtoms(atom.GetIdx(), neighbor_idx)
            r_bond_types[r_tag] = bond.GetBondType()
        # if not R atom
        else:
            # tag it as core atom
            atom.SetBoolProp('core', True)

    # use the map created to tag attachment atoms
    for atom in atoms:
        if atom.GetIdx() in neighbor_map.values():
            r_tags = [k for k, v in neighbor_map.items() if v == atom.GetIdx()]
            atom.SetProp('R', ''.join(r_tags))
        else:
            atom.SetProp('R', '')

    return mol, r_bond_types

In [None]:
class Featurization_parameters:
    """
    A class holding standard molecule featurization parameters as attributes.
    """
    def __init__(self) -> None:

        # Atom feature sizes
        self.MAX_ATOMIC_NUM = 100
        self.ATOM_FEATURES = {
            'atomic_num': list(range(self.MAX_ATOMIC_NUM)),
            'degree': [0, 1, 2, 3, 4, 5],
            'formal_charge': [-1, -2, 1, 2, 0],
            'chiral_tag': [0, 1, 2, 3],
            'num_Hs': [0, 1, 2, 3, 4],
            'hybridization': [
                Chem.rdchem.HybridizationType.SP,
                Chem.rdchem.HybridizationType.SP2,
                Chem.rdchem.HybridizationType.SP3,
                Chem.rdchem.HybridizationType.SP3D,
                Chem.rdchem.HybridizationType.SP3D2
            ],
        }

        # Distance feature sizes
        self.PATH_DISTANCE_BINS = list(range(10))
        self.THREE_D_DISTANCE_MAX = 20
        self.THREE_D_DISTANCE_STEP = 1
        self.THREE_D_DISTANCE_BINS = list(range(0, self.THREE_D_DISTANCE_MAX + 1, self.THREE_D_DISTANCE_STEP))

        # len(choices) + 1 to include room for uncommon values; + 2 at end for IsAromatic and mass
        self.ATOM_FDIM = sum(len(choices) + 1 for choices in self.ATOM_FEATURES.values()) + 2
        self.EXTRA_ATOM_FDIM = 0
        self.BOND_FDIM = 14
        self.EXTRA_BOND_FDIM = 0
        self.REACTION_MODE = None
        self.EXPLICIT_H = False
        self.REACTION = False
        self.POLYMER = False
        self.ADDING_H = False

# Create a global parameter object for reference throughout this module
PARAMS = Featurization_parameters()

In [None]:
def atom_features(atom: Chem.rdchem.Atom, PARAMS: Featurization_parameters, functional_groups = None):
    """
    Builds a feature vector for an atom.
    """
    if atom is None:
        features = [0] * PARAMS.ATOM_FDIM
    else:
        features = onek_encoding_unk(atom.GetAtomicNum() - 1, PARAMS.ATOM_FEATURES['atomic_num']) + \
            onek_encoding_unk(atom.GetTotalDegree(), PARAMS.ATOM_FEATURES['degree']) + \
            onek_encoding_unk(atom.GetFormalCharge(), PARAMS.ATOM_FEATURES['formal_charge']) + \
            onek_encoding_unk(int(atom.GetChiralTag()), PARAMS.ATOM_FEATURES['chiral_tag']) + \
            onek_encoding_unk(int(atom.GetTotalNumHs()), PARAMS.ATOM_FEATURES['num_Hs']) + \
            onek_encoding_unk(int(atom.GetHybridization()), PARAMS.ATOM_FEATURES['hybridization']) + \
            [1 if atom.GetIsAromatic() else 0] + \
            [atom.GetMass() * 0.01]  # scaled to about the same range as other features
        if functional_groups is not None:
            features += functional_groups
    return features

def bond_features(bond: Chem.rdchem.Bond, PARAMS: Featurization_parameters):
    """
    Builds a feature vector for a bond.
    """
    if bond is None:
        fbond = [1] + [0] * (PARAMS.BOND_FDIM - 1)
    else:
        bt = bond.GetBondType()
        fbond = [
            0,  # bond is not None
            bt == Chem.rdchem.BondType.SINGLE,
            bt == Chem.rdchem.BondType.DOUBLE,
            bt == Chem.rdchem.BondType.TRIPLE,
            bt == Chem.rdchem.BondType.AROMATIC,
            (bond.GetIsConjugated() if bt is not None else 0),
            (bond.IsInRing() if bt is not None else 0)
        ]
        fbond += onek_encoding_unk(int(bond.GetStereo()), list(range(6)))
    return fbond

In [None]:
def onek_encoding_unk(value: int, choices):
    """
    Creates a one-hot encoding with an extra category for uncommon values.
    """
    encoding = [0] * (len(choices) + 1)
    index = choices.index(value) if value in choices else -1
    encoding[index] = 1

    return encoding

In [None]:
def remove_wildcard_atoms(rwmol):
  """Removes the connection virtual atoms for open bonds"""
  indices = [a.GetIdx() for a in rwmol.GetAtoms() if '*' in a.GetSmarts()]
  while len(indices) > 0:
      rwmol.RemoveAtom(indices[0])
      indices = [a.GetIdx() for a in rwmol.GetAtoms() if '*' in a.GetSmarts()]
  Chem.SanitizeMol(rwmol, Chem.SanitizeFlags.SANITIZE_ALL)
  return rwmol

In [None]:
def convert_node_edge_mapping(node_edge_mapping):
  start_map_index = []
  end_map_index = []
  for i,edge_list in enumerate(node_edge_mapping):
    for edge_index in edge_list:
      start_index = i
      if edge_index % 2 != 0:
        prev_check = edge_index - 1
        for n,check_content in enumerate(node_edge_mapping):
          if prev_check in check_content:
            end_index = n
            start_map_index.extend([start_index, end_index])
            end_map_index.extend([end_index, start_index])
            break
  return [start_map_index, end_map_index]

In [None]:
import re
def smarts_to_smiles(smarts: str):
  pattern = r"\*:(\d+)"
  replacement = r"\1*"
  smiles = re.sub(pattern, replacement, smarts)
  return smiles

In [None]:
# testing featurization compatibility
def get_valid_smiles(smarts):
  smile = smarts_to_smiles(smarts)
  smile = smile.split("|")[0]
  return smile

In [None]:
from copy import deepcopy

In [None]:
from deepchem.feat.graph_data import GraphData

class CustomWeightedDirectedGraphFeaturizer():
  def __init__(self, atom_fdim = 133, bond_fdim=147):
    self.atom_fdim = atom_fdim
    self.bond_fdim = bond_fdim

  def _featurize(self, mol_string):
    # -----------------
    # Featurization Logic
    # -----------------

    n_atoms = 0  # number of atoms
    degree_of_polym = 1  # degree of polymerization
    n_bonds = 0  # number of bonds
    f_atoms = []  # mapping from atom index to atom features
    f_bonds = []  # mapping from bond index to concat(in_atom, bond) features
    w_bonds = []  # mapping from bond index to bond weight
    w_atoms = []  # mapping from atom index to atom weight
    a2b = []  # mapping from atom index to incoming bond indices
    b2a = []  # mapping from bond index to the index of the atom the bond is coming from
    b2revb = [] # mapping from bond index to the index of the reverse bond
    atom_features_extra = None
    overwrite_default_atom_features = True
    bond_features_extra = None
    overwrite_default_bond_features = False


    m = make_polymer_mol(mol_string.split("|")[0], keep_h=False, add_h=False, fragment_weights=mol_string.split("|")[1:-1])
    rules = mol_string.split("<")[1:]  # [str], list of rules
    polymer_info, degree_of_polym = parse_polymer_rules(rules)
    # make molecule editable
    rwmol = Chem.rdchem.RWMol(m)
    # tag (i) attachment atoms and (ii) atoms for which features needs to be computed
    # also get map of R groups to bonds types, e.f. r_bond_types[*1] -> SINGLE
    rwmol, r_bond_types = tag_atoms_in_repeating_unit(rwmol)

    # -----------------
    # Get atom features
    # -----------------
    # for all 'core' atoms, i.e. not R groups, as tagged before. Do this here so that atoms linked to
    # R groups have the correct saturation
    f_atoms = [atom_features(atom, PARAMS) for atom in rwmol.GetAtoms() if atom.GetBoolProp('core') is True]
    w_atoms = [atom.GetDoubleProp('w_frag') for atom in rwmol.GetAtoms() if atom.GetBoolProp('core') is True]

    if atom_features_extra is not None:
      if overwrite_default_atom_features:
        f_atoms = [descs.tolist() for descs in atom_features_extra]
      else:
        f_atoms = [f_atoms + descs.tolist() for f_atoms, descs in zip(f_atoms, atom_features_extra)]

    n_atoms = len(f_atoms)
    if atom_features_extra is not None and len(atom_features_extra) != n_atoms:
      raise ValueError(f'The number of atoms in {Chem.MolToSmiles(rwmol)} is different from the length of '
                                    f'the extra atom features')

    # remove R groups -> now atoms in rdkit Mol object have the same order as self.f_atoms
    rwmol = remove_wildcard_atoms(rwmol)

    # Initialize atom to bond mapping for each atom
    for _ in range(n_atoms):
      a2b.append([])

    # ---------------------------------------
    # Get bond features for separate monomers
    # ---------------------------------------
    for a1 in range(n_atoms):
      for a2 in range(a1 + 1, n_atoms):
        bond = rwmol.GetBondBetweenAtoms(a1, a2)
        if bond is None:
          continue

        f_bond = bond_features(bond, PARAMS)
        if bond_features_extra is not None:
          descr = bond_features_extra[bond.GetIdx()].tolist()
          if overwrite_default_bond_features:
            f_bond = descr
          else:
            f_bond += descr

        f_bonds.append(f_atoms[a1] + f_bond)
        f_bonds.append(f_atoms[a2] + f_bond)

        # Update index mappings
        b1 = n_bonds
        b2 = b1 + 1
        a2b[a2].append(b1)  # b1 = a1 --> a2
        b2a.append(a1)
        a2b[a1].append(b2)  # b2 = a2 --> a1
        b2a.append(a2)
        b2revb.append(b2)
        b2revb.append(b1)
        w_bonds.extend([1.0, 1.0])  # edge weights of 1.
        n_bonds += 2

    # ---------------------------------------------------
    # Get bond features for bonds between repeating units
    # ---------------------------------------------------
    # we duplicate the monomers present to allow (i) creating bonds that exist already within the same
    # molecule, and (ii) collect the correct bond features, e.g., for bonds that would otherwise be
    # considered in a ring when they are not, when e.g. creating a bond between 2 atoms in the same ring.
    rwmol_copy = deepcopy(rwmol)
    _ = [a.SetBoolProp('OrigMol', True) for a in rwmol.GetAtoms()]
    _ = [a.SetBoolProp('OrigMol', False) for a in rwmol_copy.GetAtoms()]
    # create an editable combined molecule
    cm = Chem.CombineMols(rwmol, rwmol_copy)
    cm = Chem.RWMol(cm)
    # show_mol(cm)

    # for all possible bonds between monomers:
    # add bond -> compute bond features -> add to bond list -> remove bond
    for r1, r2, w_bond12, w_bond21 in polymer_info:
      # get index of attachment atoms
      a1 = None  # idx of atom 1 in rwmol
      a2 = None  # idx of atom 1 in rwmol --> to be used by MolGraph
      _a2 = None  # idx of atom 1 in cm --> to be used by RDKit
      # print(r1,r2)
      for atom in cm.GetAtoms():
        # print(atom.GetProp('R'))
        # take a1 from a fragment in the original molecule object
        if f'{r1}*' in atom.GetProp('R') and atom.GetBoolProp('OrigMol') is True:
          a1 = atom.GetIdx()
        # take _a2 from a fragment in the copied molecule object, but a2 from the original
        if f'{r2}*' in atom.GetProp('R'):
          if atom.GetBoolProp('OrigMol') is True:
            a2 = atom.GetIdx()
          elif atom.GetBoolProp('OrigMol') is False:
            _a2 = atom.GetIdx()

      if a1 is None:
        raise ValueError(f'cannot find atom attached to [*:{r1}]')
      if a2 is None or _a2 is None:
        raise ValueError(f'cannot find atom attached to [*:{r2}]')
      # create bond
      order1 = r_bond_types[f'{r1}*']
      order2 = r_bond_types[f'{r2}*']
      if order1 != order2:
        raise ValueError(f'two atoms are trying to be bonded with different bond types: '
                                        f'{order1} vs {order2}')
      cm.AddBond(a1, _a2, order=order1)
      Chem.SanitizeMol(cm, Chem.SanitizeFlags.SANITIZE_ALL)

      # get bond object and features
      bond = cm.GetBondBetweenAtoms(a1, _a2)
      f_bond = bond_features(bond, PARAMS)
      if bond_features_extra is not None:
        descr = bond_features_extra[bond.GetIdx()].tolist()
        if overwrite_default_bond_features:
          f_bond = descr
        else:
          f_bond += descr

      f_bonds.append(f_atoms[a1] + f_bond)
      f_bonds.append(f_atoms[a2] + f_bond)
      # Update index mappings
      b1 = n_bonds
      b2 = b1 + 1
      a2b[a2].append(b1)  # b1 = a1 --> a2
      b2a.append(a1)
      a2b[a1].append(b2)  # b2 = a2 --> a1
      b2a.append(a2)
      b2revb.append(b2)
      b2revb.append(b1)
      w_bonds.extend([w_bond12, w_bond21])  # add edge weights
      n_bonds += 2

      # remove the bond
      cm.RemoveBond(a1, _a2)
      Chem.SanitizeMol(cm, Chem.SanitizeFlags.SANITIZE_ALL)

    if bond_features_extra is not None and len(bond_features_extra) != n_bonds / 2:
      raise ValueError(f'The number of bonds in {Chem.MolToSmiles(rwmol)} is different from the length of '
                                  f'the extra bond features')
    return f_atoms, f_bonds, w_atoms, w_bonds, a2b, b2a, b2revb, degree_of_polym

  def featurize(self, smiles: list, include_edge_features = True, generate_global_features_bool = True) -> np.ndarray:
    graph_data_list = []
    smiles = [smarts_to_smiles(s) for s in smiles]
    for smile in smiles:
      values = self._featurize(smile)
      node_features = np.array(values[0])
      edge_features = np.array(values[1])
      node_edge_mapping = values[4]
      converted_mapping = np.array(convert_node_edge_mapping(node_edge_mapping))
      if not generate_global_features_bool:
        G = GraphData(node_features, converted_mapping) if not include_edge_features else GraphData(node_features, converted_mapping, edge_features)
      else:
        mol = Chem.MolFromSmiles(get_valid_smiles(smile))
        # print("smiles", smiles)
        # print("mol", mol)
        global_features = np.empty(0)
        G = GraphData(node_features, converted_mapping, global_features=global_features) if not include_edge_features else GraphData(node_features, converted_mapping, edge_features, global_features=global_features)
      graph_data_list.append(G)
    return np.array(graph_data_list, dtype=object)

In [None]:
featurizer = CustomWeightedDirectedGraphFeaturizer()
feature_vector = featurizer.featurize(["[*:1]c1cc(F)c([*:2])cc1F.[*:3]c1c(O)cc(O)c([*:4])c1O|0.5|0.5|<1-3:0.5:0.5<1-4:0.5:0.5<2-3:0.5:0.5<2-4:0.5:0.5"])

In [None]:
print(feature_vector)

[GraphData(node_features=[17, 133], edge_index=[2, 42], edge_features=[42, 147], global_features=[0])]


## Discriminator Pipeline Setup

In [None]:
featurizer_mapper = {
    "GCN": MolGraphConvFeaturizer,
    "MAT": MATFeaturizer,
    "DMPNN": DMPNNFeaturizer,
    "DMPNN_WDGRAPH": CustomWeightedDirectedGraphFeaturizer
}

model_mapper = {
    "GCN": GCNModel,
    "MAT": MATModel,
    "DMPNN": DMPNNModel
}

In [None]:
def crossEntropyLoss(output, labels):
    ce_loss = torch.nn.CrossEntropyLoss(reduction='none')
    # Convert (batch_size, tasks, classes) to (batch_size, classes, tasks)
    # CrossEntropyLoss only supports (batch_size, classes, tasks)
    if len(output.shape) == 3:
        output = output.permute(0, 2, 1)

    if len(labels.shape) == len(output.shape):
        labels = labels.squeeze(-1)
    return ce_loss(torch.tensor(output), torch.tensor(labels))

In [None]:
def validate_PSMILES_for_encoding(psmiles: str):
    mol = Chem.MolFromSmiles(psmiles)
    if mol is None:
        return False
    try:
        mol = Chem.AddHs(mol)
        AllChem.EmbedMolecule(mol, maxAttempts=5000)
        AllChem.UFFOptimizeMolecule(mol)
        mol = Chem.RemoveHs(mol)
        return True
    except ValueError:
        try:
            AllChem.Compute2DCoords(mol)
            return True
        except:
            return False
    except:
        return False

def validate_modify_PSMILES_for_MAT(psmiles_df):
    psmiles_df['validate'] = psmiles_df['smiles'].apply(validate_PSMILES_for_encoding)
    return psmiles_df[psmiles_df['validate'] == True]

In [None]:
class PolymerDiscriminatorPipeline():
  def __init__(self, task: str, model_name: str, input_type: str, batch_size: int = 3, save: bool = True, filepath: str= "/content/artifacts"):
    ALLOWED_MODELS = ["GCN", "MAT", "DMPNN"]
    ALLOWED_INPUT_TYPE = ["PSMILES", "WDGRAPH"]

    if task not in ["regression", "classification"]:
      raise ValueError("Task must be either 'regression' or 'classification'")

    if model_name not in ALLOWED_MODELS:
      raise ValueError(f"Model must be one of {ALLOWED_MODELS}")

    if input_type not in ALLOWED_INPUT_TYPE:
      raise ValueError(f"Input type must be one of {ALLOWED_INPUT_TYPE}")

    if input_type  == "WDGRAPH" and model_name == "MAT":
      raise ValueError("MAT model does not support WDGRAPH input")

    self.id = str(uuid.uuid4())
    self.base_dir = os.path.join(filepath, str(self.id))
    self.task = task
    self.model_name = model_name
    self.input_type = input_type
    self.batch_size = batch_size
    self.save = save
    self.model = None
    self.featurizer = None
    self.report = None
    self.num_classes = None

  def _init_filesystem(self):
    # Create the base directory if it does not exist
    if not os.path.exists(self.base_dir):
        os.makedirs(self.base_dir)

    # Define the subdirectories to create
    subdirectories = ['dataset', 'models']

    # Create each subdirectory within the base directory
    for subdir in subdirectories:
        subdir_path = os.path.join(self.base_dir, subdir)
        if not os.path.exists(subdir_path):
            os.makedirs(subdir_path)

  def _write_artifacts(self):
    model_path = os.path.join(self.base_dir, 'models')
    report_path = os.path.join(self.base_dir, 'report.json')
    params_path = os.path.join(self.base_dir, 'params.json')
    self.model.save_checkpoint(model_dir = model_path)
    with open(report_path, 'w') as f:
      json.dump(self.report, f)
    params = {
        "task" : self.task,
        "model_name" : self.model_name,
        "batch_size" : self.batch_size,
        "id" : str(self.id),
        "input_type" : self.input_type
    }
    if self.task == "classification":
      params["num_classes"] = str(self.num_classes)
    with open(params_path, 'w') as f:
      json.dump(params, f)

  def _prepare_data(self, df, train_ratio: float = 0.8):
    if self.model_name == "MAT":
      df = validate_modify_PSMILES_for_MAT(df)
    train_df, test_df = train_test_split(df, test_size=1-train_ratio, random_state=42)
    train_path, test_path = os.path.join(self.base_dir, "train.csv"), os.path.join(self.base_dir, "test.csv")
    train_df.to_csv(train_path)
    test_df.to_csv(test_path)
    return train_path, test_path

  def _featurize(self, train_path, test_path):
    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)

    if self.model_name == "DMPNN" and self.input_type == "WDGRAPH":
      featurizer = featurizer_mapper["DMPNN_WDGRAPH"]()
    else:
      featurizer = featurizer_mapper[self.model_name]()

    self.featurizer = featurizer

    X_train = featurizer.featurize(train_df["smiles"].values)
    y_train = train_df["value"].values

    X_test = featurizer.featurize(test_df["smiles"].values)
    y_test = test_df["value"].values

    return X_train, y_train, X_test, y_test

  def _prepare_input(self, X_train, y_train, X_test, y_test):
    train_dataset = NumpyDataset(X_train, y_train)
    test_dataset = NumpyDataset(X_test, y_test)
    return train_dataset, test_dataset

  def _train(self, train_dataset, num_epochs):
    if self.task == "regression" and self.input_type == "PSMILES":
      model = model_mapper[self.model_name](mode=self.task, batch_size=self.batch_size, n_tasks = 1)
    elif self.task == "regresssion" and self.input_type == "WDGRAPH":
      if self.model_name == "GCN":
        model = model_mapper[self.model_name](mode=self.task, batch_size=self.batch_size, n_tasks = 1, n_classes = 1, number_atom_features=133)
      if self.model_name == "DMPNN":
        if self.featurizer is not None:
          model = model_mapper[self.model_name](model=self.task,
                                                batch_size=self.batch_size,
                                                n_tasks= 1,
                                                use_default_fdim = False,
                                                atom_fdim = self.featurizer.atom_fdim,
                                                bond_fdim = self.featurizer.bond_fdim)
    else:
      max_target_encode = train_dataset.y.max()
      self.num_classes = max_target_encode + 1
      model = model_mapper[self.model_name](mode=self.task, batch_size=self.batch_size, n_tasks = 1, n_classes = self.num_classes)
    train_loss = model.fit(train_dataset, nb_epoch=num_epochs)
    return model, train_loss

  def _evaluate(self, model, test_dataset):
    if self.task == "regression":
      metric = Metric(mean_squared_error)
      test_loss = model.evaluate(test_dataset, [metric])
    else:
      pred = model.predict(test_dataset)
      test_loss = crossEntropyLoss(pred, test_dataset.y)
      test_loss = test_loss.mean().item()
    return test_loss

  def __call__(self, df: pd.DataFrame, num_epochs: int, train_ratio: float = 0.8):
    if self.model is not None:
      return self.report
    self._init_filesystem()
    train_path, test_path = self._prepare_data(df, train_ratio)
    if self.model_name == "MAT":
      data_loader = CSVLoader(tasks=['value'], feature_field='smiles', featurizer=featurizer_mapper[self.model_name]())
      train_dataset = data_loader.create_dataset(train_path)
      test_dataset = data_loader.create_dataset(test_path)
    else:
      X_train, y_train, X_test, y_test = self._featurize(train_path, test_path)
      train_dataset, test_dataset = self._prepare_input(X_train, y_train, X_test, y_test)
    model, train_loss = self._train(train_dataset, num_epochs)
    test_loss = self._evaluate(model, test_dataset)
    report = {
        "model" : self.model_name,
        "task" : self.task,
        "train_loss" : train_loss,
        "test_loss" : test_loss
    }
    self.model = model
    self.report = report
    if self.save:
      self._write_artifacts()
    return report

  def predict(self, datapoints):
    if self.model is None:
      raise ValueError("Model has not been trained yet")
    pred_featurizer = featurizer_mapper[self.model_name]()
    pred_dataset = pred_featurizer.featurize(datapoints)
    pred_dataset = NumpyDataset(pred_dataset)
    pred = self.model.predict(pred_dataset)
    return pred

  @classmethod
  def load_pipeline(cls, pipeline_id: str, filepath: str = "/content/artifacts"):
    base_dir = os.path.join(filepath, pipeline_id)
    model_path = os.path.join(base_dir, 'models')
    report_path = os.path.join(base_dir, 'report.json')
    params_path = os.path.join(base_dir, 'params.json')
    with open(params_path, 'r') as f:
      params = json.load(f)
    with open(report_path, 'r') as f:
      report = json.load(f)
    if params["task"] == "regression":
      model= model_mapper[params["model_name"]](mode=params["task"], batch_size=params["batch_size"], n_tasks=1)
    else:
      max_target_encode = params["num_classes"]
      model= model_mapper[params["model_name"]](mode=params["task"], batch_size=params["batch_size"], n_tasks=1, n_classes=int(max_target_encode))
    checkpoints = model.get_checkpoints(model_dir = model_path)
    if len(checkpoints) == 0:
      raise ValueError("No checkpoints found")
    else:
      checkpoint = checkpoints[-1]
    checkpoint_path = os.path.join(model_path, checkpoint)
    model.restore(checkpoint = checkpoint_path)
    pipeline = cls(task=params["task"], input_type=params["input_type"], model_name=params["model_name"], batch_size=params["batch_size"], save=False)
    pipeline.model = model
    pipeline.report = report
    return pipeline

### Regression Application (PSMILES)

In [None]:
! wget "https://media.githubusercontent.com/media/ChangwenXu98/TransPolymer/master/data/Xc.csv"

--2024-10-07 10:44:20--  https://media.githubusercontent.com/media/ChangwenXu98/TransPolymer/master/data/Xc.csv
Resolving media.githubusercontent.com (media.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to media.githubusercontent.com (media.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 19747 (19K) [text/plain]
Saving to: ‘Xc.csv’


2024-10-07 10:44:20 (90.1 MB/s) - ‘Xc.csv’ saved [19747/19747]



In [None]:
reg_df = pd.read_csv("/content/Xc.csv")
print("Number of data points", reg_df.shape[0])
reg_df.head()

Number of data points 432


Unnamed: 0,smiles,value
0,*C*,47.8
1,*CC(*)C,44.47
2,*CC(*)CC,34.04
3,*CC(*)CCC,20.01
4,*CC(*)CC(C)C,21.64


In [None]:
gcn_reg_pipeline = PolymerDiscriminatorPipeline(task="regression", model_name="GCN", input_type="PSMILES")
gcn_reg_report = gcn_reg_pipeline(reg_df, num_epochs = 30)
print("GCN regression report >>", gcn_reg_report)

GCN regression report >> {'model': 'GCN', 'task': 'regression', 'train_loss': 414.6894921875, 'test_loss': {'mean_squared_error': 502.17264126766247}}


In [None]:
gcn_reg_pipeline.id

'483f8eee-83b9-40a0-8462-a0d18b0ede65'

In [None]:
restored_pipeline = PolymerDiscriminatorPipeline.load_pipeline(gcn_reg_pipeline.id)

In [None]:
dataset = ["*CC(*)CCC"]
prediction = restored_pipeline.predict(dataset)
print(prediction)

[[34.301296]]


In [None]:
mat_reg_pipeline = PolymerDiscriminatorPipeline(task="regression", model_name="MAT", input_type = "PSMILES")
mat_reg_report = mat_reg_pipeline(reg_df, num_epochs = 1)

  node_features = torch.tensor(data[0]).float()
  adjacency_matrix = torch.tensor(data[1]).float()
  distance_matrix = torch.tensor(data[2]).float()
  torch.sum(torch.tensor(adj_matrix), dim=-1).unsqueeze(2) + eps)
  distance_matrix = torch.tensor(distance_matrix).squeeze().masked_fill(


In [None]:
print("MAT regression report >>", mat_reg_report)

MAT regression report >> {'model': 'MAT', 'task': 'regression', 'train_loss': 301.09622628348217, 'test_loss': {'mean_squared_error': 641.5117670057623}}


In [None]:
restored_pipeline = PolymerDiscriminatorPipeline.load_pipeline(mat_reg_pipeline.id)

In [None]:
dataset = ["*CC(*)CCC"]
prediction = restored_pipeline.predict(dataset)
print(prediction)

[[33.681526]]


  node_features = torch.tensor(data[0]).float()
  adjacency_matrix = torch.tensor(data[1]).float()
  distance_matrix = torch.tensor(data[2]).float()
  torch.sum(torch.tensor(adj_matrix), dim=-1).unsqueeze(2) + eps)
  distance_matrix = torch.tensor(distance_matrix).squeeze().masked_fill(


In [None]:
dmpnn_reg_pipeline = PolymerDiscriminatorPipeline(task="regression", model_name="DMPNN",input_type="PSMILES")
dmpnn_reg_report = dmpnn_reg_pipeline(reg_df, num_epochs = 35)
print("DMPNN regression report >>", dmpnn_reg_report)

DMPNN regression report >> {'model': 'DMPNN', 'task': 'regression', 'train_loss': 254.54958984375, 'test_loss': {'mean_squared_error': 345.74142942520024}}


In [None]:
dmpnn_reg_pipeline.id

'6761f33d-e04a-4e41-8588-6b08e15aa6ad'

In [None]:
restored_dmpnn_pipeline = PolymerDiscriminatorPipeline.load_pipeline(gcn_reg_pipeline.id)

In [None]:
dataset = ["*CC(*)CCC"]
pred_dmpnn = restored_dmpnn_pipeline.predict(dataset)
print(pred_dmpnn)

[[34.301296]]


### Regression Application (WDGRAPH)

In [None]:
! wget "https://raw.githubusercontent.com/coleygroup/polymer-chemprop-data/refs/heads/main/datasets/vipea/chemprop_inputs/dataset-poly_chemprop.csv"

--2024-10-07 10:47:57--  https://raw.githubusercontent.com/coleygroup/polymer-chemprop-data/refs/heads/main/datasets/vipea/chemprop_inputs/dataset-poly_chemprop.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10542141 (10M) [text/plain]
Saving to: ‘dataset-poly_chemprop.csv’


2024-10-07 10:47:58 (132 MB/s) - ‘dataset-poly_chemprop.csv’ saved [10542141/10542141]



In [None]:
reg_df_wdgraph = pd.read_csv("/content/dataset-poly_chemprop.csv")
print("Number of data points", reg_df_wdgraph.shape[0])
reg_df_wdgraph["smiles"] = reg_df_wdgraph["poly_chemprop_input"].values
reg_df_wdgraph["value"] = reg_df_wdgraph["IP vs SHE (eV)"].values
reg_df_wdgraph = reg_df_wdgraph[["smiles", "value"]]
# reg_df_wdgraph.head()
reg_df_wdgraph = reg_df_wdgraph.iloc[:200,:]
reg_df_wdgraph.shape

Number of data points 42966


TypeError: 'tuple' object is not callable

In [None]:
dmpnn_wdgraph_reg_pipeline = PolymerDiscriminatorPipeline(task="regression", model_name="DMPNN", input_type="WDGRAPH")
dmpnn_wdgraph_reg_report = dmpnn_wdgraph_reg_pipeline(reg_df_wdgraph, num_epochs = 10)
print("DMPNN WDGRAPH regression report >>", dmpnn_wdgraph_reg_report)

In [None]:
print(reg_df_wdgraph["smiles"].values[0])

In [None]:
custom = CustomWeightedDirectedGraphFeaturizer()
vector = custom.featurize(["[*:1]c1cc(F)c([*:2])cc1F.[*:3]c1c(O)cc(O)c([*:4])c1O|0.5|0.5|<1-3:0.5:0.5<1-4:0.5:0.5<2-3:0.5:0.5<2-4:0.5:0.5"])

### Classification Application

In [None]:
! wget https://raw.githubusercontent.com/TRY-ER/sample_polymer_discriminator_pipeline/master/OPV_cat_split.csv

In [None]:
class_df = pd.read_csv("/content/OPV_cat_split.csv")

In [None]:
class_df.head()

In [None]:
gcn_class_pipeline = PolymerDiscriminatorPipeline(task="classification", model_name="GCN")
gcn_class_report = gcn_class_pipeline(class_df, num_epochs = 2)
print("GCN classification report >>", gcn_class_report)

In [None]:
gcn_class_pipeline.id

In [None]:
gcn_restored_class_pipeline = PolymerDiscriminatorPipeline.load_pipeline(gcn_class_pipeline.id)

In [None]:
dataset = ["CC1=CC(CCCCCC)=C(C)S1"]
prediction = gcn_restored_class_pipeline.predict(dataset)
print(prediction)

In [None]:
dmpnn_class_pipeline = PolymerDiscriminatorPipeline(task="classification", model_name="DMPNN")
dmpnn_class_report = dmpnn_class_pipeline(class_df, num_epochs = 2)
print("DMPNN classification report >>", dmpnn_class_report )

In [None]:
dmpnn_class_pipeline.id

In [None]:
dmpnn_restored_class_pipeline = PolymerDiscriminatorPipeline.load_pipeline(dmpnn_class_pipeline.id)

In [None]:
dataset = ["CC1=CC(CCCCCC)=C(C)S1"]
prediction = dmpnn_restored_class_pipeline.predict(dataset)
print(prediction)