<a href="https://colab.research.google.com/github/ErikFantomex/Segmentacion_Angiogramas/blob/main/Proyectodelfinv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Introducción

El proyecto explora técnicas de visión por computadora como el modelo de segmentacion gaussiana y el modelo de aprendizaje profundo U-Net. 

En un esfuerzo por segmentar angiogramas.

In [None]:
import numpy as np
import pandas as pd  
import imageio
from skimage import img_as_float32 as as_float
from natsort import natsorted
from glob import glob
import matplotlib.pyplot as plt
import seaborn as sns
import random
from scipy.stats import multivariate_normal
import cv2
import pickle
from sklearn.metrics import accuracy_score
import time
from skimage import exposure
from functools import partial
from tqdm import tqdm
tqdm = partial(tqdm, position=0, leave=True)
from sklearn.metrics import confusion_matrix
from sklearn.metrics import cohen_kappa_score
from sklearn.model_selection import KFold,train_test_split 
from sklearn.metrics import f1_score
plt.gray()

<Figure size 432x288 with 0 Axes>


#Explorando la base de datos 


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#reading the dataset
#/content/drive/MyDrive/datasets/DB_134/Angiograms

path_pairs = list(zip(
natsorted(glob('/content/drive/MyDrive/datasets/AngioDB/normal/*.png')),
natsorted(glob('/content/drive/MyDrive/datasets/AngioDB/masks/*.png')),
))
img = np.array([as_float(imageio.imread(ipath)) for ipath, _ in path_pairs])
msk = np.array([as_float(imageio.imread(mpath)) for _, mpath in path_pairs])

print('There are',img.shape[0],'images and',msk.shape[0],'masks')
print('The shape of our data is',img.shape,'and',msk.shape)

There are 135 images and 135 masks
The shape of our data is (135, 768, 1024) and (135, 768, 1024)


#Explorando el conjunto de datos.

In [None]:
SIZE = (768,1024)

assert img.shape == (135, SIZE[0], SIZE[1])
assert msk.shape == (135, SIZE[0], SIZE[1])

SCALE = 0.40

images = [cv2.resize(img, None, fx=SCALE, fy=SCALE) for img in tqdm(img, 'Resizing Images')]
masks1 = [cv2.resize(img, None, fx=SCALE, fy=SCALE) for img in tqdm(msk, 'Resizing Masks')]

masks = []#umbralizar la máscara a 0 y 1 ya que después de reescalar la máscara tendrá diferentes valores 
for i in range(len(masks1)):
    mask = cv2.threshold(masks1[i],0,1,cv2.THRESH_BINARY)[1]
    masks.append(mask)
    
print('\n', img[0].shape, '->', images[0].shape)

Resizing Images: 100%|██████████| 135/135 [00:00<00:00, 1606.32it/s]
Resizing Masks: 100%|██████████| 135/135 [00:00<00:00, 1314.33it/s]


 (768, 1024) -> (307, 410)





#Diviendo dataset

In [None]:

VALIDATION_needed = True 
def train_test_valid_split(all_X, all_Y,PERC_TRAIN = 0.65,PERC_VALIDATION = 0.14,PERC_TEST = 0.13): 
    '''
    returs the desired data split ( 34 training images, 7 validation and 7 test images)
    
    params: 
    - all_X image data
    - all_Y mask or ground truth data
    - PERC_VALIDATION percentage for validation data
    - PERC_TEST percentage for testing data
    - PERC_TRAIN percentage for training data
    
    '''
    left_X, valid_X, left_y, valid_y = train_test_split(all_X, all_Y, test_size=PERC_VALIDATION) 
    train_X, test_X, train_y, test_y = train_test_split(left_X, left_y, test_size=PERC_TEST / (PERC_TEST + PERC_TRAIN)) 
    return test_X, test_y, train_X, train_y, valid_X, valid_y 
 
if VALIDATION_needed: 
    test_X, test_y, train_X, train_y, valid_X, valid_y = train_test_valid_split(images, masks) 
else: 
    train_X, test_X, train_y, test_y = train_test_split(images, masks) 

In [None]:
print('Despues de divir las fotos:',len(train_X),
      'training images',len(valid_X),
      'validation and',len(test_X),'test images')

Despues de divir las fotos: 96 training images 19 validation and 20 test images


#Metodo de segmentacion gaussiana

*  http://www.oranlooney.com/post/ml-from-scratch-part-5-gmm/




In [None]:
class GMM_scratch:
    '''
       
        La implementacion de la clase Segmentacion gaussiana esta basada en la funcion homonima sklearn.mixture.GaussianMixture
        params: 
       
        data - the image data to train in the EM step of the GMM
        numb_components - the number of clusters to train the GMM
        init_strategy - method to initialize the means of the GMM. 0 is the normal initialization method
        
        
        fuentes :
        https://github.com/ScienceKot/mysklearn/blob/master/Gaussian%20Mixture%20Models/GMM.py
        https://stackoverflow.com/questions/44913009/initialize-gmm-using-sklearn-python
        https://github.com/SeoHyeong/GMM_image-segmentation/blob/master/GMM_image%20segmentation.ipynb
        https://github.com/roger8587/Image-Segmentation-Using-Gaussian-Mixture-Model-with-PyTorch/blob/main/run.ipynb

    '''
    
    
    def __init__(self, data, numb_components,init_strategy):#Step1 initialization of the main variables
        
        self.valuesforplot = []
        self.data = data
        self.numb_components = numb_components
        
        self.H = self.data.shape[1]#
        self.W = self.data.shape[0]#
        
        self.lamda = np.ones((self.numb_components))/self.numb_components
        self.means = np.zeros((self.numb_components,self.H))
        #create distances variable 
        if(init_strategy == 0):#Inicializacion aleatoria
            for i in range(self.numb_components):
                soma = random.choice(self.data)
                self.means[i] = soma
            
        else:# Inicializacion kmeans 
            from sklearn.cluster import KMeans
            km = KMeans(numb_components).fit(self.data)
            self.means = km.cluster_centers_
            
        self.covariance = np.zeros((self.numb_components,self.H,self.H))#Creando el arreglo de covarianza con las dimensiones necesarios 
        self.covariance[:] = np.cov(self.data.T)#Rl arreglo que contiene toda la informacion de covarianza

        self.responsavel = np.zeros((self.W, self.numb_components))#responsavel es simplemente la matriz de responsabilidad dado el número de canales y componentes en la matriz
        
    #-------------------------------------------Norm-Step------------------------------------------------------------     
    def normalvariant(self):
        '''
        Funcion para calcular la distribucion normal , como en la clase GMM en sklearn
        '''
        numb_components = self.numb_components
        self.normal = []
        for i in range(numb_components):
            self.normal.append(multivariate_normal(mean=self.means[i], cov=self.covariance[i]))
    #-------------------------------------------E-Step-------------------------------------------------------------- 
    def Expectation(self):#necesito entender esta parte
        numb_components = self.numb_components
        self.normalvariant()
        
        for i, points in enumerate(self.data):
            for j in range(numb_components):
                
                self.responsavel[i,j] = self.lamda[j]*self.normal[j].pdf(points)#Calculando responsabilidades
            self.responsavel[i,:] /= self.responsavel[i, :].sum()#distribution usando los puntos de la distribucion normal 
                                                                #del vector de caracteristicas
            
    #-------------------------------------------M-Step---------------------------------------------------------------
    def Maximization(self):#necesito entender esta parte
        numb_components = self.numb_components

        responsavel_sum = self.responsavel.sum(0)
        
        self.lamda = responsavel_sum/float(self.responsavel.sum())

        for i in range(self.numb_components):
            self.means[i] = self.responsavel[:,i].dot(self.data)/responsavel_sum[i]
            
        for i in range(self.numb_components):
            temp = np.zeros((self.H, self.H))
            for j in range(self.W):
                t = self.data[j, :] - self.means[i]
                temp += self.responsavel[j,i]*np.outer(t,t)# Creando la matriz de identidad 
            self.covariance[i] = temp/responsavel_sum[i]#La matrix covariancia para evitar el fallo singularity 
            
        #Reinicio de la covarirancia y la media representadas en el enlace de abajo por si surgen fallos adicionales

        #https://stats.stackexchange.com/questions/219302/singularity-issues-in-gaussian-mixture-model
        if(np.isnan(self.covariance[i]).any()== True):
            self.covariance[i] = np.cov(self.data.T)#calculamos de nuevo  la covariancia 
                
        for i in range(self.numb_components):
            soma = random.choice(self.data)
            self.means[i] = soma#Media aleatoria
            
    #-------------------------------------------Train or fit Step----------------------------------------------------- 
    def fit(self, tolerance=0.01, max_epoch=35):
        distance = tolerance
        epoch = 0
        while ((distance >= tolerance) and (epoch != max_epoch)):
            old_means = self.means.copy()
            self.Expectation()
            self.Maximization()
            distance = np.linalg.norm(old_means - self.means)
            print(epoch, distance)
            epoch += 1
            array = np.array([epoch,distance])
            self.valuesforplot.append(array)
   #-------------------------------------------Likelihood-Step--------------------------------------------------------
    def Proba(self,image):
    
        Probabilities = np.zeros((image.shape[:-1]))
        numb_components = self.numb_components
        for i in range(numb_components):
            Probabilities += self.lamda[i] * multivariate_normal(mean=self.means[i], cov=self.covariance[i]).pdf(image)
        return Probabilities 

#Vector de caracteristicas
Un vector de características puede considerarse una colección de características (características importantes de la imagen estudiada) que se extrae y forma de matriz. 





# Arquitectura U-net



In [None]:
import torch
import torch.nn as nn
#https://github.com/jbrussellai/UWMGI-Pytorch-Unet

In [None]:
def doubleConv(in_c,out_c):
  conv = nn.Sequential(nn.Conv2d(in_c,out_c,3),
                       ## can even add BatchNorm here and set bias=False in Conv2d()
                       nn.ReLU(inplace=True),
                       nn.Conv2d(out_c,out_c,3),
                       ## can even add BatchNorm here and set bias=False in Conv2d()
                       nn.ReLU(inplace=True)
                       )
  return conv




## here crop_img is not a good resolve as division operation can throw us in some loopholes
## so there we 2 crop_img created as per the size and output changed because of the div operation
def crop_img(tensor,target_tensor):
  target_size = target_tensor.size()[2] # t_s = 100
  tensor_size = tensor.size()[2]        # t_s = 135
  delta = tensor_size-target_size   
  # print("delta was : ",delta)           # delta = 35
  delta = delta//2                      # delta was 17
  # print("delta became :",delta)          
  return tensor[:,:,delta:tensor_size-delta-1,delta:tensor_size-delta-1]

def crop_img2(tensor,target_tensor):
  target_size = target_tensor.size()[2] # t_s = 100
  tensor_size = tensor.size()[2]        # t_s = 135
  delta = tensor_size-target_size   
  # print("delta was : ",delta)           # delta = 35
  delta = delta//2                      # delta was 17
  # print("delta became :",delta)          
  return tensor[:,:,delta:tensor_size-delta,delta:tensor_size-delta]

In [None]:
class UNET(nn.Module):
  def __init__(self):
    super(UNET,self).__init__()
    self.maxPool = nn.MaxPool2d(3,2)

    self.down_conv_1 = doubleConv(1,64)
    self.down_conv_2 = doubleConv(64,128)
    self.down_conv_3 = doubleConv(128,256)
    self.down_conv_4 = doubleConv(256,512)
    self.down_conv_5 = doubleConv(512,1024)

    self.trans_up_1 = nn.ConvTranspose2d(1024,512,2,2)
    self.up_conv_1 = doubleConv(1024,512)

    self.trans_up_2 = nn.ConvTranspose2d(512,256,2,2)
    self.up_conv_2 = doubleConv(512,256)

    self.trans_up_3 = nn.ConvTranspose2d(256,128,2,2)
    self.up_conv_3 = doubleConv(256,128)

    self.trans_up_4 = nn.ConvTranspose2d(128,64,2,2)
    self.up_conv_4 = doubleConv(128,64)

    self.out = nn.Conv2d(64,2,1)



  def forward(self,image):
    # ENCODER DONE
    x1 = self.down_conv_1(image)
    x2 = self.maxPool(x1)
    x3 = self.down_conv_2(x2)
    x4 = self.maxPool(x3)
    x5 = self.down_conv_3(x4)
    x6 = self.maxPool(x5)
    x7 = self.down_conv_4(x6)
    x8 = self.maxPool(x7)
    x9 = self.down_conv_5(x8)
    # print(x9.size())

    #DECODER
    x = self.trans_up_1(x9)
    # print(x.size())
    y = crop_img(x7,x)
    # print(y.size())
    # print(torch.cat([x,y],1).size())
    x = self.up_conv_1(torch.cat([x,y],1))
    # print(x.size())
    x = self.trans_up_2(x)
    # print(x.size())
    # print(x5.size())
    y = crop_img(x5,x)
    # print(y.size())
    x = self.up_conv_2(torch.cat([x,y],1))
    print(x.size())
    x = self.trans_up_3(x)
    y = crop_img(x3,x)
    x = self.up_conv_3(torch.cat([x,y],1))

    x = self.trans_up_4(x)
    y = crop_img2(x1,x)
    x = self.up_conv_4(torch.cat([x,y],1))

    x = self.out(x)

    print(x.size())
    return x

In [None]:
image = torch.rand((1,1,572,572))
model = UNET()
print(model(image))

torch.Size([1, 256, 96, 96])
torch.Size([1, 2, 372, 372])
tensor([[[[ 0.0168,  0.0142,  0.0176,  ...,  0.0107,  0.0120,  0.0113],
          [ 0.0156,  0.0191,  0.0167,  ...,  0.0154,  0.0124,  0.0088],
          [ 0.0152,  0.0147,  0.0166,  ...,  0.0159,  0.0185,  0.0120],
          ...,
          [ 0.0142,  0.0134,  0.0158,  ...,  0.0177,  0.0148,  0.0186],
          [ 0.0097,  0.0150,  0.0172,  ...,  0.0130,  0.0142,  0.0158],
          [ 0.0121,  0.0174,  0.0112,  ...,  0.0152,  0.0142,  0.0146]],

         [[-0.0348, -0.0340, -0.0391,  ..., -0.0330, -0.0365, -0.0288],
          [-0.0330, -0.0349, -0.0341,  ..., -0.0356, -0.0356, -0.0296],
          [-0.0335, -0.0348, -0.0367,  ..., -0.0301, -0.0376, -0.0301],
          ...,
          [-0.0289, -0.0334, -0.0330,  ..., -0.0366, -0.0325, -0.0375],
          [-0.0286, -0.0339, -0.0311,  ..., -0.0290, -0.0343, -0.0357],
          [-0.0297, -0.0334, -0.0322,  ..., -0.0349, -0.0329, -0.0339]]]],
       grad_fn=<ConvolutionBackward0>)


#Probando parametros de convergencia 


#Algoritmo de Ramer–Douglas–Peucker

    #source: https://karthaus.nl/rdp/


    def function DouglasPeucker(PointList[], epsilon)
    # Find the point with the maximum distance
    dmax = 0
    index = 0
    end = length(PointList)
    for i = 2 to (end - 1) {
        d = perpendicularDistance(PointList[i], Line(PointList[1], PointList[end])) 
        if (d > dmax) {
            index = i
            dmax = d
        }
    }

    ResultList[] = empty;

    # If max distance is greater than epsilon, recursively simplify
    if (dmax > epsilon) {
        # Recursive call
        recResults1[] = DouglasPeucker(PointList[1...index], epsilon)
        recResults2[] = DouglasPeucker(PointList[index...end], epsilon)

        # Build the result list
        ResultList[] = {recResults1[1...length(recResults1) - 1], recResults2[1...length(recResults2)]}
    } else {
        ResultList[] = {PointList[1], PointList[end]}
    }
    # Return the result
    return ResultList[]
