In [1]:
!git clone https://github.com/vladislareon/Sparse_vector
!git clone https://github.com/vladislareon/z_dna

Cloning into 'Sparse_vector'...
remote: Enumerating objects: 8, done.[K
remote: Counting objects: 100% (8/8), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 8 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (8/8), done.
Cloning into 'z_dna'...
remote: Enumerating objects: 2052, done.[K
remote: Counting objects: 100% (31/31), done.[K
remote: Compressing objects: 100% (25/25), done.[K
remote: Total 2052 (delta 8), reused 0 (delta 0), pack-reused 2021[K
Receiving objects: 100% (2052/2052), 1.75 GiB | 10.06 MiB/s, done.
Resolving deltas: 100% (8/8), done.
Checking out files: 100% (2024/2024), done.


In [22]:
!git clone https://github.com/vladislareon/Interpretation

Cloning into 'Interpretation'...
remote: Enumerating objects: 37, done.[K
remote: Counting objects: 100% (37/37), done.[K
remote: Compressing objects: 100% (35/35), done.[K
remote: Total 37 (delta 14), reused 2 (delta 0), pack-reused 0[K
Unpacking objects: 100% (37/37), done.


In [1]:
import numpy as np
import scipy
from tqdm import trange
from tqdm.notebook import tqdm
import sys
import os
import seaborn as sns
from matplotlib import pyplot as plt
import joblib
from joblib import Parallel, delayed, dump, load
from matplotlib import pyplot as plt
#import Sparse_vector
#sys.modules['sparse_vector'] = Sparse_vector
from Sparse_vector.sparse_vector import SparseVector
from Interpretation.lrp_layers import LRP

In [2]:
import torch
from torch.utils import data
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split, StratifiedKFold

In [3]:
chroms = [f'chr{i}' for i in list(range(1, 23)) + ['X', 'Y','M']]
all_features = [i[:-4] for i in os.listdir('z_dna/hg38_features/sparse/') if i.endswith('.pkl')]
groups = ['DNase-seq', 'Histone', 'RNA polymerase', 'TFs and others']
feature_names = [i for i in all_features]

def chrom_reader(chrom):
    files = sorted([i for i in os.listdir(f'z_dna/hg38_dna/') if f"{chrom}_" in i])
    return ''.join([load(f"z_dna/hg38_dna/{file}") for file in files])

In [4]:
%%time
DNA = {chrom:chrom_reader(chrom) for chrom in tqdm(chroms)}
# ZDNA_shin = load('z_dna/hg38_zdna/sparse/ZDNA_shin.pkl')
# ZDNA_cousine = load('z_dna/hg38_zdna/sparse/ZDNA_cousine.pkl')

ZDNA = load('z_dna/hg38_zdna/sparse/ZDNA_cousine.pkl')

DNA_features = {feature: load(f'z_dna/hg38_features/sparse/{feature}.pkl')
                for feature in tqdm(feature_names)}

  0%|          | 0/25 [00:00<?, ?it/s]

  0%|          | 0/1946 [00:00<?, ?it/s]

CPU times: user 1min 25s, sys: 5.7 s, total: 1min 31s
Wall time: 1min 59s


In [5]:
class Dataset(data.Dataset):
    def __init__(self, chroms, features, 
                 dna_source, features_source, 
                 labels_source, intervals):
        self.chroms = chroms
        self.features = features
        self.dna_source = dna_source
        self.features_source = features_source
        self.labels_source = labels_source
        self.intervals = intervals
        self.le = LabelBinarizer().fit(np.array([["A"], ["C"], ["T"], ["G"]]))
        
    def __len__(self):
        return len(self.intervals)
    
    def __getitem__(self, index):
        interval = self.intervals[index]
        chrom = interval[0]
        begin = int(interval[1])
        end = int(interval[2])
        dna_OHE = self.le.transform(list(self.dna_source[chrom][begin:end].upper()))
        
        feature_matr = []
        for feature in self.features:
            source = self.features_source[feature]
            feature_matr.append(source[chrom][begin:end])
        if len(feature_matr) > 0:
            X = np.hstack((dna_OHE, np.array(feature_matr).T/1000)).astype(np.float32)
        else:
            X = dna_OHE.astype(np.float32)
        y = self.labels_source[interval[0]][interval[1]: interval[2]]
        
        return (X, y)
        
        

In [6]:
width = 100

np.random.seed(10)

ints_in = []
ints_out = []

for chrm in chroms:
    for st in trange(0, ZDNA[chrm].shape - width, width):
        interval = [st, min(st + width, ZDNA[chrm].shape)]
        if ZDNA[chrm][interval[0]: interval[1]].any():
            ints_in.append([chrm, interval[0], interval[1]])
        else:
            ints_out.append([chrm, interval[0], interval[1]])

ints_in = np.array(ints_in)
ints_out = np.array(ints_out)[np.random.choice(range(len(ints_out)), size=len(ints_in) * 3, replace=False)]

100%|███████████████████████████████████████████████████████████████████████████████████████████| 2489564/2489564 [00:35<00:00, 69540.21it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 2421935/2421935 [00:33<00:00, 72082.37it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 1982955/1982955 [00:27<00:00, 71829.95it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 1902145/1902145 [00:26<00:00, 72358.58it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 1815382/1815382 [00:25<00:00, 71032.63it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 1708059/1708059 [00:23<00:00, 71811.50it/s]
100%|███████████████████████████████████████████████████████████████████████████████████████████| 1593459/1593459 [00:20<00:00, 76325.16it/s]
100%|█

In [7]:
equalized = ints_in
equalized = [[inter[0], int(inter[1]), int(inter[2])] for inter in equalized]

In [8]:
train_inds, test_inds = next(StratifiedKFold().split(equalized, [f"{int(i < 400)}_{elem[0]}"
                                                                 for i, elem 
                                                                 in enumerate(equalized)]))

train_intervals, test_intervals = [equalized[i] for i in train_inds], [equalized[i] for i in test_inds]

train_dataset = Dataset(chroms, feature_names, 
                       DNA, DNA_features, 
                       ZDNA, train_intervals)

test_dataset = Dataset(chroms, feature_names, 
                       DNA, DNA_features, 
                       ZDNA, test_intervals)

In [9]:
params = {'batch_size':1,
          'num_workers':20,
          'shuffle':True}

loader_train = data.DataLoader(train_dataset, **params)
loader_test = data.DataLoader(test_dataset, **params)

# CNN Model

In [10]:
from torch import nn
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score, f1_score
from IPython.display import clear_output

class DeepCNNLayerNorm_v2(nn.Module):
    def __init__(self):
        super().__init__()
        self.seq = nn.Sequential(
            
            nn.Conv2d(1, 3, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([3, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(3, 5, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([5, 100, 1950]),
            nn.ReLU(),
            
            
            nn.Conv2d(5, 7, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([7, 100, 1950]),
            nn.ReLU(),
            
            
            nn.Conv2d(7, 9, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([9, 100, 1950]),
            nn.ReLU(),
            
            
            nn.Conv2d(9, 11, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([11, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(11, 13, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([13, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(13, 13, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            
            nn.Conv2d(13, 11, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([11, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(11, 9, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([9, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(9, 7, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([7, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(7, 5, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([5, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(5, 3, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([3, 100, 1950]),
            nn.ReLU(),
            
            nn.Conv2d(3, 1, kernel_size=(3, 3), padding=1),
            nn.LayerNorm([1, 100, 1950]),  # Укажите размеры после свертки
            nn.ReLU(),
            
            
            nn.Dropout(0.25),
            
            nn.Linear(1950, 500),
            nn.Dropout(0.25),
            nn.ReLU(),
            
            nn.Linear(500, 2)
        )

    def forward(self, x):
        batch = x.shape[0]
        x = x.reshape(batch, 1, width, 1950)
        x = self.seq(x)
        x = torch.squeeze(x)
        x = F.log_softmax(x, dim=-1)
        return x

In [11]:
torch.cuda.empty_cache()
device = torch.device('cuda:2')

In [12]:
model = DeepCNNLayerNorm_v2()
torch.cuda.empty_cache()
model.load_state_dict(torch.load("Cousine_DeepCNNLayerNorm_v2_interval=100_F1=0.759_epoch=20.pt"))
model = model.to(device)
model.eval()

DeepCNNLayerNorm_v2(
  (seq): Sequential(
    (0): Conv2d(1, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LayerNorm((3, 100, 1950), eps=1e-05, elementwise_affine=True)
    (2): ReLU()
    (3): Conv2d(3, 5, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): LayerNorm((5, 100, 1950), eps=1e-05, elementwise_affine=True)
    (5): ReLU()
    (6): Conv2d(5, 7, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): LayerNorm((7, 100, 1950), eps=1e-05, elementwise_affine=True)
    (8): ReLU()
    (9): Conv2d(7, 9, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): LayerNorm((9, 100, 1950), eps=1e-05, elementwise_affine=True)
    (11): ReLU()
    (12): Conv2d(9, 11, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): LayerNorm((11, 100, 1950), eps=1e-05, elementwise_affine=True)
    (14): ReLU()
    (15): Conv2d(11, 13, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (16): LayerNorm((13, 100, 1950), eps=1e-05, elementwise_affine=True)
  

# Captum methods

In [13]:
import captum
from captum.attr import IntegratedGradients, GradientShap, LayerGradCam, LRP, InputXGradient, GuidedBackprop, Deconvolution
from captum.attr import visualization as viz
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap

# Integrated Gradients

In [43]:
mean_ig1 = np.zeros(1950, dtype=float)
cnt_ig = 0

for x, y_true in tqdm(loader_train):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # IntegratedGradients
    #torch.cuda.empty_cache()
    integrated_gradients = IntegratedGradients(model).attribute(x, target=1, n_steps=1)
    integrated_gradients = torch.squeeze(integrated_gradients, dim=0)
    
    if integrated_gradients[idxs, :].shape != (0, 1950):
        integrated_gradients = torch.mean(integrated_gradients[idxs, :], dim=0)
        integrated_gradients = np.array(integrated_gradients.cpu())
        #print(np.max(integrated_gradients))
        mean_ig1 += integrated_gradients
        cnt_ig += 1


# for test data
for x, y_true in tqdm(loader_test):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # IntegratedGradients
    #torch.cuda.empty_cache()
    integrated_gradients = IntegratedGradients(model).attribute(x, target=1, n_steps=1)
    integrated_gradients = torch.squeeze(integrated_gradients, dim=0)
    
    if integrated_gradients[idxs, :].shape != (0, 1950):
        integrated_gradients = torch.mean(integrated_gradients[idxs, :], dim=0)
        integrated_gradients = np.array(integrated_gradients.cpu())
        #print(np.max(integrated_gradients))
        mean_ig1 += integrated_gradients
        cnt_ig += 1

print('done IntegratedGradients interpretation')

  0%|          | 0/36160 [00:00<?, ?it/s]

  0%|          | 0/9041 [00:00<?, ?it/s]

done IntegratedGradients interpretation


In [44]:
# mean for IntegratedGradients
mean_ig = mean_ig1 / cnt_ig
print(mean_ig)

[-1.45094543e+00  1.66814336e+01  1.60696188e+01 ...  0.00000000e+00
  1.12107443e-04  2.45436798e-06]


In [45]:
np.save('mean_ig_CNN_075.npy', mean_ig)

# InputXGradient

In [18]:
mean_ixg1 = np.zeros(1950, dtype=float)
cnt_ixg = 0

for x, y_true in tqdm(loader_train):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # InputXGradient
    inputx_gradients = InputXGradient(model).attribute(x, target=1)
    inputx_gradients = torch.squeeze(inputx_gradients, dim=0)
    
    if inputx_gradients[idxs, :].shape != (0, 1950):
        inputx_gradients = torch.mean(inputx_gradients[idxs, :], dim=0)
        inputx_gradients = inputx_gradients.cpu().detach().numpy()
        #print(np.max(integrated_gradients))
        mean_ixg1 += inputx_gradients
        cnt_ixg += 1


# for test data
for x, y_true in tqdm(loader_test):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # InputXGradient
    inputx_gradients = InputXGradient(model).attribute(x, target=1)
    inputx_gradients = torch.squeeze(inputx_gradients, dim=0)
    
    if inputx_gradients[idxs, :].shape != (0, 1950):
        inputx_gradients = torch.mean(inputx_gradients[idxs, :], dim=0)
        inputx_gradients = inputx_gradients.cpu().detach().numpy()
        #print(np.max(integrated_gradients))
        mean_ixg1 += inputx_gradients
        cnt_ixg += 1

print('done InputXGradient interpretation')

  0%|          | 0/36160 [00:00<?, ?it/s]

  0%|          | 0/9041 [00:00<?, ?it/s]

done InputXGradient interpretation


In [19]:
# mean for InputXGradient
mean_ixg = mean_ixg1 / cnt_ixg
print(mean_ixg)

[-8.51535648e-01  1.08634184e+01  7.69323746e+00 ...  0.00000000e+00
  1.45848921e-05 -5.00266947e-08]


In [20]:
np.save('mean_input_x_gradient_CNN_075.npy', mean_ixg)

# GuidedBackprop

In [31]:
mean_gbp1 = np.zeros(1950, dtype=float)
cnt_gbp = 0

for x, y_true in tqdm(loader_train):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # GuidedBackprop

    gbp = GuidedBackprop(model).attribute(x, target=1)
    gbp = torch.squeeze(gbp, dim=0)
    
    if gbp[idxs, :].shape != (0, 1950):
        gbp = torch.mean(gbp[idxs, :], dim=0)
        gbp = np.array(gbp.cpu())
        #print(np.max(integrated_gradients))
        mean_gbp1 += gbp
        cnt_gbp += 1


# for test data
for x, y_true in tqdm(loader_test):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # GuidedBackprop

    gbp = GuidedBackprop(model).attribute(x, target=1)
    gbp = torch.squeeze(gbp, dim=0)
    
    if gbp[idxs, :].shape != (0, 1950):
        gbp = torch.mean(gbp[idxs, :], dim=0)
        gbp = np.array(gbp.cpu())
        #print(np.max(integrated_gradients))
        mean_gbp1 += gbp
        cnt_gbp += 1

print('done GuidedBackprop interpretation')

  0%|          | 0/36160 [00:00<?, ?it/s]

  0%|          | 0/9041 [00:00<?, ?it/s]

done GuidedBackprop interpretation


In [32]:
# mean for GuidedBackprop
mean_gbp = mean_gbp1 / cnt_gbp
print(mean_gbp)

[ 5.38537559e-01  1.33175356e+00  2.05166828e+00 ... -9.81499988e-04
 -9.51754645e-04 -8.76049912e-04]


In [33]:
np.save('mean_guided_backprop_CNN_075.npy', mean_gbp)

# Deconvolution

In [18]:
mean_dec1 = np.zeros(1950, dtype=float)
cnt_dec = 0

for x, y_true in tqdm(loader_train):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # Deconvolution

    dec = Deconvolution(model).attribute(x, target=1)
    dec = torch.squeeze(dec, dim=0)
    
    if dec[idxs, :].shape != (0, 1950):
        dec = torch.mean(dec[idxs, :], dim=0)
        dec = np.array(dec.cpu())
        #print(np.max(integrated_gradients))
        mean_dec1 += dec
        cnt_dec += 1

for x, y_true in tqdm(loader_test):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # Deconvolution

    dec = Deconvolution(model).attribute(x, target=1)
    dec = torch.squeeze(dec, dim=0)
    
    if dec[idxs, :].shape != (0, 1950):
        dec = torch.mean(dec[idxs, :], dim=0)
        dec = np.array(dec.cpu())
        #print(np.max(integrated_gradients))
        mean_dec1 += dec
        cnt_dec += 1

print('done Deconvolution interpretation')

  0%|          | 0/36160 [00:00<?, ?it/s]

  0%|          | 0/9041 [00:00<?, ?it/s]

done Deconvolution interpretation


In [19]:
# mean for Deconvolution
mean_dec = mean_dec1 / cnt_dec
print(mean_dec)

[ 4.50489990e+00  5.78859345e+00  8.06691777e+00 ... -8.18742583e-03
 -7.97827533e-03 -7.27760765e-03]


In [20]:
np.save('mean_deconv_CNN_075.npy', mean_dec)

# LayerGradCam

In [46]:
mean_gcam1 = np.zeros(1950, dtype=float)
cnt_gcam = 0

for x, y_true in tqdm(loader_train):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # GuidedGradCam
    
    layers = list(model.modules())[2:]
    #layers[5] = Conv2d(3, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

    #torch.cuda.empty_cache()
    grad_cam = LayerGradCam(model, layer=layers[5]).attribute(x, target=1)
    grad_cam = torch.squeeze(grad_cam, dim=0)
    grad_cam = torch.squeeze(grad_cam, dim=0)
    
    if grad_cam[idxs, :].shape != (0, 1950):
        grad_cam = torch.mean(grad_cam[idxs, :], dim=0)
        grad_cam = grad_cam.cpu().detach().numpy()
        mean_gcam1 += grad_cam
        cnt_gcam += 1

for x, y_true in tqdm(loader_test):
    # make prediction
    x, y_true = x.to(device), y_true.to(device).long()
    output = model(x)
    pred = torch.argmax(output, dim=1).reshape(1, width)

    # find True Positive indices
    idxs = []
    for i in range(width):
        if pred[0][i] == y_true[0][i] and y_true[0][i] == 1:
            idxs.append(i)

    # GuidedGradCam
    
    layers = list(model.modules())[2:]
    #layers[5] = Conv2d(3, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

    #torch.cuda.empty_cache()
    grad_cam = LayerGradCam(model, layer=layers[5]).attribute(x, target=1)
    grad_cam = torch.squeeze(grad_cam, dim=0)
    grad_cam = torch.squeeze(grad_cam, dim=0)
    
    if grad_cam[idxs, :].shape != (0, 1950):
        grad_cam = torch.mean(grad_cam[idxs, :], dim=0)
        grad_cam = grad_cam.cpu().detach().numpy()
        mean_gcam1 += grad_cam
        cnt_gcam += 1


print('done LayerGradCam interpretation')

  0%|          | 0/36160 [00:00<?, ?it/s]

  0%|          | 0/9041 [00:00<?, ?it/s]

done LayerGradCam interpretation


In [47]:
# mean for LayerGradCam
mean_gcam = mean_gcam1 / cnt_gcam
print(mean_gcam.shape)
print(mean_gcam)

(1950,)
[-0.00132167 -0.00194491 -0.00131476 ...  0.          0.
  0.        ]


In [48]:
np.save('mean_gcam_CNN_075.npy', mean_gcam)