<a href="https://colab.research.google.com/github/bonillahermes/Deep_Learning_Projects/blob/main/Distancias.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hermes Yate Bonilla
**Data Scientist**
---

**Contact:**
- **Email:** [bonillahermes@gmail.com](mailto:bonillahermes@gmail.com)
- **LinkedIn:** [linkedin.com/in/bonillahermes](https://www.linkedin.com/in/bonillahermes/)
- **GitHub:** [github.com/bonillahermes](https://github.com/bonillahermes)
- **Webpage:** [bonillahermes.com](https://bonillahermes.com/)
---

In [None]:
import os
import random
from tqdm.auto import tqdm
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import matplotlib.colors as mcolors
import matplotlib.patches as mpatches
from mpl_toolkits.mplot3d import Axes3D
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.optim as optim #Optim module of Pytorch provides Optimization algorithms for training NN
import rasterio
import torch.nn as nn
import torch.nn.functional as F
import io
import gc  # garbage collector

import matplotlib as mpl
from matplotlib.patches import Patch
import PIL

from Junglenet import Junglenet

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

data_folder = os.path.expanduser('/export/data/s7rocast/AnthroProtect/data/anthroprotect')


# Activation Space

class APDataset(Dataset):

    def __init__(self, split, data_folder, device):

        # file folder
        self.file_folder = os.path.join(data_folder, 'tiles/s2')

        # df
        csv_file = os.path.join(data_folder, 'infos.csv')
        df = pd.read_csv(csv_file)
        df = df[df['datasplit'] == split]



        self.files = df['file'].to_list()
        self.targets = df['label'].to_list()


        self.device = device

    def __getitem__(self, index):

        # get x
        file = self.files[index]
        file = os.path.join(self.file_folder, file)

        with rasterio.open(file) as src:
            x = src.read()

        # transform x
        x = x.astype(np.float32)
        x = torch.tensor(x)
        x /= 10000
        x = x.to(self.device)

        # get y
        y = self.targets[index]

        # transform y
        y = torch.tensor(y, dtype=torch.float32)
        y = y.to(self.device)

        return {'x': x, 'y': y, 'file': file}

    def __len__(self):

        return len(self.files)

# all datasets
train_dataset = APDataset(split='train', data_folder=data_folder, device=device)
val_dataset = APDataset(split='val', data_folder=data_folder, device=device)
test_dataset = APDataset(split='test', data_folder=data_folder, device=device)


print(val_dataset[0])

model = Junglenet(
    in_channels=10,
    n_unet_maps=3,
    n_classes=1,

    unet_base_channels=32,
    double_conv=False,
    batch_norm=True,
    unet_mode='bilinear',
    unet_activation=nn.Tanh(),

    dropout=None,
    final_activation=nn.Sigmoid(),
)
model.to(device)

# Load Model

model.load_state_dict(torch.load('/export/data/s7rocast/RepositoryThesis/model/junglenet_epoch_88.pth'))

# Land cover
class APDataset(Dataset):

    def __init__(self, split, data_folder, device):

        # file folder
        self.file_folder = os.path.join(data_folder, 'tiles/esa_lc/lc_esawc')

        # df
        csv_file = os.path.join(data_folder, 'infos.csv')
        df = pd.read_csv(csv_file)
        df = df[df['datasplit'] == split]



        self.files = df['file'].to_list()
        self.targets = df['label'].to_list()


        self.device = device

    def __getitem__(self, index):

        # get x
        file = self.files[index]
        file = os.path.join(self.file_folder, file)

        with rasterio.open(file) as src:
            x = src.read()

        # transform x
        x = x.astype(np.float32)
        x = torch.tensor(x)
        #x /= 10000
        x = x.to(self.device)

        # get y
        y = self.targets[index]

        # transform y
        y = torch.tensor(y, dtype=torch.float32)
        y = y.to(self.device)

        #return {'x': x, 'y': y, 'file': file}
        return {'x': x}

    def __len__(self):

        return len(self.files)

# all datasets
esa_train_dataset = APDataset(split='train', data_folder=data_folder, device=device)
esa_val_dataset = APDataset(split='val', data_folder=data_folder, device=device)
esa_test_dataset = APDataset(split='test', data_folder=data_folder, device=device)


print(esa_val_dataset[0])


# Obtain Activation and Land cover vectors for a set of random images
def get_acts_and_lc_l(indices):
    act_vectors_list = []
    lc_vectors_list = []

    for index in indices:
        x = train_dataset[index]['x']
        lc_map = esa_train_dataset[index]['x'][0].unsqueeze(0).unsqueeze(0)

        # model.eval() ask if it is needed
        with torch.no_grad():
            act_map = model.unet(x.unsqueeze(0))

        n_channels = act_map.shape[1]
        act_vectors = act_map.permute(0, 2, 3, 1).reshape(-1, n_channels).cpu()
        act_vectors_list.append(act_vectors)

        n_channels = lc_map.shape[1]
        lc_vectors = lc_map.permute(0, 2, 3, 1).reshape(-1, n_channels).cpu()
        lc_vectors_list.append(lc_vectors)

    # Concatenate the lists to get the final tensors
    act_vectors = torch.cat(act_vectors_list, dim=0)
    lc_vectors = torch.cat(lc_vectors_list, dim=0)

    return act_vectors, lc_vectors

# Example usage with a random set of indices
total_images = len(train_dataset)  # Assuming train_dataset is a list or similar structure
num_images_to_select = 200  # Change this to the desired number of random images

# Randomly select indices
random_indices = random.sample(range(total_images), num_images_to_select)

# Get Activation and Land cover vectors for the randomly selected indices
act_vectors, lc_vectors = get_acts_and_lc_l(random_indices)

print(lc_vectors.shape)
print(act_vectors.shape)

colors=['#006400', '#ffbb22', '#ffff4c', '#f096ff', '#fa0000', '#b4b4b4', '#f0f0f0', '#0064c8', '#0096a0', '#00cf75', '#fae6a0']

classes = [10, 20, 30, 40, 50, 60,70, 80, 90, 95, 100]

classes_weights = [49, 99.6, 78, 87, 150, 99, 99.5, 95, 98.5, 100, 95]
#classes_weights = [40, 70, 60, 500, 2000, 78, 79, 75, 78, 100, 75]

def get_cmap(colors=None, classes=None, epsilon: float = 1e-2, framing_color: str = 'white', plot: bool = False):
    """
    Creates a colormap from given colors and classes. E.g. if you have classes [11, 12, 21, 31, ...] with the
    following colors ['blue', 'red', 'green', 'yellow', ...] you can give these two lists as colors and classes.

    :param colors: list of colors, e.g. ['blue', 'red', 'green', 'yellow', ...]
    :param classes: list of classes, e.g. [11, 12, 21, 31, ...]
    :param epsilon: to correctly plot colors accoring to numbers, the numbers must be a bit lower; this i.a. can be
        controlled with epsilon
    :param framing_color: all numbers lower or higher that given min/max bound will be plotted in framing_color;
        if None, lower or higher number will be plotted in lowest or highest given color
    :param plot: bool if colormap shall be plotted for test reasons
    :return: (cmap, norm) colormap and normalization
    """

    bounds = np.array(classes)
    bounds = np.hstack([bounds, bounds.max() + 2 * epsilon])  # there must be a bound larger then given max bound
    bounds = np.array(bounds) - epsilon  # all bounds should be a little smaller than given, if categories are plotted

    if framing_color is not None:

        colors = [framing_color] + list(colors) + [framing_color]
        bounds = np.array([bounds.min() - epsilon] + list(bounds) + [bounds.max() + epsilon])

    cmap = mpl.colors.ListedColormap(colors=colors)
    norm = mpl.colors.BoundaryNorm(bounds, cmap.N)

    if plot:
        classes = np.array(classes)
        array = [np.hstack([classes.min() - 1, classes, classes.max() + 1])]
        fig, ax = plt.subplots()
        im = ax.imshow(array, cmap=cmap, norm=norm)
        ax.axis(False)
        fig.colorbar(im, ticks=classes, orientation='horizontal')
        fig.tight_layout()
        plt.show()

    return cmap, norm
cmap, norm = get_cmap(colors,classes)

n_pixeles = 1000000
cantidad_de_pixeles = act_vectors.shape[0]
# Define class weights
class_weights = {class_val: weight for class_val, weight in zip(classes, classes_weights)}

# Randomly select pixels with weighted randomization
lc_values = [float(class_val.item()) for class_val in lc_vectors[:, 0]]
class_weights_sum = sum(class_weights[class_val] for class_val in lc_values)
normalized_probabilities = [class_weights[class_val] / class_weights_sum for class_val in lc_values]

weighted_random_indices = np.random.choice(cantidad_de_pixeles, size=n_pixeles, p=normalized_probabilities)

act_vectors_filtered = act_vectors[weighted_random_indices]
lc_vectors_filtered = lc_vectors[weighted_random_indices]

class_names_mapping = {
    10.0: 'Tree cover',
    20.0: 'Shrubland',
    30.0: 'Grassland',
    40.0: 'Cropland',
    50.0: 'Built-up',
    60.0: 'Bare / sparse vegetation',
    70.0: 'Snow and ice',
    80.0: 'Permanent water bodies',
    90.0: 'Herbaceous wetland',
    95.0: 'Mangroves',
    100.0: 'Moss and lichen'
}

from sklearn.mixture import GaussianMixture
X = np.array([[1, 2], [1, 4], [1, 0], [10, 2], [10, 4], [10, 0]])
gm = GaussianMixture(n_components=2, random_state=0).fit(X)
gm.means_
array([[10.,  2.],
       [ 1.,  2.]])
gm.predict([[0, 0], [12, 3]])
array([1, 0])

# Dimensionality Reduction
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
act_vectors_np = act_vectors_filtered.numpy()
scaler = StandardScaler()
act_vectors_std = scaler.fit_transform(act_vectors_np)
pca = PCA(n_components=2)
act_vectors_pca = pca.fit_transform(act_vectors_std)
print(act_vectors_pca.shape)




# Reducción de dimensionalidad y GMM

In [None]:
from sklearn.decomposition import PCA
from sklearn.mixture import GaussianMixture
import matplotlib.pyplot as plt

In [None]:
for i in range(5):  # Imprimir los primeros 5 pares de vectores y etiquetas
    print(f"Vector de Activación {i}: {act_vectors_filtered[i]}")
    print(f"Etiqueta de Cobertura de Tierra {i}: {lc_vectors_filtered[i]}")


In [None]:
# Reducir la dimensionalidad a 2 componentes con PCA
pca = PCA(n_components=2)
act_vectors_reduced = pca.fit_transform(act_vectors_filtered)
print(act_vectors_reduced.shape)


Criterio Bayesiano: Evalua cuantos clusters existen en el modelo. Numero de distribuciones en mis datos. ajuste de dimensiones

In [None]:
act_vectors_np = act_vectors_filtered.numpy()

# Definir el rango del número de componentes
n_max_components = 10  # Puedes ajustar este valor

In [None]:
BIC_scores = []
for i in range(1, n_max_components + 1):
    gmm = GaussianMixture(n_components=i, random_state=0).fit(act_vectors_np)
    BIC_scores.append(gmm.bic(act_vectors_np))

# Encontrar el número óptimo de componentes
optimal_n_components = np.argmin(BIC_scores) + 1
print(f"El número óptimo de componentes es: {optimal_n_components}")

# Graficar los BIC scores para visualizar el 'codo'
plt.figure(figsize=(8, 4))
plt.plot(range(1, n_max_components + 1), BIC_scores, marker='o')
plt.title('GMM BIC Score por Número de Componentes')
plt.xlabel('Número de Componentes')
plt.ylabel('BIC Score')
plt.xticks(range(1, n_max_components + 1))
plt.grid(True)
plt.show()



In [None]:
AIC_scores = []
for i in range(1, n_max_components + 1):
    gmm = GaussianMixture(n_components=i, random_state=0).fit(act_vectors_np)
    AIC_scores.append(gmm.aic(act_vectors_np))

# Encontrar el número óptimo de componentes
optimal_n_components = np.argmin(AIC_scores) + 1
print(f"El número óptimo de componentes es: {optimal_n_components}")

# Graficar los AIC scores para visualizar el 'codo'
plt.figure(figsize=(8, 4))
plt.plot(range(1, n_max_components + 1), AIC_scores, marker='o')
plt.title('GMM AIC Score por Número de Componentes')
plt.xlabel('Número de Componentes')
plt.ylabel('AIC Score')
plt.xticks(range(1, n_max_components + 1))
plt.grid(True)
plt.show()



In [None]:
# Aplicar Gaussian Mixture Model
n_components = 10  # Ajustar este rango según sea necesario
gmm = GaussianMixture(n_components=n_components, random_state=0)
gmm.fit(act_vectors_reduced)

In [None]:
# Clasificar cada punto en uno de los clusters
labels = gmm.predict(act_vectors_reduced)

In [None]:
# Visualizar los clusters
plt.scatter(act_vectors_reduced[:, 0], act_vectors_reduced[:, 1], c=labels, cmap='viridis')
plt.title('Clusters GMM')
plt.xlabel('Componente Principal 1')
plt.ylabel('Componente Principal 2')
plt.colorbar(label='Etiqueta del Cluster')
plt.show()


### Bondad de ajuste del GMM

In [None]:
log_likelihood = gmm.score(act_vectors_reduced)
print("Log Likelihood: ", log_likelihood)

In [None]:
from sklearn.metrics import silhouette_score
silhouette_avg = silhouette_score(act_vectors_reduced, labels)
print("Silhouette Score: ", silhouette_avg)


### Plot de clasificación

In [None]:
# Extraer las etiquetas de capa terrestre
land_cover_labels = lc_vectors_filtered[:, 0]  # Modificar según etiquetado

plt.figure(figsize=(8, 6))

# Crear un scatter plot donde cada punto es coloreado según su tipo de capa terrestre
scatter = plt.scatter(act_vectors_reduced[:, 0], act_vectors_reduced[:, 1], c=land_cover_labels, cmap='viridis')

# Crear una leyenda con los tipos de capa terrestre
# Reemplaza esto con tus propios nombres de capa terrestre
class_names_mapping = {
    10.0: 'Tree cover',
    20.0: 'Shrubland',
    30.0: 'Grassland',
    40.0: 'Cropland',
    50.0: 'Built-up',
    60.0: 'Bare / sparse vegetation',
    70.0: 'Snow and ice',
    80.0: 'Permanent water bodies',
    90.0: 'Herbaceous wetland',
    95.0: 'Mangroves',
    100.0: 'Moss and lichen'
}
plt.legend(handles=scatter.legend_elements()[0], labels=class_names_mapping.values, loc='best')

plt.xlabel('PCA Component 1')
plt.ylabel('PCA Component 2')
plt.title('Visualización por Tipo de Capa Terrestre')
plt.show()


## Varianza o Desviación Estándar dentro de cada Cluster

In [None]:
cluster_dispersions = []

for i in range(gmm.n_components):
    cluster_points = act_vectors_reduced[labels == i]
    dispersion = np.var(cluster_points, axis=0)
    cluster_dispersions.append(dispersion)

# cluster_dispersions ahora contiene la varianza para cada cluster


## Distancia Media al Centroide

In [None]:
centroids = gmm.means_
mean_distances = []

for i in range(gmm.n_components):
    cluster_points = act_vectors_reduced[labels == i]
    centroid = centroids[i]
    distances = np.sqrt(np.sum((cluster_points - centroid) ** 2, axis=1))
    mean_distance = np.mean(distances)
    mean_distances.append(mean_distance)

# mean_distances ahora contiene la distancia media para cada cluster


## KL Divergencia

In [None]:
from scipy.stats import gaussian_kde
from scipy.integrate import quad

# Crear Data como un conjunto de datos unidimensional
data = act_vectors_filtered.numpy()[:, 0]  # Modifica según requerimientos

# Estimación de la densidad de los datos usando KDE
kde = gaussian_kde(data)

# Función de densidad de probabilidad del GMM
gmm_pdf = lambda x: np.exp(gmm.score_samples(x.reshape(-1, 1)))

# Función para calcular la divergencia KL
def kl_divergence(p, q, start=-10, end=10):
    return quad(lambda x: p(x) * np.log(p(x) / q(x)), start, end)[0]

# Calcular la divergencia KL
kl_div = kl_divergence(kde, gmm_pdf)
print("KL Divergence: ", kl_div)


In [None]:
import numpy as np
from scipy.stats import gaussian_kde
from scipy.integrate import quad

data = act_vectors_filtered.numpy()[:, 0]

# Estimación de la densidad de los datos usando KDE
kde = gaussian_kde(data)

# Modificar la función lambda para manejar entradas escalares
gmm_pdf = lambda x: np.exp(gmm.score_samples(np.atleast_1d(x).reshape(-1, 1)))

# Función para calcular la divergencia KL
def kl_divergence(p, q, start=-10, end=10):
    return quad(lambda x: p(x) * np.log(p(x) / q(x)), start, end)[0]

# Calcular la divergencia KL
kl_div = kl_divergence(kde, gmm_pdf)
print("KL Divergence: ", kl_div)
