In [55]:
from rdkit import Chem
import torch
import torch_geometric
from torch_geometric.data import Data, Dataset, download_url, DataLoader
from torch_geometric.loader import DataLoader
import os.path as osp
import pandas as pd
import numpy as np
from tqdm import tqdm

In [56]:
class NovoDataset(Dataset):
    """On-disk PyTorch Geometric dataset of protein graphs built from a CSV file.

    Every CSV row becomes one graph: the ``protein_sequence`` column is parsed
    with ``Chem.MolFromSequence``, each atom becomes a node and each bond
    contributes two directed edges. Graphs are written one file per row to
    ``processed_dir`` as ``data_{i}.pt`` (training) or ``data_test_{i}.pt``
    (test) and are read back one at a time by :meth:`get`.

    Args:
        root: Dataset root directory, forwarded to the base class.
        filename: Name of the CSV file, reported as ``raw_file_names``.
        test: If true, the ``tm`` column is not read and the stored graphs
            carry no ``y`` label.
        transform: Forwarded unchanged to the base class.
        pre_transform: Forwarded unchanged to the base class.
    """

    # Width of one row produced by _get_node_features / _get_edge_features.
    # Used to keep the tensors two-dimensional when a molecule has no
    # atoms or no bonds.
    _NODE_FEATURE_DIM = 10
    _EDGE_FEATURE_DIM = 2

    def __init__(self, root, filename, test=False, transform=None, pre_transform=None):
        self.test = test
        self.filename = filename
        # Filled lazily by _read_table(); must exist before the base-class
        # constructor runs because that constructor already queries
        # processed_file_names.
        self.data = None
        super().__init__(root, transform, pre_transform)

    @property
    def raw_file_names(self):
        """Name of the CSV file this dataset is built from."""
        return self.filename

    @property
    def processed_file_names(self):
        """One processed file name per CSV row, in row order."""
        return [self._processed_name(i) for i in self._read_table().index]

    def _read_table(self):
        """Return the raw CSV as a DataFrame, reading it from disk only once."""
        if self.data is None:
            self.data = pd.read_csv(self.raw_paths[0])
        return self.data

    def _processed_name(self, idx):
        """File name of the processed graph for row ``idx``."""
        prefix = 'data_test' if self.test else 'data'
        return f'{prefix}_{idx}.pt'

    def download(self):
        # Download to `self.raw_dir`.
        pass

    def process(self):
        """Convert every CSV row into a ``Data`` object and save it to disk.

        Raises:
            ValueError: If RDKit cannot build a molecule from a row's
                ``protein_sequence``.
        """
        table = self._read_table()
        for index, mol in tqdm(table.iterrows(), total=table.shape[0]):
            mol_obj = Chem.MolFromSequence(mol["protein_sequence"])
            if mol_obj is None:
                # Fail with the offending row instead of an AttributeError
                # on None further down.
                raise ValueError(
                    f"Row {index}: could not build a molecule from 'protein_sequence'"
                )
            ph = mol["pH"]

            # Node features, edge features and connectivity are derived from
            # the same bond iteration order, so edge_attr[k] describes
            # edge_index[:, k].
            node_feats = self._get_node_features(mol_obj, ph)
            edge_feats = self._get_edge_features(mol_obj)
            edge_index = self._get_adjacency_info(mol_obj)

            if self.test:
                # Test rows have no label.
                data = Data(x=node_feats, edge_index=edge_index, edge_attr=edge_feats)
            else:
                label = self._get_labels(mol["tm"])
                data = Data(x=node_feats, edge_index=edge_index, edge_attr=edge_feats, y=label)

            torch.save(data, osp.join(self.processed_dir, self._processed_name(index)))

    def _get_node_features(self, mol, ph):
        """
        This will return a matrix / 2d array of the shape
        [Number of Nodes, Node Feature size]

        The row-level ``ph`` value is repeated on every atom as the last
        feature.
        """
        all_node_feats = []

        for atom in mol.GetAtoms():
            node_feats = []
            # Feature 1: Atomic number
            node_feats.append(atom.GetAtomicNum())
            # Feature 2: Atom degree
            node_feats.append(atom.GetDegree())
            # Feature 3: Formal charge
            node_feats.append(atom.GetFormalCharge())
            # Feature 4: Hybridization
            node_feats.append(atom.GetHybridization())
            # Feature 5: Aromaticity
            node_feats.append(atom.GetIsAromatic())
            # Feature 6: Total Num Hs
            node_feats.append(atom.GetTotalNumHs())
            # Feature 7: Radical Electrons
            node_feats.append(atom.GetNumRadicalElectrons())
            # Feature 8: In Ring
            node_feats.append(atom.IsInRing())
            # Feature 9: Chirality
            node_feats.append(atom.GetChiralTag())
            # Feature 10: ph
            node_feats.append(ph)

            # Append node features to matrix
            all_node_feats.append(node_feats)

        all_node_feats = np.asarray(all_node_feats)
        feats = torch.tensor(all_node_feats, dtype=torch.float)
        # Keeps the result 2-D ([0, 10]) even when the molecule has no atoms.
        return feats.reshape(-1, self._NODE_FEATURE_DIM)

    def _get_edge_features(self, mol):
        """
        This will return a matrix / 2d array of the shape
        [Number of edges, Edge Feature size]
        """
        all_edge_feats = []

        for bond in mol.GetBonds():
            edge_feats = []
            # Feature 1: Bond type (as double)
            edge_feats.append(bond.GetBondTypeAsDouble())
            # Feature 2: Rings
            edge_feats.append(bond.IsInRing())
            # Append edge features to matrix (twice, once per direction)
            all_edge_feats += [edge_feats, edge_feats]

        all_edge_feats = np.asarray(all_edge_feats)
        feats = torch.tensor(all_edge_feats, dtype=torch.float)
        # Keeps the result 2-D ([0, 2]) even when the molecule has no bonds.
        return feats.reshape(-1, self._EDGE_FEATURE_DIM)

    def _get_adjacency_info(self, mol):
        """
        We could also use rdmolops.GetAdjacencyMatrix(mol)
        but we want to be sure that the order of the indices
        matches the order of the edge features

        Returns a long tensor of shape [2, Number of edges].
        """
        edge_indices = []
        for bond in mol.GetBonds():
            i = bond.GetBeginAtomIdx()
            j = bond.GetEndAtomIdx()
            # Both directions, in the same order as _get_edge_features.
            edge_indices += [[i, j], [j, i]]

        edge_indices = torch.tensor(edge_indices, dtype=torch.long)
        # reshape handles the bond-less case; contiguous() materialises the
        # transposed layout instead of returning a strided view.
        return edge_indices.reshape(-1, 2).t().contiguous()

    def _get_labels(self, label):
        """Wrap the scalar target in a float tensor."""
        return torch.tensor(label, dtype=torch.float)

    def len(self):
        """Number of graphs, i.e. number of rows in the CSV."""
        return self._read_table().shape[0]

    def get(self, idx):
        """ - Equivalent to __getitem__ in pytorch
            - Is not needed for PyG's InMemoryDataset
        """
        # NOTE(review): torch.load unpickles the file; only point this at
        # processed files produced by process() above.
        return torch.load(osp.join(self.processed_dir, self._processed_name(idx)))

In [57]:
# Training split: `test` keeps its default (False), so every processed graph
# also stores the `tm` label.
train_dataset = NovoDataset(
    root="data/",
    filename="training.csv",
)

Processing...
100%|███████████████████████████████████| 26980/26980 [1:03:16<00:00,  7.11it/s]
Done!


In [64]:
# Test split: with test=True no label is attached and the processed graphs are
# saved as data_test_{i}.pt.
test_dataset = NovoDataset(
    root="data/",
    filename="testing.csv",
    test=True,
)

Processing...
100%|███████████████████████████████████████| 2413/2413 [00:52<00:00, 46.22it/s]
Done!
