In [None]:
!pip install pims

In [None]:
!pip install ruptures

In [None]:
import os
import gc

import random
from typing import Tuple

import numpy as np
import pandas as pd
from pims import ImageSequence
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import Dataset, Subset, DataLoader

from scipy.ndimage.filters import gaussian_filter

import ruptures as rpt

device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip /content/drive/MyDrive/10changes-Copy1.zip
#unzip /content/drive/MyDrive/1_5.zip

In [None]:
path = '/content/10changes'

In [None]:
class CPDDatasets:
    """Class for experiments' datasets."""

    def __init__(self, experiments_name: str) -> None:
        """Initialize class.

        :param experiments_name: type of experiments (only mnist available now!)
        """
        super().__init__()

        # TODO make 
        if experiments_name in [
            "mnist"
        ]:
            self.experiments_name = experiments_name
        else:
            raise ValueError("Wrong experiment_name {}.".format(experiments_name))

    def get_dataset_(self) -> Tuple[Dataset, Dataset]:
        """Load experiments' dataset. Only MNIST available."""
        train_dataset = None
        test_dataset = None
        path_to_data = path # "/content/1changes/"

        dataset = MNISTSequenceDataset(path_to_data=path_to_data, type_seq="all")
        train_dataset, test_dataset = CPDDatasets.train_test_split_(
            dataset, test_size=0.3, shuffle=True
        )
        return train_dataset, test_dataset

    @staticmethod
    def train_test_split_(
        dataset: Dataset, test_size: float = 0.3, shuffle: bool = True
    ) -> Tuple[Dataset, Dataset]:
        """Split dataset on train and test.

        :param dataset: dataset for splitting
        :param test_size: size of test data
        :param shuffle: if True, shuffle data
        :return: tuple of
            - train dataset
            - test dataset
        """
        len_dataset = len(dataset)
        idx = np.arange(len_dataset)

        if shuffle:
            train_idx = random.sample(list(idx), int((1 - test_size) * len_dataset))
        else:
            train_idx = idx[: -int(test_size * len_dataset)]
        test_idx = np.setdiff1d(idx, train_idx)

        train_set = Subset(dataset, train_idx)
        test_set = Subset(dataset, test_idx)
        return train_set, test_set


class MNISTSequenceDataset(Dataset):
    """Class for Dataset consists of sequences of MNIST images."""

    def __init__(self, path_to_data: str, type_seq: str = "all") -> None:
        """Initialize datasets' parameters.

        :param path_to_data: path to folders with MNIST sequences
        :param type_seq: type of data for loading (only normal, only anomaly, all)
        """
        super().__init__()

        # set paths to data
        self.path_to_data = path_to_data
        self.path_to_normal = os.path.join(path_to_data, "normal/")
        self.path_with_change = os.path.join(path_to_data, "with_change/")

        self.normal_seq_paths = [
            os.path.join(self.path_to_normal, x)
            for x in os.listdir(self.path_to_normal)
        ]
        self.with_change_seq_paths = [
            os.path.join(self.path_with_change, x)
            for x in os.listdir(self.path_with_change)
        ]

        # load all sequences, only normal or only sequences with changes
        if type_seq == "all":
            self.sample_paths = self.normal_seq_paths + self.with_change_seq_paths
        elif type_seq == "normal":
            self.sample_paths = self.normal_seq_paths
        elif type_seq == "only_changes":
            self.sample_paths = self.with_change_seq_paths
        else:
            raise ValueError(
                'Unknown label type "{}". Please, choose one of {{all, normal, only_changes}}.'.format(
                    type_seq
                )
            )

    def __len__(self) -> int:
        """Get datasets' length.

        :return: length of dataset
        """
        return len(self.normal_seq_paths) + len(self.with_change_seq_paths)

    def __getitem__(self, idx: int) -> Tuple[np.array, np.array]:
        """Get one images' sequence and corresponding labels from dataset.

        :param idx: index of element in dataset
        :return: tuple of
             - sequence of images
             - sequence of labels
        """
        # read sequences of images
        path_img = self.sample_paths[idx]
        seq_images = ImageSequence(os.path.join(path_img, "*_*.png"))
        seq_images = np.transpose(seq_images, (0, 3, 1, 2))[:, 0, :, :].astype(float)#.reshape(64, 1, 28, 28)
        ################################################################
        
        seq_labels = sorted(os.listdir(path_img), key=lambda x: int(x.split("_")[0]))

        # get corresponding labels
        seq_labels = [int(x.split(".png")[0].split("_")[1]) for x in seq_labels]
        seq_labels = (np.array(seq_labels) != seq_labels[0]).astype(int)

        return seq_images, seq_labels

    @staticmethod
    def convert_to_gray_(frame: np.array) -> np.array:
        """Convert PIMS' images to gray scale. In MNIST case, all channels are equals.

        :param frame: image
        :return: image in gray scale
        """
        return frame[:, :, 0]

In [None]:
train_dataset, test_dataset = CPDDatasets(
            experiments_name='mnist'
        ).get_dataset_()


In [None]:


dataset = MNISTSequenceDataset(path_to_data=path, type_seq='all')
train_dataset, test_dataset = CPDDatasets.train_test_split_(dataset, test_size=0.3, shuffle=True)

In [None]:
train_dataloader = DataLoader( train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader( test_dataset, batch_size=64, shuffle=True)

In [None]:
figure = plt.figure(figsize=(10, 8))
sample_idx = torch.randint(len(train_dataset), size=(1,)).item()
cols, rows = 8, 8
plt.axis("off")

img = train_dataset[sample_idx][0]
label = train_dataset[sample_idx][1]
plt.title(label,  size=10, y=1.1)
for i in range(1, cols * rows + 1):
    figure.add_subplot(rows, cols, i)
    plt.axis("off")
    label = train_dataset[sample_idx][1][i-1]
    plt.title(label)
    plt.imshow(img.squeeze()[i-1], cmap="gray")

plt.subplots_adjust(wspace=1, hspace=1)

plt.show()

In [None]:
class NeuralNetwork(nn.Module):
  def __init__(self, input_size):
      super(NeuralNetwork, self).__init__()
      self.model_type = 'LinReg'
      

      self.l1 = nn.Linear(input_size, 1024)
      self.relu = nn.ReLU()
      self.l2 = nn.Linear(1024, 256)
      self.l3 = nn.Linear(256, 1)
      self.sigmoid = nn.Sigmoid()

  def forward(self, x):
     batch_size, seq_len = x.size()[:2]
     if x.type() != 'torch.FloatTensor':
            x = x.float() 
     x = x.flatten(2, -1)
     x = x.flatten(1, -1)
     out = self.l1(x)
     out = self.relu(out)
     out = self.l2(out)
     out = self.relu(out)
     out = self.l3(out)
     out = self.sigmoid(out)
     return out

In [None]:
####
class CNN_model(nn.Module):
  def __init__(self, in_channels=1, hidden_channels=8, out_channels=16, input_size=28, stride=2, window=8, batch_size=64):
      super(CNN_model, self).__init__()
      self.model_type = 'LinReg_CNN'
      self.window = window
      self.batch_size = batch_size
      self.conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=in_channels,              
                out_channels=hidden_channels,            
                kernel_size=5,              
                stride=2, #1,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
            #nn.MaxPool2d(kernel_size=1),    
        )
      self.conv2 = nn.Sequential(         
            nn.Conv2d(hidden_channels, out_channels, 5, 
                      #1,
                      2, 2),     
            nn.ReLU(),                      
            #nn.MaxPool2d(1),                
        )
      self.l1 = nn.Linear(int(self.window*out_channels*(input_size/(2*stride))**2), 1024)
      self.relu = nn.ReLU()
      self.l2 = nn.Linear(1024, 256)
      self.l3 = nn.Linear(256, 1)
      self.sigmoid = nn.Sigmoid()
      print(int(self.window*out_channels*(input_size/(2*stride))**2))
  def forward(self, x):
     #batch_size, seq_len = x.size()[:2]
     if x.type() != 'torch.FloatTensor':
            x = x.float() 
     out = self.conv1(x)
     out = self.conv2(out)
     
     out = out.view(out.size(0)//8, -1) 
     out = self.l1(out)
     out = self.relu(out)
     out = self.l2(out)
     out = self.relu(out)
     out = self.l3(out)
     out = self.sigmoid(out)
     return out

In [None]:
for batch in train_dataloader_window:
      X, y = batch[:, 0], batch[:, 1]
      X, y = np.vstack(X).astype(np.float), np.vstack(y.reshape(len(y), 1)).astype(np.float)
     
      X, y = torch.tensor(X,  dtype=torch.float).reshape(len(batch)*len(batch[0][0]),1, 28, 28).to(device), torch.tensor(y,  dtype=torch.float).to(device)
      break

In [None]:
X.shape

In [None]:
conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=1,              
                out_channels=8,            
                kernel_size=5,              
                stride=2,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
        )

conv2 = nn.Sequential(         
            nn.Conv2d(8, 16, 5, 2, 2),     
            nn.ReLU(),                      
        )

conv3 = nn.Sequential(         
            nn.Conv2d(16, 16*2, 5, 2, 2),     
            nn.ReLU(),                      
        )

print(X.shape)
out = conv1(X.cpu())
print(out.shape)
out = conv2(out)
print(out.shape)
out = conv3(out)
print(out.shape)

In [None]:
out = out.view(out.size(0)//8, -1) 

In [None]:
out.shape

In [None]:
window*32*(28//(2**3))**2

In [None]:
def get_dataset_for_logreg(train_data,window=8):
    data_x = []
    for  i in range(len(train_data)):
      #plt.plot(train_data[i][1])
      train_data[i][0] = np.array(train_data[i][0])/np.max(train_data[i][0])
      cpd = np.argmax(train_data[i][1])
      min_ = max(0, cpd- 2*window)
      max_ = min(len(train_data[i][0])-window-1,  cpd+1)
      for j in range( min_ ,max_):
        data_x.append([train_data[i][0][j:j+window], train_data[i][1][j+window]])
  
    return data_x


In [None]:
def find_1(x):
  for i in range(len(x)):
    if x[i] == 1:
      return i
  return len(x)

In [None]:
def get_data():
    generator = iter(train_dataloader)
    data_window = []
    for i in range(len(train_dataloader)):
        inputs,labels=next(iter(train_dataloader))
        seq_len = len(inputs)
        for seq in range(seq_len):
          if labels[seq].sum() != 0:
            output = my_resnet(torch.tensor(inputs[seq], dtype=torch.float, device=device).reshape(seq_len, 1, 28, 28) ).reshape(64)
            for j in range(seq_len - window ):
                data_window.append([output[j:j+window], labels[seq][j+window]])
            gc.collect()
    return data_window


In [None]:
def get_data_window(train_dataloader, window=8, my_resnet=None, blur=True, do_resnet=False):
    generator = iter(train_dataloader)
    data_window = []
    for i in range(len(train_dataloader)):
        inputs,labels=next(iter(train_dataloader))
        labels = labels.to(torch.float)
        batch_size = len(inputs)
        for seq in range(batch_size):
          seq_len = len(labels[seq])
          if labels[seq].sum() != 0:
              cp = find_1(labels[seq])
              labels[seq][cp+2:] = 0.
              labels[seq][cp-1] = 1.
              a = gaussian_filter(labels[seq].to(torch.float), sigma=1.5)
              labels[seq] = torch.Tensor(a)
              labels[seq][:cp] = 0.
              labels[seq][cp] = 1.
              labels[seq][cp+window-1:] = 0.
              output = inputs[seq].numpy()
              min_ = max(0, cp - window - 2)
              max_ = min(seq_len-window-1,  cp+1)
              for j in range( min_ ,max_):
                  data_window.append([output[j:j+window], labels[seq][j+window-1]])
        del inputs
    data_window = np.array(data_window)
    #data_window = np.random.shuffle(data_window)
    return data_window


In [None]:
#train_data = get_data_window(train_dataloader, my_resnet=my_resnet, blur=True)
#train_data = np.array(train_data)
#np.save('/content/drive/MyDrive/train_data_window.npy', train_data)

In [None]:
#test_data =  get_data_window(test_dataloader, my_resnet=my_resnet, blur=True)
#test_data = np.array(test_data)
#np.save('/content/drive/MyDrive/test_data_window.npy', test_data)

In [None]:
#train_data_resnet = np.load('/content/drive/MyDrive/train_data_window.npy', allow_pickle=True)
#test_data_resnet = np.load('/content/drive/MyDrive/test_data_window.npy', allow_pickle=True)

In [None]:
dataset = MNISTSequenceDataset(path_to_data=path, type_seq='all')
train_dataset, test_dataset = CPDDatasets.train_test_split_(dataset, test_size=0.3, shuffle=True)

In [None]:
train_dataloader = DataLoader( train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader( test_dataset, batch_size=64, shuffle=True)

In [None]:
test_data_window =  get_data_window(test_dataloader, blur=True)
train_data_window =  get_data_window(train_dataloader, blur=True)
np.random.shuffle(train_data_window)
np.random.shuffle(test_data_window)


In [None]:

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

fig = plt.figure(figsize=(10, 10))
outer = gridspec.GridSpec(9, 1, wspace=0.5, hspace=0.5)
sample_idx = 2+7+10
for i in range(9):
    inner = gridspec.GridSpecFromSubplotSpec(1, 8,
                    subplot_spec=outer[i], wspace=0.5, hspace=0.5)
    label = train_data_window[sample_idx+i][1]#[j+window]
    img = train_data_window[sample_idx+i][0]#[j:j+window]
    ax1 = plt.Subplot(fig, outer[i])
    ax1.set_title(label)
    ax1.set_xticks([])
    ax1.set_yticks([])
    fig.add_subplot(ax1)
    for j in range(8):
        ax = plt.Subplot(fig, inner[j])
        #t = ax.text(0.5,0.5, 'outer=%d, inner=%d' % (i, j))
        ax.imshow(img[j], cmap="gray")
        ax.set_xticks([])
        ax.set_yticks([])
        fig.add_subplot(ax)

fig.show()

In [None]:
class0 = 0
class1 = 0
for d in train_data_window:
  if d[1] < 0.1:
    class0 += 1
  else:
    class1 += 1
class0, class1, class0 + class1, len(train_data_window)

In [None]:
def batchify_data(data, batch_size=64, padding=False, padding_token=-1):
    batches = []
    
    for idx in range(0, len(data), batch_size):
        # We make sure we dont get the last bit if its not batch_size size
        if idx + batch_size < len(data):
            # Here you would need to get the max length of the batch,
            # and normalize the length with the PAD token.
            if padding:
                max_batch_length = 0

                # Get longest sentence in batch
                for seq in data[idx : idx + batch_size]:
                    if len(seq) > max_batch_length:
                        max_batch_length = len(seq)

                # Append X padding tokens until it reaches the max length
                for seq_idx in range(batch_size):
                    remaining_length = max_bath_length - len(data[idx + seq_idx])
                    data[idx + seq_idx] += [padding_token] * remaining_length
            
            batches.append(np.array(data[idx : idx + batch_size]))
    #print(f"{len(batches)} batches of size {batch_size}")

    return batches

In [None]:
def train_loop(model, opt, loss_fn, dataloader):
    model.train()
    total_loss = 0
    for batch in dataloader:
        X, y = batch[:, 0], batch[:, 1]
        X, y = np.vstack(X).astype(np.float), np.vstack(y.reshape(len(y), 1)).astype(np.float)
        if model.model_type == 'LinReg':
              X, y = torch.tensor(X,  dtype=torch.float).reshape(len(batch), len(batch[0][0]), 28, 28).to(device), torch.tensor(y,  dtype=torch.float).to(device)
        if model.model_type == 'LinReg_CNN':
              X, y = torch.tensor(X,  dtype=torch.float).reshape(len(batch)*len(batch[0][0]),1, 28, 28).to(device), torch.tensor(y,  dtype=torch.float).to(device)        
        pred = model(X)
        #weight = []
        #for k in range(len(y)):
        #      if y[k] < 0.5:
        #        weight.append( 0.1 )
        #      else:
        #        weight.append( 1. )
        #weight = torch.Tensor(weight).to(torch.float)
        #loss = weighted_mse_loss(pred, y, weight)
        loss = loss_fn(pred, y)
        opt.zero_grad()
        loss.backward()
        opt.step()
    
        total_loss += loss.detach().item()
        
    return total_loss / len(dataloader)

def validation_loop(model, loss_fn, dataloader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in dataloader:
            X, y = batch[:, 0], batch[:, 1]
            X, y = np.vstack(X).astype(np.float), np.vstack(y.reshape(len(y), 1)).astype(np.float)
            if model.model_type == 'LinReg':
                #X, y = np.vstack(X).astype(np.float), np.vstack(y.reshape(len(y), 1)).astype(np.float)
                X, y = torch.tensor(X,  dtype=torch.float).reshape(len(batch), len(batch[0][0]), 28, 28).to(device), torch.tensor(y,  dtype=torch.float).to(device)
            if model.model_type == 'LinReg_CNN':
                X, y = torch.tensor(X,  dtype=torch.float).reshape(len(batch)*len(batch[0][0]),1, 28, 28).to(device), torch.tensor(y,  dtype=torch.float).to(device)

            pred = model(X)
            loss = loss_fn(pred, y)
            total_loss += loss.detach().item()
        
    return total_loss / len(dataloader)

def fit(model, opt, loss_fn, train_dataloader, val_dataloader, epochs):

    # Used for plotting later on
    train_loss_list, validation_loss_list = [], []
    
    #print("Training and validating model")
    for epoch in range(epochs):
        #clear_output(wait=True)
        print("-"*25, f"Epoch {epoch + 1}","-"*25)
        
        train_loss = train_loop(model, opt, loss_fn, train_dataloader)
        train_loss_list += [train_loss]
        
        validation_loss = validation_loop(model, loss_fn, val_dataloader)
        validation_loss_list += [validation_loss]
        
        print(f"Training loss: {train_loss:.4f}")
        print(f"Validation loss: {validation_loss:.4f}")
        print()
        
    return train_loss_list, validation_loss_list
 

In [None]:
def get_bound(x, tr=0.8, window=8):
  start = 0
  end = 0
  for i in range(1, len(x)):
    if x[i] > tr and x[i-1] <= tr:
      start = i
    if x[i] < tr and x[i-1] >= tr:
      end = i
  return [start, end]

In [None]:
train_dataloader_window = batchify_data(train_data_window)
val_dataloader_window = batchify_data(test_data_window)

In [None]:
def weighted_mse_loss(input, target, weight):
    return torch.sum(weight * (input - target) ** 2).mean()

In [None]:
device

In [None]:
####
class CNN_model(nn.Module):
  def __init__(self, in_channels=1, hidden_channels=8, out_channels=16, input_size=28, stride=2, window=8, batch_size=64):
      super(CNN_model, self).__init__()
      self.model_type = 'LinReg_CNN'
      self.window = window
      self.batch_size = batch_size
      self.conv1 = nn.Sequential(         
            nn.Conv2d(
                in_channels=in_channels,              
                out_channels=hidden_channels,            
                kernel_size=5,              
                stride=2, #1,                   
                padding=2,                  
            ),                              
            nn.ReLU(),                      
        )
      self.conv2 = nn.Sequential(         
            nn.Conv2d(hidden_channels, out_channels, 5, 
                      #1,
                      2, 2),     
            nn.ReLU(),                      
        )
      self.conv3 = nn.Sequential(         
            nn.Conv2d(out_channels, 2*out_channels, 5, 
                      #1,
                      2, 2),     
            nn.ReLU(),                      
        )
      
      self.l1 = nn.Linear(
          2*4096, #int(self.window*out_channels*(input_size/(2*stride))**2)
          1024)
      self.relu = nn.ReLU()
      self.l2 = nn.Linear(1024, 256)
      self.l3 = nn.Linear(256, 1)
      self.sigmoid = nn.Sigmoid()
      print(int(self.window*out_channels*(input_size/(2*stride))**2))
  
  def forward(self, x):
     #batch_size, seq_len = x.size()[:2]
     if x.type() != 'torch.FloatTensor':
            x = x.float() 
     out = self.conv1(x)
     out = self.conv2(out)
     out = self.conv3(out)
     
     out = out.view(out.size(0)//8, -1) 
     out = self.l1(out)
     out = self.relu(out)
     out = self.l2(out)
     out = self.relu(out)
     out = self.l3(out)
     out = self.sigmoid(out)
     return out

In [None]:
window = 8
model = CNN_model(in_channels=1, hidden_channels=16, out_channels=32, window=window).to(device)

#model = NeuralNetwork(window*28*28).to(device)

opt = torch.optim.SGD(model.parameters(), lr=0.001)#
#opt = torch.optim.Adam(model.parameters(), lr = 0.0005)
#opt = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

loss_fn = nn.MSELoss() #    nn.CrossEntropyLoss() # 


In [None]:
train_loss_list, validation_loss_list = fit(model, opt, loss_fn, train_dataloader_window, val_dataloader_window, 50)

In [None]:
plt.plot(validation_loss_list, label='val')
plt.plot(train_loss_list, label='train')
plt.legend()

In [None]:
model.eval()

In [None]:
def get_n_data_exaples(test_dataloader, n=10, no_cpd=3, seq_len=64, window=8,     blur = True):
    data_examples = []
    inputs_examples = []
    gen = iter(test_dataloader)
    flag = 0
    flag64 = 0
    while len(data_examples) != n:
        inputs,labels=next(gen)
        #
        labels = labels.to(torch.float)
        batch_size = len(inputs)
        
        for seq in range( batch_size - 1):
          data_example = []
          seq_len = len(labels[seq])
          cp = find_1(labels[seq])
          if cp != seq_len and cp > window:
            labels[seq][cp+2:] = 0.
            labels[seq][cp-1] = 1.
            a = gaussian_filter(labels[seq].to(torch.float), sigma=1.5)
            labels[seq] = torch.Tensor(a)
            labels[seq][:cp] = 0.
            labels[seq][cp] = 1.
            labels[seq][cp+window-1:] = 0.
            inputs_seq = inputs[seq].numpy()
            inputs_examples.append(inputs_seq)
            gc.collect()
            flag += 1
            for j in range( 0 , seq_len-window):
                  data_example.append([inputs_seq[j:j+window], labels[seq][j+window-1]])
            data_examples.append(data_example)
          if cp == seq_len and flag64 < no_cpd:
            flag64 += 1
            flag += 1
            inputs_seq = inputs[seq].numpy()
            inputs_examples.append(inputs_seq)
            for j in range( 0 , seq_len-window):
                  data_example.append([inputs_seq[j:j+window], labels[seq][j+window-1]])
            data_examples.append(data_example)

          if flag == n:
            break
        
    return   data_examples, inputs_examples

In [None]:
data_examples, inputs_examples = get_n_data_exaples(test_dataloader)

In [None]:
fig, ax = plt.subplots(len(data_examples), figsize=(15,10))
k = -1
for data_example in data_examples:
    k += 1
    pred = []
    for j in range( len(data_example)):
        X = data_example[j][0]
        if model.model_type == 'LinReg_CNN':
          X = torch.tensor(X,  dtype=torch.float).reshape( X.shape[0], 1, X.shape[1], X.shape[2]).to(device)
        else:
          X = torch.tensor(X,  dtype=torch.float).reshape(1,   X.shape[0], X.shape[1], X.shape[2]).to(device)
        pred.append(model( X ).detach().cpu().numpy() )
    pred = np.array(pred)
    pred = pred.reshape(len(pred))

    true_labels = [data_example[i][1] for i in range(len(data_example))]
    ax[k].plot(pred, label = 'pred')
    ax[k].plot(true_labels, label='true')
    ax[k].legend()

    real = get_bound(true_labels, tr = 0.985)
    preds = get_bound(pred, tr = 0.5)
    input = inputs_examples[k]
    #rpt.display(output[window:], [real[0],  seq_len], [preds[0],  seq_len])

In [None]:
figure = plt.figure(figsize=(15, 8))
cols, rows = 8, 8
plt.axis("off")

img = inputs_examples[-1][window:]
label = true_labels[-1]
labels_blur = [data_example[i][1] for i in range(len(data_example))]

#plt.title(label,  size=10, y=1.1)
for i in range(1, len(img) ):
    figure.add_subplot(rows, cols, i)
    plt.axis("off")
    label = np.round(labels_blur[i].numpy(), 2, )
    plt.title('t '+str(label) + ', p ' + str(round(pred[i], 2))) 
    plt.imshow(img.squeeze()[i-1], cmap="gray")

plt.subplots_adjust(wspace=1, hspace=1)

plt.show()

In [None]:
data_examples, inputs_examples  = get_n_data_exaples(test_dataloader, n=1000, no_cpd=500)

In [None]:
len(data_examples)

In [None]:
def get_recall(data_examples, model, threshold = 0.3):
    k = -1
    recalls = []
    for data_example in data_examples:
        k += 1
        pred = []
        for j in range( len(data_example)):
            X = data_example[j][0]
            if model.model_type == 'LinReg_CNN':
              X = torch.tensor(X,  dtype=torch.float).reshape( X.shape[0], 1, X.shape[1], X.shape[2]).to(device)
            else:
              X = torch.tensor(X,  dtype=torch.float).reshape(1,   X.shape[0], X.shape[1], X.shape[2]).to(device)
            pred.append(model( X ).detach().cpu().numpy() )
        pred = np.array(pred).reshape(len(pred))
        true_labels = np.array([data_example[i][1] for i in range(len(data_example))])
        
        # recall = TP / Total Actual Positive
        total_p = np.array([true_labels > threshold]).sum()
        if total_p != 0:
            recall =  (np.array([pred > threshold])*np.array([true_labels > threshold ])).sum() / total_p
            recalls.append(recall)
    #print(recalls)
    return np.mean(recalls)

In [None]:
def get_pred_width(data_examples, model, threshold = 0.3):
    k = -1
    pred_widths = []
    for data_example in data_examples:
        k += 1
        pred = []
        for j in range( len(data_example)):
            X = data_example[j][0]
            if model.model_type == 'LinReg_CNN':
              X = torch.tensor(X,  dtype=torch.float).reshape( X.shape[0], 1, X.shape[1], X.shape[2]).to(device)
            else:
              X = torch.tensor(X,  dtype=torch.float).reshape(1,   X.shape[0], X.shape[1], X.shape[2]).to(device)
            pred.append(model( X ).detach().cpu().numpy() )
        pred = np.array(pred).reshape(len(pred))
        pred_width = np.array([pred > threshold]).sum()
        true_labels = np.array([data_example[i][1] for i in range(len(data_example))])
        total_p = np.array([true_labels > threshold]).sum()
        if total_p != 0:
            pred_widths.append(pred_width)
    #print(pred_widths)
    return np.mean(pred_widths)

In [None]:
def get_metric(test_dataloader, model,  n=200, no_cpd=100, threshold=0.1 ):
    data_examples, inputs_examples = get_n_data_exaples(test_dataloader)
    fig, ax = plt.subplots(len(data_examples), figsize=(15,10))
    k = -1
    for data_example in data_examples:
        k += 1
        pred = []
        for j in range( len(data_example)):
            X = data_example[j][0]
            if model.model_type == 'LinReg_CNN':
              X = torch.tensor(X,  dtype=torch.float).reshape( X.shape[0], 1, X.shape[1], X.shape[2]).to(device)
            else:
              X = torch.tensor(X,  dtype=torch.float).reshape(1,   X.shape[0], X.shape[1], X.shape[2]).to(device)
            pred.append(model( X ).detach().cpu().numpy() )
        pred = np.array(pred)
        pred = pred.reshape(len(pred))

        true_labels = [data_example[i][1] for i in range(len(data_example))]
        ax[k].plot(pred, label = 'pred')
        ax[k].plot(true_labels, label='true')
        ax[k].legend()

        real = get_bound(true_labels, tr = 0.985)
        preds = get_bound(pred, tr = 0.5)
        input = inputs_examples[k]
    plt.show()

    data_examples, inputs_examples = get_n_data_exaples(test_dataloader, n, no_cpd)
    recall = get_recall(data_examples, model, threshold)
    pred_width = get_pred_width(data_examples, model, threshold)
    return recall, pred_width

In [None]:
recall, pred_width = get_metric(test_dataloader, model)
recall, pred_width

In [None]:
torch.save( model, '/content/drive/MyDrive/colab_anna/DLS/model10.ptр')