# Convert SMILES/HELM into images

In [1]:
import os
from rdkit import Chem
from rdkit.Chem import Draw
from joblib import Parallel, delayed
from tqdm import tqdm

def smile_to_image(smile, path, img_size=(250, 250)):
    """Render a single SMILES string to an image file.

    Args:
        smile: SMILES string parsed with ``Chem.MolFromSmiles``.
        path: Destination file path handed to ``Draw.MolToFile``.
        img_size: Image size forwarded as ``size`` to ``Draw.MolToFile``.

    Raises:
        ValueError: If RDKit cannot parse ``smile``.
    """
    mol = Chem.MolFromSmiles(smile)
    if mol is None:
        # MolFromSmiles reports a parse failure by returning None; name the
        # offending input here instead of handing None to the drawing code.
        raise ValueError(f'could not parse SMILES {smile!r} (target image: {path})')
    Draw.MolToFile(mol, path, size=img_size)

def convert_smi_to_img(df, out_dir, img_size=(250, 250)):
    """Render every SMILES in ``df.smi`` to ``<out_dir>/smi_images/<index>.png``.

    Args:
        df: Table with a ``smi`` column; its index labels become the file names.
        out_dir: Directory under which the ``smi_images`` folder is created.
        img_size: Size forwarded to ``smile_to_image`` for each image.

    Returns:
        List of image paths, in the same order as the rows of ``df``.
    """
    smiles = list(df.smi)
    img_paths = [os.path.join(out_dir, f'smi_images/{row_id}.png') for row_id in df.index]

    os.makedirs(os.path.join(out_dir, 'smi_images'), exist_ok=True)
    # tqdm's width argument is `ncols`; zip() has no length, so the total is
    # passed explicitly to get a real progress bar.
    jobs = tqdm(zip(smiles, img_paths), total=len(smiles), ncols=80)
    Parallel(n_jobs=os.cpu_count())(
        delayed(smile_to_image)(s, p, img_size) for s, p in jobs
    )

    return img_paths

# CycPeptMPDB


In [2]:
import pandas as pd


# Switch for this cell: True rebuilds everything from all.csv (renders the
# images, makes an 80/20 split and writes the img_*.csv files); False only
# reloads the img_*.csv files written by an earlier run.
train = False

if not train:
    # Cached path: the first CSV column holds the index that was saved below.
    df_cycpdb_all = pd.read_csv('data/CycPeptMPDB/img_all.csv', index_col=0)
    df_cycpdb_train = pd.read_csv('data/CycPeptMPDB/img_train.csv', index_col=0)
    df_cycpdb_test = pd.read_csv('data/CycPeptMPDB/img_test.csv', index_col=0)
else:
    df_cycpdb_all = pd.read_csv('data/CycPeptMPDB/all.csv')
    # One rendered image per row; the returned paths line up with the rows.
    df_cycpdb_all['smi_img'] = convert_smi_to_img(df_cycpdb_all, 'data/CycPeptMPDB')

    from sklearn.model_selection import train_test_split

    # Fixed seed so the split is reproducible between runs.
    df_cycpdb_train, df_cycpdb_test = train_test_split(
        df_cycpdb_all,
        test_size=0.2,
        random_state=42,
    )

    # Keep the index in the files so it can be restored with index_col=0.
    df_cycpdb_all.to_csv('data/CycPeptMPDB/img_all.csv', index=True)
    df_cycpdb_train.to_csv('data/CycPeptMPDB/img_train.csv', index=True)
    df_cycpdb_test.to_csv('data/CycPeptMPDB/img_test.csv', index=True)

print(*map(len, (df_cycpdb_all, df_cycpdb_train, df_cycpdb_test)))


7451 5960 1491


## Test fastai

In [55]:
import numpy as np
import cv2
import os
import tqdm
import pandas as pd
from joblib import Parallel, delayed
import os
import tqdm
import re

class ImageAugmentations:
    """Write rotated-and-flipped copies of images below ``out_dir``.

    Every source image yields ``num_rotations * 3`` files (18 with the
    defaults): each rotation is stored in its vertically flipped,
    horizontally flipped and both-axes flipped form.
    """

    def __init__(self, out_dir):
        # Root directory; do_image_augmentations creates its output folder in it.
        self.out_dir = out_dir

    @staticmethod
    def _rotation_angles(num_rotations):
        """Return ``num_rotations`` evenly spaced angles in degrees within [0, 180)."""
        return [180.0 * i / num_rotations for i in range(num_rotations)]

    @staticmethod
    def rotating(img, num_rotations=6):
        """Return ``num_rotations`` copies of ``img`` rotated about its centre.

        The angles are spread evenly over [0, 180) degrees (0, 30, ..., 150 by
        default). The previous code used ``num_rotations`` as the *step* of
        ``range(0, 180, ...)``, which produced 0, 6, 12, ... and raised
        IndexError once ``num_rotations`` reached 14.

        Args:
            img: Image array; only ``img.shape[:2]`` (height, width) is read here.
            num_rotations: Number of rotated copies to produce.

        Returns:
            List of rotated images, each warped to the original width/height.
        """
        (h, w) = img.shape[:2]
        center = (w / 2, h / 2)
        rotated_images = []
        for angle in ImageAugmentations._rotation_angles(num_rotations):
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            rotated_images.append(cv2.warpAffine(img, M, (w, h)))
        return rotated_images

    @staticmethod
    def flipping(img):
        """Return the vertical, horizontal and both-axes flips of ``img``, in that order."""
        # cv2.flip codes: 0 -> vertical flip, 1 -> horizontal flip, -1 -> both.
        return [cv2.flip(img, flip_code) for flip_code in (0, 1, -1)]

    def aug_image(self, raw_img_path, save_dir):
        """Augment one image file and write every variant into ``save_dir``.

        Args:
            raw_img_path: Path of the source image.
            save_dir: Existing directory that receives the augmented files.

        Returns:
            Tuple ``(raw_img_path, aug_img_paths)`` where ``aug_img_paths``
            lists the written files in creation order.

        Raises:
            OSError: If ``cv2.imread`` cannot load ``raw_img_path``.
        """
        # NOTE: fname keeps its extension, so outputs look like "x.png_aug_0.png".
        fname = os.path.basename(raw_img_path)
        img = cv2.imread(raw_img_path)
        if img is None:
            # imread returns None instead of raising when loading fails.
            raise OSError(f'cv2.imread could not read image: {raw_img_path}')

        aug_img_paths = []
        counter = 0
        for rotated in self.rotating(img):
            for flipped in self.flipping(rotated):
                aug_img_path = os.path.join(save_dir, f'{fname}_aug_{counter}.png')
                cv2.imwrite(aug_img_path, flipped)
                aug_img_paths.append(aug_img_path)
                counter += 1

        return raw_img_path, aug_img_paths

    def do_image_augmentations(self, paths, dir_name='aug_images'):
        """Augment every image in ``paths`` in parallel.

        Args:
            paths: Iterable of source image paths.
            dir_name: Name of the folder created under ``self.out_dir``.

        Returns:
            Dict mapping each raw image path to the list of its augmented paths.
        """
        save_dir = os.path.join(self.out_dir, dir_name)
        os.makedirs(save_dir, exist_ok=True)
        print(f"augmented images are saved to {save_dir}")
        aug_paths = Parallel(n_jobs=os.cpu_count())(
            delayed(self.aug_image)(p, save_dir) for p in tqdm.tqdm(paths, ncols=80)
        )
        # aug_image returns (raw_path, [aug_paths]) pairs, which dict() consumes directly.
        return dict(aug_paths)

In [62]:
# Try the augmentation on a handful of randomly chosen training rows.
df_to_aug = df_cycpdb_train.sample(5)
augmentor = ImageAugmentations(out_dir='test/')
aug_paths_dict = augmentor.do_image_augmentations(df_to_aug.smi_img)

columns = df_to_aug.columns

# Emit one output row per augmented file: a copy of the source row whose
# 'smi_img' entry points at the augmented image instead of the original.
augmented_images = []
for _, source_row in df_to_aug.iterrows():
    for new_path in aug_paths_dict[source_row['smi_img']]:
        augmented_images.append(
            tuple(new_path if col == 'smi_img' else source_row[col] for col in columns)
        )

df_augmented_images = pd.DataFrame(augmented_images, columns=columns)
len(df_augmented_images)

augmented images are saved to test/aug_images


100%|███████████████████████████████████████████| 5/5 [00:00<00:00, 5414.80it/s]


90

In [96]:
from model.vit import VisionTransformer, CONFIGS
from urllib.request import urlretrieve

def load_vit_model():
    """Build a ViT-B/16 model and load the ``ViT-B_16-224.npz`` checkpoint.

    The checkpoint is downloaded into ``data/vit_models`` the first time it is
    needed and reused from disk afterwards.

    Returns:
        The ``VisionTransformer`` instance after ``load_from`` has been called
        with the checkpoint contents.
    """
    weights_path = "data/vit_models/ViT-B_16-224.npz"
    os.makedirs("data/vit_models", exist_ok=True)
    if not os.path.isfile(weights_path):
        # Download under a temporary name and rename only on success, so an
        # interrupted transfer cannot leave a truncated checkpoint that the
        # isfile() check would accept on the next run.
        partial_path = weights_path + ".part"
        urlretrieve("https://storage.googleapis.com/vit_models/imagenet21k+imagenet2012/ViT-B_16-224.npz",
                    partial_path)
        os.replace(partial_path, weights_path)

    config = CONFIGS["ViT-B_16"]
    model = VisionTransformer(config, num_classes=1000, zero_head=False, img_size=224, vis=True)
    model.load_from(np.load(weights_path))
    return model

In [95]:
import logging

import torch

from torchvision import transforms, datasets
from torch.utils.data import DataLoader, RandomSampler, DistributedSampler, SequentialSampler


def get_loader(batch_size=128, num_workers=4):
    """Create the CIFAR-10 train and test data loaders.

    The training pipeline takes a random resized crop to 224x224; the test
    pipeline resizes to 224x224. Both then convert to tensors and normalise
    every channel with mean 0.5 and std 0.5. The dataset is downloaded into
    ``./data`` if it is not already there.

    Args:
        batch_size: Batch size used by both loaders.
        num_workers: Worker count used by both loaders.

    Returns:
        Tuple ``(train_loader, test_loader)``; the training loader samples
        randomly, the test loader iterates sequentially.
    """
    # Same normalisation for both pipelines, so define it once.
    normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    transform_train = transforms.Compose([
        transforms.RandomResizedCrop((224, 224), scale=(0.05, 1.0)),
        transforms.ToTensor(),
        normalize,
    ])
    transform_test = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ])

    trainset = datasets.CIFAR10(root="./data",
                                train=True,
                                download=True,
                                transform=transform_train)
    testset = datasets.CIFAR10(root="./data",
                               train=False,
                               download=True,
                               transform=transform_test)

    train_loader = DataLoader(trainset,
                              sampler=RandomSampler(trainset),
                              batch_size=batch_size,
                              num_workers=num_workers,
                              pin_memory=True)
    # testset is always constructed above, so no None fallback is needed.
    test_loader = DataLoader(testset,
                             sampler=SequentialSampler(testset),
                             batch_size=batch_size,
                             num_workers=num_workers,
                             pin_memory=True)

    return train_loader, test_loader


In [None]:
# Smoke test: one pass over the training loader with the loaded ViT model.
model = load_vit_model()
train_loader, test_loader = get_loader(batch_size=16)

# NOTE(review): weight_decay=0.98 looks unusually large for SGD - confirm it is intended.
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=1e-4,
    momentum=0.9,
    weight_decay=0.98,
)

# NOTE(review): the device is hard-coded to the second CUDA device.
device = torch.device('cuda:1')
model.train()
model.to(device)
epoch_iterator = tqdm.tqdm(train_loader, desc="Training (X / X Steps) (loss=X.X)")

for step, batch in enumerate(epoch_iterator):
    # Move every tensor of the batch to the device, then split inputs / labels.
    x, y = batch = tuple(t.to(device) for t in batch)
    # Called with inputs and labels, the model's return value is used as the loss.
    loss = model(x, y)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    epoch_iterator.set_description(f"Training (loss={loss.item():2.5f})")