In [5]:
import numpy as np 
import torch 
import torch.nn as nn
import pandas as pd
import matplotlib.pyplot as plt
import sys
sys.path.append("../")
from model import ConfigurableModel
from tqdm import tqdm
import time
import logomaker
import os
from typing import Callable
import gc

In [6]:
config = {"cnn_first_filter": 10, "cnn_first_kernel_size": 9, "cnn_length": 2, "cnn_filter": 64, "cnn_kernel_size": 5, "bilstm_layer": 3, "bilstm_hidden_size": 64, "fc_size": 256}
model = ConfigurableModel(input_channel=4, cnn_first_filter=config["cnn_first_filter"], cnn_first_kernel_size=config["cnn_first_kernel_size"],
                                cnn_length=config["cnn_length"], cnn_other_filter=config["cnn_filter"], cnn_other_kernel_size=config["cnn_kernel_size"], bilstm_layer=config["bilstm_layer"], bilstm_hidden_size=config["bilstm_hidden_size"], fc_size=config["fc_size"],
                                output_size=2)
#fold
fold = 1
model_weight = torch.load(f"/binf-isilon/renniegrp/vpx267/ucph_thesis/data/outputs/models/trained_model_{fold}th_fold_dual_outputs_m6_info-no_promoter-False_ONE_HOT_BEST_PARAM.pkl",
                          map_location=torch.device('cpu'))
model.load_state_dict(model_weight)
sys.path.append("/binf-isilon/renniegrp/vpx267/ucph_thesis/wrapper")
from wrapper import utils
seq_fasta_test_path = f"/binf-isilon/renniegrp/vpx267/ucph_thesis/data/dual_outputs/motif_fasta_test_SPLIT_{fold}.fasta"
seq_fasta_one_hot = utils.create_seq_tensor(seq_fasta_test_path)
meta_data_test_json_path = f"/binf-isilon/renniegrp/vpx267/ucph_thesis/data/dual_outputs/test_meta_data_SPLIT_{fold}.json"



In [87]:
one_hot_size_mb = sys.getsizeof(seq_fasta_one_hot.storage())/(1024**2)
one_hot_size_single_mb = sys.getsizeof(seq_fasta_one_hot[0,:,:].storage())/(1024**2)
# print(f"one hot data size: {one_hot_size_mb:,.3f}MB")
# param_size = 0
# for param in model.parameters():
#     param_size += param.nelement() * param.element_size()
# buffer_size = 0
# for buffer in model.buffers():
#     buffer_size += buffer.nelement() * buffer.element_size()

# size_all_mb = (param_size + buffer_size) / 1024**2
# print(f"model size: {size_all_mb:,.3f}MB")
# print(seq_fasta_one_hot.shape)
# # Point wise mutation will generate 4*seq_length new sequences
# print(f"Estimated size of total mutated data: {one_hot_size_mb*4*seq_fasta_one_hot.shape[2]*0.001:,.3f}GB")
# print(f"Estimated size of single mutated sequence: {one_hot_size_single_mb*4*seq_fasta_one_hot.shape[2]*0.001:,.3f}GB")

one hot data size: 419.777MB
model size: 62.971MB
torch.Size([27483, 4, 1001])
Estimated size of total mutated data: 1,680.786GB
Estimated size of single mutated sequence: 1,680.786GB


We can see that the size is too much for bulk analysis if we want to mutate all of the possible bases for each position. Hence we just need to focus on location of interest i.e. 50 up-down stream. Let's make estimation

In [9]:
slice_size = 100
print(f"Estimated size of fully mutated data: {one_hot_size_mb*4*slice_size*0.001:,.3f}GB")

Estimated size of fully mutated data: 167.911GB


In [13]:
def mutagenesis(x: torch.Tensor, model: torch.nn.Module, mutation_size :int=50, class_index: None|int=None, verbose: bool=True) -> np.ndarray:
    """ 
    in silico mutagenesis

    input: x: one-hot-encoded sequences with shape of (batch, seq_length, 4)
    input: model: trained model 

    """
    device = (torch.device("cuda") if torch.cuda.is_available()
            else torch.device("cpu"))

    if device == torch.device("cuda"):
        if not x.is_cuda:
            x = x.cuda()
        if next(model.parameters()).device == torch.device("cpu"): 
            model = model.cuda()
    
    model.eval()

    def generate_mutagenesis(x: torch.Tensor, mutation_size: int) -> np.ndarray:
        """
        Create a single point mutation at each position of the input sequence. 
        
        param: x: one-hot-encoded sequences with shape of (batch, seq_length, 4)
        return: list of mutagenized one-hot-encoded sequences with shape of (batch*seq_length*4, seq_length, 4)  
        """
        _,L,A = x.shape 
        mid = L//2
        x_mut = np.zeros((mutation_size*2+1, L, A))
        x_cpu = x.cpu().numpy()
        k = 0
        for l in tqdm(range(mid-mutation_size, mid+mutation_size+1)):
            for a in range(A):
                x_new = np.copy(x_cpu) # move to CPU if x in GPU
                x_new[0,l,:] = 0
                x_new[0,l,a] = 1
                print(x_mut[k].shape)
                print(x_new.shape)
                x_mut[k] = x_new
                k += 1
        return x_mut

    def reconstruct_map(x: torch.Tensor, predictions: np.ndarray, mutation_size: int) -> np.ndarray:
        """
        Reconstruct the mutagenezied score

        param: x: one-hot-encoded sequences with shape of (batch, seq_length, 4)
        predictions: model predictions with shape of (batch*seq_length*4)

        return: reconstructed mut_score with shape of (1, seq_length, 4)
        """
        _,L,A = x.shape 
        mid = L//2
        
        mut_score = np.zeros((1,L,A))
        k = 0
        for l in tqdm(range(mid-mutation_size, mid+mutation_size+1)):
            for a in range(A):
                mut_score[0,l,a] = predictions[k]
                k += 1
        return mut_score

    def get_score(x: torch.Tensor, model: torch.nn.Module, class_index: int=None, batch_size=256) -> np.ndarray:
      """
      Get score from the model and process based on class_index.
      """
      x_loader = torch.utils.data.DataLoader(x, batch_size=batch_size, shuffle=False)
      score = torch.Tensor().to("cpu")

      # Batching to prevent the GPU from running out of memory
      epoch_iterator = tqdm(x_loader, desc="Data Loader Iteration")
      for _, data in enumerate(epoch_iterator):
          pred = model.predict(data)
          torch.cat([score, pred.cpu().detach()])
      score = score.numpy()
      if class_index == None:
          # Square root of sum of squares of all classes
          score = np.sqrt(np.sum(score**2, axis=-1, keepdims=True)) 
      else:
          # Choosing class based on class_index
          score = score[:,class_index]
      return score.cpu().numpy()

    with torch.no_grad():
        # generate mutagenized sequences
        if verbose:
            print("Generating mutagenized sequences...")
        x_mut = generate_mutagenesis(x, mutation_size)
        
        # get baseline wildtype score
        if verbose:
            print("Getting baseline wildtype score...")
        wt_score = get_score(x, model, class_index) # [batch_size,1] 
        predictions = get_score(x_mut, model, class_index) # [batch_size * 4, 1]

        # reshape mutagenesis predictiosn
        if verbose:
            print("Reconstructing mutagenized score...")
        mut_score = reconstruct_map(x, predictions, mutation_size) # (1, seq_length, 4)

    # Back to cpu
    if device == torch.device("cuda"):
      x = x.cpu()
      model = model.cpu()

    return mut_score - wt_score # Non-mutagenized difference would be zero 

In [14]:
ctrl_mutant_score_diff = mutagenesis(x=seq_fasta_one_hot.transpose(1,2), model=model, class_index=0)
# case_mutant_score_diff = mutagenesis(seq_fasta_one_hot, model, class_index=1)

Generating mutagenized sequences...


  0%|          | 0/101 [00:00<?, ?it/s]


(1001, 4)
(27483, 1001, 4)


ValueError: could not broadcast input array from shape (27483,1001,4) into shape (1001,4)

In [73]:
def generate_mutagenesis(x: torch.Tensor) -> np.ndarray:
    """
    Create a single point mutation at each position of the input sequence. 
    
    param: x: one-hot-encoded sequences with shape of (batch, seq_length, 4)
    return: list of mutagenized one-hot-encoded sequences with shape of (batch*seq_length*4, seq_length, 4)  
    """
    _,L,A = x.shape 
    mid = L//2
    x_mut = []
    for l in range(L):
        for a in range(A):
            x_new = np.copy(x.cpu()) # move to CPU if x in GPU
            x_new[0,l,:] = 0
            x_new[0,l,a] = 1
            x_mut.append(x_new)
    return np.concatenate(x_mut, axis=0)

In [83]:
x = torch.tensor([[[1,0,0,0],[0,1,0,0],[0,1,0,0],[0,1,0,0],[0,1,0,0],[0,1,0,0],[0,1,0,0]]])
print(x.shape)
# print(x)
coba = generate_mutagenesis(x)
print(coba.shape)

torch.Size([1, 7, 4])
(28, 7, 4)


In [82]:
print(sys.getsizeof(x.storage()))
print(sys.getsizeof(torch.from_numpy(coba).storage()))

112
560
