In [1]:
import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from sklearn.model_selection import KFold
import glob
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import time
data = pd.read_excel('/NAS/home/yychoi/research/capstone1/bio-ai-capstone/Training dataset.xlsx', engine = 'openpyxl')


In [2]:
data.head()

Unnamed: 0,Promoter,Reads
0,CACCTCCAAATTATCTAAGTTAGCCAAATACCAAGAAGATTGGGCA...,106
1,TTTTGGGGAAACCGGCCGGGTCAGATTTAATCAGCGGCAACATCAC...,643
2,CCAAACGATGCTGAAGTTTACCGTTGCTGGTGAGCAGCAATAGTCC...,372
3,CAGCAACTCTTAACGGGAAATCCCAATGGTCCCTGGCAGAAAAAAT...,834
4,CCTGAATATCTCCAGGGTTATACCGCCCCCGATGAAGCTTTTGTTT...,1496


In [3]:
def one_hot_encoding(df, seq_column, expression):
    bases = ['A','C','G','T']
    base_dict = dict(zip(bases,range(4)))
    n = len(df)
    # 데이터 잘림을 방지하기 위해 앞뒤로 빈 열을 추가(20line)
    total_width = df[seq_column].str.len().max()+20 
    # One-hot encoding된 nucleotide tensor 생성
    X = np.zeros((n,1,4,total_width))
    seqs = df[seq_column].values

    print(seqs)

    for i in range(n):
        seq = seqs[i]
        for b in range(len(seq)):
            X[i,0,base_dict[seq[b]], b+10+100-len(seq)] = 1    
    X = X.astype(np.float64)

    print(X)
    
    return X, total_width

X, total_width = one_hot_encoding(data,'Promoter','Reads')


['CACCTCCAAATTATCTAAGTTAGCCAAATACCAAGAAGATTGGGCAAAGAGGAAATCCCCCGCTAACACTGCCACCCGATTGTCAAATAAACTATTCACC'
 'TTTTGGGGAAACCGGCCGGGTCAGATTTAATCAGCGGCAACATCACCGCCCCAGCCCTATTTGCCATGGAAAAATATCCCCTACTTGGTAAATTAATTGA'
 'CCAAACGATGCTGAAGTTTACCGTTGCTGGTGAGCAGCAATAGTCCTTCGCTATCTTGATCCAAACGCCCCACGGGATATAAATCTGGCAAATTAATATA'
 ...
 'TGCTTAGTTGAATTAGCTATAAACTAAATCAGTCAATTAGTCAGCTAATTTAATTAGGGTTGTAACTTATTTGTTTGCTGATCAATGACATACTTAAGGA'
 'GGGGGCAAACAGGGAGGTAGTGGAGATCATCGGCTGGCCAAACTTGGCAGATGTGTTTACGAAAGTTTACATATCTTGATGCTTATTTTAAGATAACCTC'
 'GGCATCAGCGCAAAAGTAGAACAAAGTTCAGTCAAGGGCGGCGATCACCCCAAAAATCCAGAATCGACATTTTTAGTTCACAAGAGGTTATCTTAAAATA']
[[[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]]


 [[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]]


 [[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]]


 ...


 [[[0. 0. 0. ... 0. 0. 0.]


In [4]:
X.shape

(3712, 1, 4, 120)

In [5]:
class VAE(nn.Module):
    def __init__(self, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=(4, 35), stride=(1, 1)),
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=(1, 21), stride=(1, 1)),
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=(1, 15), stride=(1, 1)),
            nn.ReLU(),
            nn.Flatten()
        )

        self.fc = nn.Linear(16 * 1 * 4 * 120, latent_dim * 2)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 1 * 4 * 120),
            nn.ReLU()
        )
        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(1, 16, kernel_size=(1, 15), stride=(1, 1), padding='same'),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 16, kernel_size=(1, 21), stride=(1, 1), padding='same'),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 16, kernel_size=(1, 35), stride=(1, 1), padding='same'),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=(4, 29), stride=1, padding='same'),
        )

    def encode(self, x):
        x = self.encoder(x)
        x = self.fc(x)
        mean, logvar = x.chunk(2, dim=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        eps = torch.randn_like(mean)
        return eps * torch.exp(logvar * 0.5) + mean

    def decode(self, z):
        x = self.decoder(z)
        x = x.view(-1, 1, 4, 120)
        x = self.decoder_conv(x)
        return x

    def forward(self, x):
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        x_recon = self.decode(z)
        return x_recon, mean, logvar


In [6]:
def loss_function(recon_x, x, mu, log_var):
    BCE = F.binary_cross_entropy_with_logits(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return BCE + KLD

def train(model, dataloader, optimizer):
    model.train()
    train_loss = 0
    for i, data in enumerate(dataloader):
        data = data[0].to(device)
        optimizer.zero_grad()
        recon_batch, mu, log_var = model(data)
        loss = loss_function(recon_batch, data, mu, log_var)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    return train_loss


In [7]:
epochs = 1000
latent_dim = 2
num_examples_to_generate = 10000

# In PyTorch, torch.randn is used to generate a tensor of random normal values
random_vector_for_generation = torch.randn(num_examples_to_generate, latent_dim)



In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = VAE(latent_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Assuming X is your dataset
X_tensor = torch.tensor(X, dtype=torch.float32)
dataset = torch.utils.data.TensorDataset(X_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

for epoch in range(1, epochs + 1):
    train_loss = train(model, dataloader, optimizer)
    print(f"Epoch {epoch}, Loss: {train_loss}")


RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x832 and 7680x4)

In [None]:
# 6.    
    
def decoder2seq(x):
    seq = ''
    for i in range(np.shape(x)[-1]-20):
        Ascalar = x[0][0][i+10].numpy()
        Cscalar = x[0][1][i+10].numpy()
        Gscalar = x[0][2][i+10].numpy()
        Tscalar = x[0][3][i+10].numpy()
        maxcalar = max(Ascalar, Cscalar, Gscalar, Tscalar)
        if Ascalar == maxcalar:
            seq = seq+'A'
        elif Cscalar == maxcalar:
            seq = seq+'C'
        elif Gscalar == maxcalar:
            seq = seq+'G'
        else:
            seq = seq+'T'
    return seq

In [None]:
torch.save(model.state_dict(), 'cyano_model.pth')

In [None]:
# Ensure your model is in evaluation mode to disable dropout or batchnorm layers during inference
model.eval()

# Generate random vectors for generation
random_vector_for_generation = torch.randn(10000, 2, device=device)

# Decode each vector and convert to sequences
aplist = []
for i in range(10000):
    # Generate one sample at a time and move it to the correct device
    z = random_vector_for_generation[i].unsqueeze(0)
    
    # Decode the sample using the model
    with torch.no_grad():
        generated_seq, _, _ = model.decode(z)
    
    # Convert the tensor to a sequence string
    seq = decoder2seq(generated_seq.squeeze(0).cpu())
    aplist.append(seq)

# Create a DataFrame and save to Excel
ddff = pd.DataFrame(aplist, columns=['Generated promoters'])
ddff.to_excel('Generated promoter sequences 100bp.xlsx', index=False)
