In [1]:
!pip install scikit-learn==1.5.0
!pip install optuna

Collecting scikit-learn==1.5.0
  Downloading scikit_learn-1.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Downloading scikit_learn-1.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m78.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m:01[0m
[?25hInstalling collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 1.2.2
    Uninstalling scikit-learn-1.2.2:
      Successfully uninstalled scikit-learn-1.2.2
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cesium 0.12.4 requires numpy<3.0,>=2.0, but you have numpy 1.26.4 which is incompatible.
umap-learn 0.5.9.post2 requires scikit-learn>=1.6, but you have scikit-learn 1.5.0 which is incompatible.[0m[31m
[0mSuccessful

In [2]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms as T
import torch.utils.data as data_utils
from torch.utils.data import TensorDataset, random_split, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
import optuna
import optuna.visualization as vis
import cupy as cp
from cuml.manifold import UMAP as cumlUMAP
from ipywidgets import interact, FloatSlider

crear device para pytorch

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Descargar el dataset 

In [4]:
data_set = np.load('/kaggle/input/kdm-database-spiners/data.npy.npz')
print(data_set.files)

['Nest', 'L', 'rd', 'So', 'T', 'Jex', 'Jex2', 'Jex3', 'Jex4', 'Kan1', 'KanS', 'Hex', 'kd', 'KDM', 'MS']


En "data" se cargan las imagenes de las configuraciones de los espines, se reorganiza para que quede de la forma (N, C, H, W). Se redimensiona de 39x39 a 40x40 para que el cambio de dimensionalidad sea simetrico en el encoder y el decoder

In [5]:
data = data_set['MS']
data.shape
data = torch.from_numpy(data).permute(0, 3, 1, 2).float()
data = F.interpolate(data, size=(40, 40), mode='bilinear')
data = data / data.max() #normalizar, queda [-1,1]
data = (data + 1) / 2 #mover rango [0,1]
data.shape

torch.Size([164212, 1, 40, 40])

Como este es un cvae, "y" va a ser el vector con los parámetros que van a condicionar el modelo

In [6]:
kdm = data_set['KDM']
t = data_set['T']
y = np.column_stack((kdm, t))
y = torch.from_numpy(y).float()
y.shape

torch.Size([164212, 2])

In [7]:
full_dataset = TensorDataset(data, y)

val_size = int(len(full_dataset) * 0.2)
train_size = len(full_dataset) - val_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

In [8]:
train_loader = data_utils.DataLoader(
    train_dataset, 
    batch_size=128, 
    shuffle=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=128, 
    shuffle=False
)

#### La arquitectura de este CVAE cuenta:
Este es la primera versión del CVAE (concatenación + prior condicionado). 
En este modelo, el VAE se condicionó de la siguiente manera:
- Se pasaron los parámetros (kdm y t) por redes densas y se concatenaron a la salida flatten del encoder, resultando en una distribución $q(x|y)$
- Se condicionó el prior, es decir, se pasó de $p(z)$ a $p(z|y)$
  
Donde $y$ es el vector que contiene a los parámetros kdm y t.

Así, de la distribución marginal aproximada por el encoder $q(x|y)$ resulta $\mu_q$ y $\log{\sigma_q}$

Y de la distribución prior $p(z|y)$ resulta $\mu_p$ y $\log{\sigma_p}$

Para el encoder: se deja la función build_encoder para que durante la optimización pruebe diferentes configuraciones de capas, un flaten despues del ciclo para aplanar los datos y una densa para hallar $\mu$ y $\sigma$

Para el dencoder: una densa para aumentar la dimension de z, una unflatten para pasar de un vector a un tensor y en base al ciclo propuesto en el encoder se establece el número de capas convtranspose que requiere el decoder

Además, se define una función infer_star_dim para calcular el tamaño que tiene el tensor al final del encoder. Lo que se hace es pasar un tensor de ceros con el mismo tamaño de la imagen original a través del encoder, así, se guarda las dimensiones de la ultima capa conv para cuando se necesite hacer unflatten

In [9]:
class CVAE(nn.Module):
    def __init__(self, z_dim, layers_config, device, cond_dim=2):
        super(CVAE, self).__init__()
        self.z_dim = z_dim
        self.layers_config = layers_config
        self.kernel_size = 4
        self.stride = 2
        self.padding = 1
        self.cond_dim = cond_dim
        # mover todo a gpu
        self.device = device  

        self.cond = nn.Sequential(
            nn.Linear(self.cond_dim, 64),
            nn.LeakyReLU(0.2),
            nn.Linear(64, 128),
            nn.LeakyReLU(0.2)
        ).to(self.device)
        
        self.cond_dim_process = 128

        self.encoder = self._build_encoder(layers_config).to(self.device)
        self.flat_dim, self.h_start, self.w_start = self._infer_start_dim()
        
        self.fc_mu_q = nn.Linear(self.flat_dim + self.cond_dim_process, z_dim).to(self.device)
        self.fc_logvar_q = nn.Linear(self.flat_dim + self.cond_dim_process, z_dim).to(self.device)
        self.decoder = self._build_decoder().to(self.device)

        # condicionamiento en el prior
        self.fc_mu_p = nn.Linear(self.cond_dim_process, z_dim).to(self.device)
        self.fc_logvar_p = nn.Linear(self.cond_dim_process, z_dim).to(self.device)
        
    # crea el encoder en base a las configuraciones
    def _build_encoder(self, layers_config):
        layers = []
        in_ch = 1
        for (n_filters, use_bn) in layers_config:
            layers.append(nn.Conv2d(in_ch, n_filters, kernel_size=3, stride=2, padding=1))
            if use_bn:
                layers.append(nn.BatchNorm2d(n_filters))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            in_ch = n_filters
        layers.append(nn.Flatten())
        encoder = nn.Sequential(*layers)
        return encoder

    # se guardan dimensiones de la ultima capa
    def _infer_start_dim(self):
        # tensor de ceros con el tamaño de la imagen para pasarlo por el encoder
        x = torch.zeros(1, 1, 40, 40).to(self.device)
        
        with torch.no_grad():
            # encoder temporal sin flatten para medir dimensiones espaciales
            temp_encoder = nn.Sequential(*list(self.encoder.children())[:-1]) 
            temp_encoder.eval()            
            current_layer = x

            
            for layer in temp_encoder:
                current_layer = layer(current_layer)

            out = current_layer
        
        flat_dim = out.shape[1] * out.shape[2] * out.shape[3]
        h_start, w_start = out.shape[2], out.shape[3] 
        
        return flat_dim, h_start, w_start

    # crea el decoder también en base a las configuraciones de capas
    def _build_decoder(self):
        layers = []
        layers.append(nn.Linear(self.z_dim + self.cond_dim_process, self.flat_dim))
        # se pasa deun vector plano a un tensor con las dimensiones de la ultima capa convolucional del encoder
        layers.append(nn.Unflatten(1, (self.layers_config[-1][0], self.h_start, self.w_start)))
        layers.append(nn.LeakyReLU(0.2, inplace=True))
        
        # se invierte el vector de las configuraciones
        reversed_config = list(reversed(self.layers_config))
        
        N_layers = len(reversed_config) # Total de capas transpuestas necesarias
        
        # la primera entrada (input chanel) es la última del encoder
        in_ch = reversed_config[0][0] 
        for i in range(N_layers): 
            
            # atualización de los canales
            is_final_layer = (i == N_layers - 1)
            
            # si no es la capa final, el out_ch es el filtro de la siguiente capa en la config inversa
            out_ch = 1 if is_final_layer else reversed_config[i+1][0] 
            
            # la configuración de BN se toma de la capa a la que se está transponiendo (el out_ch)
            use_bn = reversed_config[i+1][1] if not is_final_layer else False
            
            # se construyen las capas conv
            layers.append(nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, stride=2, padding=1))
            in_ch = out_ch
            
            # Aplicar BN y LeakyReLU, excepto en la capa de salida final
            if not is_final_layer:
                if use_bn:
                    layers.append(nn.BatchNorm2d(out_ch))
                layers.append(nn.LeakyReLU(0.2, inplace=True))
            else:
                layers.append(nn.Sigmoid())
                
        decoder = nn.Sequential(*layers)
        return decoder
    
    def encode(self, x, y):
        h = self.encoder(x)
        y_process = self.cond(y)
        #concatenacion
        combined = torch.cat([h, y_process], dim=1)
        mu_q = self.fc_mu_q(combined)
        logvar_q = self.fc_logvar_q(combined)
        return mu_q, logvar_q

    # Calcula el prior condicionado
    def get_prior(self, y):
        y_process = self.cond(y)
        mu_p = self.fc_mu_p(y_process)
        logvar_p = self.fc_logvar_p(y_process)
        return mu_p, logvar_p

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps*std
        return z

    def decode(self, z, y):
        y_process = self.cond(y)
        combined = torch.cat([z, y_process], dim=1)
        return self.decoder(combined)

    def forward(self, x, y):
        mu_q, logvar_q = self.encode(x, y)
        z = self.reparameterize(mu_q, logvar_q)
        x_hat = self.decode(z, y)
        mu_p, logvar_p = self.get_prior(y)
        return x_hat, mu_q, logvar_q, mu_p, logvar_p

    def get_intermediate_features(self, x):
        features = {}
        h = x
        conv_blocks = list(self.encoder.children())
        
        # Layer 1
        h = conv_blocks[0](h)
        # Aplanar de (B, C, H, W) a (B, C*H*W)
        features['L1'] = h.flatten(start_dim=1) 
        
        # Layer 2
        h = conv_blocks[1](h)
        features['L2'] = h.flatten(start_dim=1)
    
        # Layer 3
        h = conv_blocks[2](h)
        features['L3'] = h.flatten(start_dim=1)
        
        return features

Se cargan los pesos del modelo previamente entrenado y los mejores parámetros encontrados

In [10]:
## params 
best_params = {'lr': 0.0011785331104065553, 
               'z_dim': 64, 
               'beta': 0.1013322948202451, 
               'batch_size': 64, 
               'base_filters': 64, 
               'growth': 1.476083655965836, 
               'use_bn': False}

z_dim = best_params['z_dim'] 
beta = best_params['beta']
num_layers = 3
base_filters = best_params['base_filters']
growth = best_params['growth']
use_bn = best_params['use_bn']

layers_config = []

for i in range(num_layers):
        filters = int(base_filters * (growth ** i))
        layers_config.append((filters, use_bn))

model = CVAE(z_dim=z_dim, layers_config=layers_config, device=device, cond_dim=2)
pesos = "/kaggle/input/pesos-cvae-emb-co/vae_spines_pesos_final.pth"
state_dict = torch.load(pesos, map_location=device)
model.load_state_dict(state_dict)
model.to(device)

CVAE(
  (cond): Sequential(
    (0): Linear(in_features=2, out_features=64, bias=True)
    (1): LeakyReLU(negative_slope=0.2)
    (2): Linear(in_features=64, out_features=128, bias=True)
    (3): LeakyReLU(negative_slope=0.2)
  )
  (encoder): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(64, 94, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.2, inplace=True)
    (4): Conv2d(94, 139, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.2, inplace=True)
    (6): Flatten(start_dim=1, end_dim=-1)
  )
  (fc_mu_q): Linear(in_features=3603, out_features=64, bias=True)
  (fc_logvar_q): Linear(in_features=3603, out_features=64, bias=True)
  (decoder): Sequential(
    (0): Linear(in_features=192, out_features=3475, bias=True)
    (1): Unflatten(dim=1, unflattened_size=(139, 5, 5))
    (2): LeakyReLU(negative_slope=0

In [35]:
def generar_espin(kdm_valor, t_valor):
    model.eval()
    y_input = torch.tensor([[kdm_valor, t_valor]], dtype=torch.float32).to(device)
    
    with torch.no_grad():
        mu_p, logvar_p = model.get_prior(y_input)
        z_sample = model.reparameterize(mu_p, logvar_p)
        imagen_generada = model.decode(z_sample, y_input) 
    imagen_numpy = imagen_generada.cpu().squeeze().numpy()
    plt.figure(figsize=(5, 5))
    plt.imshow(imagen_numpy, cmap='jet') # 'jet' parece ser tu mapa de color
    plt.colorbar(label="orientación")
    plt.title(f"Estimación\nKDM={kdm_valor} | T={t_valor}")
    plt.axis('off')
    plt.show()


In [36]:
interact(generar_espin, 
         kdm_valor=FloatSlider(min=0.01, max=1.2, step=0.1, value=0.0, description='KDM'),
         t_valor=FloatSlider(min=0.01, max=20, step=0.1, value=1.0, description='T'));

interactive(children=(FloatSlider(value=0.01, description='KDM', max=1.2, min=0.01), FloatSlider(value=1.0, de…