In [51]:
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
import csv
import os
# All tensors and the model are placed on the CPU. To use a GPU when one is
# available, replace the assignment below with:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

In [None]:
# Load the training CSV as a list of rows, dropping the header line.
# newline='' is what the csv module asks for so that quoted fields containing
# line breaks are parsed correctly.
with open('../Data/train.csv', 'r', newline='') as f:
    reader = csv.reader(f)
    next(reader, None)  # skip the header row; harmless if the file is empty
    train_data = list(reader)

# Preview only the first rows instead of echoing the whole dataset into the output.
train_data[:5]

In [55]:
# Put the parent of the current working directory on sys.path so that the
# project-local `src` package can be imported from this notebook.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# This import has to run after the sys.path adjustment above.
from src.tokenizer import GPT4Tokenizer
tokenizer = GPT4Tokenizer()
tokenizer.load_vocab('vocab.json')
# Register id 512 as an extra '<none>' entry; the padding cell uses 512 as its fill value.
# NOTE(review): presumably ids 0..511 are already taken by the loaded vocab — confirm.
tokenizer.vocab[512] = '<none>'


In [56]:
# Sanity check: encode one long SMILES-style string and print the resulting token ids.
print(tokenizer.encode("NC(=O)c1ccc(Cl)cc1)[C@H]1[C@@H]2C[C@@H](n3cnc4cccc(F)c43)C[C@@H]21CNC[C@H](c1ccc2ccccc2c1)[C@H](O)c1cncc(OC)c1COc1cnc(NCCNCCCF)cc1[C@@H]1c2[nH]c3ccccc3c2C[C@@H](C)N1CC(F)(F)FC[C@@]1(c2cc(NC(=O)c3ccc(C#N)cn3)ccc2F)Cn2cc(C#N)n"))

[375, 269, 335, 324, 487, 438, 347, 413, 386, 256, 270, 288, 356, 488, 272, 289, 278, 352, 360, 325, 278, 302, 338, 256, 412, 41, 308, 79, 338, 313, 257, 78, 390, 498, 407, 485, 358, 400, 408, 457, 371, 336, 49, 385, 397, 284, 280, 454, 273, 361, 439, 67, 420, 259, 344, 296]


In [57]:
# Tokenize every CSV row into a (token_id_tensor, int_label) pair.
# Column 0 holds the SMILES string, column 1 the integer class label.
encoded_data = []
for row in train_data:
    text, label = row[0], int(row[1])
    # Force an integer dtype: torch.tensor([]) defaults to float32, so an empty
    # encoding would otherwise produce a float tensor that cannot be used as
    # embedding indices once the sequences are stacked.
    tokens = torch.tensor(tokenizer.encode(text), dtype=torch.long)
    encoded_data.append((tokens, label))

In [58]:
# Length (in tokens) of the longest encoded sequence.
max_len = max(len(seq) for seq, _ in encoded_data)
max_len

59

In [81]:
# Right-pad every sequence to one fixed length so they can be stacked into batches.
PAD_ID = 512   # id registered as '<none>' in the tokenizer vocab
SEQ_LEN = 64   # fixed length of every padded sequence

# Guard against sequences that do not fit: a negative pad amount passed to
# F.pad would crop the sequence instead of raising.
longest = max((len(tokens) for tokens, _ in encoded_data), default=0)
if longest > SEQ_LEN:
    raise ValueError(
        f"longest sequence has {longest} tokens, which exceeds SEQ_LEN={SEQ_LEN}"
    )

padded_data = []
for tokens, label in encoded_data:
    padded_tokens = F.pad(tokens, (0, SEQ_LEN - len(tokens)), value=PAD_ID)
    padded_data.append((padded_tokens, torch.tensor(label)))

In [82]:
padded_data[0]

(tensor([ 67, 289, 278, 352, 465, 325, 279, 302, 320, 353, 258, 512, 512, 512,
         512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512,
         512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512,
         512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512, 512,
         512, 512, 512, 512, 512, 512, 512, 512]),
 tensor(0))

In [None]:
def get_batch(data, batch_size=32):
    """Draw a random mini-batch (sampled without replacement) from ``data``.

    Args:
        data: sequence of ``(tokens, label)`` pairs. All ``tokens`` tensors must
            share one shape so that they can be stacked.
        batch_size: number of examples wanted. When ``data`` holds fewer
            examples than this, all of them are returned.

    Returns:
        ``(tokens, labels)``, both moved to ``device``: the stacked token
        tensor and a 1-D ``torch.long`` label tensor.

    Raises:
        ValueError: if ``data`` is empty.
    """
    if len(data) == 0:
        raise ValueError("get_batch() needs at least one example")
    # random.sample raises when asked for more items than exist, so clamp.
    batch = random.sample(data, min(batch_size, len(data)))
    tokens, labels = zip(*batch)
    tokens = torch.stack(tokens).to(device)
    labels = torch.tensor(labels, dtype=torch.long).to(device)
    return tokens, labels


TypeError: get_batch() missing 1 required positional argument: 'data'

In [104]:
def estimate_accuracy(model, data, batch_size=32):
    """Return the fraction of examples in ``data`` that ``model`` classifies correctly.

    The model is switched to eval mode and run without gradient tracking over
    consecutive slices of ``data``; afterwards it is put back into whichever
    mode (train or eval) it was in when this function was called.

    Args:
        model: module whose call returns a ``(logits, loss)`` pair, with
            ``logits`` holding one row of class scores per example.
        data: sliceable sequence of ``(tokens, label)`` pairs.
        batch_size: number of examples evaluated per forward pass.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` when ``data`` is empty.
    """
    if len(data) == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0

    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for start in range(0, len(data), batch_size):
                batch = data[start:start + batch_size]
                tokens, labels = zip(*batch)
                tokens = torch.stack(tokens).to(device)
                labels = torch.tensor(labels, dtype=torch.long).to(device)
                outputs, _ = model(tokens)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Restore the caller's mode even if evaluation fails part-way.
        model.train(was_training)
    return correct / total

In [85]:
class Attention(nn.Module):
    """Multi-head self-attention with explicit query/key/value projections.

    The input is projected by three separate linear layers and the results are
    fed to ``nn.MultiheadAttention``. ``forward`` swaps the first two
    dimensions before and after the attention call.

    Note: a later cell defines another class with this same name, which
    replaces this one once that cell has run.
    """

    def __init__(self, emb_dim, n_heads, dropout):
        super().__init__()
        # Modules are created in the order att, q, k, v.
        self.att = nn.MultiheadAttention(emb_dim, n_heads, dropout=dropout)
        self.q, self.k, self.v = (nn.Linear(emb_dim, emb_dim) for _ in range(3))

    def forward(self, x):
        """Apply self-attention to ``x`` and return a tensor laid out like ``x``."""
        projected = [layer(x).transpose(0, 1) for layer in (self.q, self.k, self.v)]
        attended, _weights = self.att(*projected)
        return attended.transpose(0, 1)

In [86]:
class FeedForward(nn.Sequential):
    """Two-layer MLP: widen to 4x ``emb_dim``, GELU, project back to ``emb_dim``.

    Dropout is applied after the activation and again after the output
    projection.
    """

    def __init__(self, emb_dim, dropout):
        hidden_dim = emb_dim * 4
        stages = [
            nn.Linear(emb_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, emb_dim),
            nn.Dropout(dropout),
        ]
        super().__init__(*stages)

In [87]:
class TransformerBlock(nn.Module):
    """One transformer layer with layer normalisation applied before each sub-layer.

    Both the attention sub-layer and the feed-forward sub-layer are wrapped in
    a residual connection.

    Note: a later cell defines another class with this same name, which
    replaces this one once that cell has run.
    """

    def __init__(self, emb_dim, n_heads, dropout):
        super().__init__()
        self.attention = Attention(emb_dim, n_heads, dropout)
        self.norm1 = nn.LayerNorm(emb_dim)
        self.ff = FeedForward(emb_dim, dropout)
        self.norm2 = nn.LayerNorm(emb_dim)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.attention(self.norm1(x))
        x = x + attended
        transformed = self.ff(self.norm2(x))
        return x + transformed

In [94]:
# Number of entries in the tokenizer vocabulary (including the added '<none>' entry).
vocab_size = len(tokenizer.vocab)


class BioTransformer(nn.Module):
    """Transformer classifier that reads its prediction from a learned CLS vector.

    Token ids are embedded, summed with learned positional embeddings, prefixed
    with a learned CLS vector, passed through ``n_layers`` transformer blocks,
    and the CLS position is mapped to ``out_dim`` logits.

    Note: a later cell defines another ``BioTransformer``, which replaces this
    class once that cell has run.

    Args:
        emb_dim: embedding width.
        out_dim: number of output classes.
        n_heads: attention heads per block.
        n_layers: number of transformer blocks.
        vocab_size: number of token ids; the last id (``vocab_size - 1``) is
            used as the embedding's ``padding_idx``.
        block_size: number of rows in the positional embedding table, i.e. the
            longest input (in tokens) that can be embedded.
        dropout: dropout probability passed to every block.
    """

    def __init__(self, emb_dim=64, out_dim=2, n_heads=4, n_layers=6,
                 vocab_size=vocab_size, block_size=240, dropout=0.):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=vocab_size - 1)
        self.pos_embedding = nn.Embedding(block_size, emb_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_dim))
        self.blocks = nn.Sequential(
            *[TransformerBlock(emb_dim, n_heads, dropout) for _ in range(n_layers)]
        )
        self.l_head = nn.Linear(emb_dim, out_dim)

    def forward(self, x, targets=None):
        """Compute class logits and, if ``targets`` is given, the cross-entropy loss.

        Args:
            x: integer token ids of shape ``(batch, seq_len)``.
            targets: optional class indices, one per example.

        Returns:
            ``(logits, loss)``; ``loss`` is ``None`` when ``targets`` is ``None``.
        """
        x = self.token_embedding(x)
        b, n, _ = x.shape
        # Build the position ids on the input's own device rather than on the
        # notebook-level `device`, so the model works wherever it is moved.
        positions = torch.arange(n, device=x.device)
        x = x + self.pos_embedding(positions).unsqueeze(0)
        x = torch.cat((self.cls_token.expand(b, -1, -1), x), dim=1)
        x = self.blocks(x)
        x = self.l_head(x[:, 0])
        if targets is not None:
            loss = F.cross_entropy(x, targets)
        else:
            loss = None
        return x, loss

    def predict(self, x):
        """Return class probabilities (softmax over the logits) for token ids ``x``."""
        # forward() returns (logits, loss); softmax must only see the logits.
        logits, _ = self.forward(x)
        return F.softmax(logits, dim=1)
    

In [105]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the padded examples for validation. The fixed random_state
# makes the split, and therefore the reported validation accuracy, reproducible.
# NOTE: this rebinds `train_data`, which until this cell held the raw CSV rows,
# so the earlier encoding cell cannot be re-run without reloading the CSV first.
train_data, val_data = train_test_split(
    padded_data, test_size=0.2, shuffle=True, random_state=42
)

In [108]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Multi-head self-attention with explicit query/key/value projections.

    The input is projected by three separate linear layers and the results are
    fed to ``nn.MultiheadAttention``. ``forward`` swaps the first two
    dimensions before and after the attention call.
    """

    def __init__(self, emb_dim, n_heads, dropout):
        super().__init__()
        # Modules are created in the order att, q_proj, k_proj, v_proj.
        self.att = nn.MultiheadAttention(emb_dim, n_heads, dropout=dropout)
        self.q_proj, self.k_proj, self.v_proj = (
            nn.Linear(emb_dim, emb_dim) for _ in range(3)
        )

    def forward(self, x):
        """Apply self-attention to ``x`` and return a tensor laid out like ``x``."""
        projections = (self.q_proj, self.k_proj, self.v_proj)
        query, key, value = (proj(x).transpose(0, 1) for proj in projections)
        attended, _weights = self.att(query, key, value)
        return attended.transpose(0, 1)

class FeedForward(nn.Sequential):
    """Two-layer MLP: widen to 4x ``emb_dim``, GELU, project back to ``emb_dim``.

    Dropout is applied after the activation and again after the output
    projection.
    """

    def __init__(self, emb_dim, dropout):
        inner_dim = 4 * emb_dim
        expand = nn.Linear(emb_dim, inner_dim)
        activation = nn.GELU()
        drop_hidden = nn.Dropout(dropout)
        contract = nn.Linear(inner_dim, emb_dim)
        drop_out = nn.Dropout(dropout)
        super().__init__(expand, activation, drop_hidden, contract, drop_out)

class TransformerBlock(nn.Module):
    """One transformer layer with layer normalisation applied before each sub-layer.

    Both the attention sub-layer and the feed-forward sub-layer are wrapped in
    a residual connection.
    """

    def __init__(self, emb_dim, n_heads, dropout):
        super().__init__()
        self.attention = Attention(emb_dim, n_heads, dropout)
        self.norm1 = nn.LayerNorm(emb_dim)
        self.ffn = FeedForward(emb_dim, dropout)
        self.norm2 = nn.LayerNorm(emb_dim)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        after_attention = x + self.attention(self.norm1(x))
        ffn_out = self.ffn(self.norm2(after_attention))
        return after_attention + ffn_out

class BioTransformer(nn.Module):
    """Transformer classifier that reads its prediction from a learned CLS vector.

    Token ids are embedded and summed with learned positional embeddings, a
    learned CLS vector is prepended (it receives no positional embedding), the
    sequence runs through ``n_layers`` transformer blocks and a final
    LayerNorm, and the CLS position is mapped to ``out_dim`` logits.

    Args:
        emb_dim: embedding width.
        out_dim: number of output classes.
        n_heads: attention heads per block.
        n_layers: number of transformer blocks.
        vocab_size: number of token ids; the last id (``vocab_size - 1``) is
            used as the embedding's ``padding_idx``.
        max_seq_len: the positional table is built with ``max_seq_len + 1`` rows.
        dropout: dropout probability passed to every block.
    """

    def __init__(self, emb_dim=64, out_dim=2, n_heads=4, n_layers=6, vocab_size=513, max_seq_len=240, dropout=0.1):
        super().__init__()
        pad_id = vocab_size - 1
        self.token_embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_id)
        self.pos_embedding = nn.Embedding(max_seq_len + 1, emb_dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_dim))
        encoder_blocks = [TransformerBlock(emb_dim, n_heads, dropout) for _ in range(n_layers)]
        self.layers = nn.Sequential(*encoder_blocks)
        self.norm = nn.LayerNorm(emb_dim)
        self.head = nn.Linear(emb_dim, out_dim)

    def forward(self, x, targets=None):
        """Compute class logits and, if ``targets`` is given, the cross-entropy loss.

        Args:
            x: integer token ids of shape ``(batch, seq_len)``.
            targets: optional class indices, one per example.

        Returns:
            ``(logits, loss)``; ``loss`` is ``None`` when ``targets`` is ``None``.
        """
        batch, seq_len = x.size()
        pos_ids = torch.arange(seq_len, device=x.device).unsqueeze(0)
        hidden = self.token_embedding(x) + self.pos_embedding(pos_ids)
        # Prepend one copy of the CLS vector per example.
        cls = self.cls_token.expand(batch, -1, -1)
        hidden = torch.cat((cls, hidden), dim=1)
        hidden = self.norm(self.layers(hidden))
        logits = self.head(hidden[:, 0])
        loss = None if targets is None else F.cross_entropy(logits, targets)
        return logits, loss

In [109]:

# Train the classifier on random mini-batches and periodically report
# validation accuracy; the final weights are written to model.pth.
model = BioTransformer().to(device)

optimizer = optim.AdamW(model.parameters(), lr=1e-3)
num_iters = 1000
eval_every = 10  # run validation every this many steps

# The loop variable is `step`, not `iter`, so the builtin iter() is not
# shadowed for the rest of the kernel session.
for step in range(num_iters):
    xb, yb = get_batch(train_data)
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

    if step % eval_every == 0:
        accuracy = estimate_accuracy(model, val_data)
        print(f'Iter {step}, Loss: {loss.item()}, Accuracy: {accuracy}')

model_path = 'model.pth'
torch.save(model.state_dict(), model_path)

Iter 0, Loss: 0.6706021428108215, Accuracy: 0.5888625592417062
Iter 10, Loss: 0.6910193562507629, Accuracy: 0.41113744075829384
Iter 20, Loss: 0.5844273567199707, Accuracy: 0.5888625592417062


KeyboardInterrupt: 