In [None]:
import tarfile
import zipfile
import os
import random
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
#from models.model import ConNet, Rectifiedflow
#from train.train_rectified_flow import train_rectified_flow
from sklearn.model_selection import train_test_split

In [None]:
#Cargamos las rutas de los archivos .npy donde se encuentran nuestros datos

##################
ruta_pi_1 = 'ruta'
ruta_pi_0 = 'ruta'
##################
pi_1_np = np.load(ruta_pi_1)
pi_0_np = np.load(ruta_pi_0)
idx = 5

#Verificamos el rango de valores
img_array = pi_1_np[idx]
print(img_array.shape)
print(img_array.dtype)
print(img_array.min(), img_array.max())

# Visualizamos de acuerdo a idx
plt.figure(figsize=(6, 6))
plt.subplot(1, 2, 1)
plt.imshow(pi_1_np[idx])
plt.axis('off')
plt.title(f'Famosos {idx}')
plt.subplot(1, 2, 2)
plt.imshow(pi_0_np[idx])
plt.axis('off')
plt.title(f'Caricaturas {idx}')
plt.show()

In [None]:
# =========================
# 2. Separar train / val
# =========================
pi0_train, pi0_val, pi1_train, pi1_val = train_test_split(
    pi_0_np, pi_1_np, test_size=0.2, random_state=42
)

# Convertir a tensores y permutar a (N,C,H,W)

#Podemos usar los diferentes datos para visualizar las trayectorias, sólo hay
#que eliminar el comentario y ajustarlo a esa variable
x0_train = torch.tensor(pi0_train, dtype=torch.float32).permute(0,3,1,2)
#x1_train = torch.tensor(pi1_train, dtype=torch.float32).permute(0,3,1,2)
##x0_val   = torch.tensor(pi0_val, dtype=torch.float32).permute(0,3,1,2)
#x1_val   = torch.tensor(pi1_val, dtype=torch.float32).permute(0,3,1,2)


In [None]:
class ConNet(nn.Module):
    '''Convolutional Neural Network for image processing with time conditioning.'''
    def __init__(self, in_channels=3, hidden_channels=64, out_channels=3):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels + 1, hidden_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(hidden_channels, hidden_channels, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(hidden_channels, out_channels, kernel_size=3, padding=1)
        self.act = nn.ReLU()

    def forward(self, x_input, t):
        batch_size, _, H, W = x_input.shape
        t_expanded = t.view(batch_size, 1, 1, 1).expand(-1, 1, H, W)
        x = torch.cat([x_input, t_expanded], dim=1)

        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = self.conv3(x)
        return x

class Rectifiedflow():
    def __init__(self,model=None, num_steps=100):

        self.model = model # Aquí pasas una instancia de ConvNet
        self.N = num_steps # Pasos de integración (Euler)

    def get_train_tuple(self, z0=None, z1=None):
        # z0, z1: [batch_size, 3, 64, 64]
        t = torch.rand((z1.shape[0],1),device=z1.device) # [batch_size, 1]
        # Interpolación entre z0 y z1 → [batch_size, 3, 64, 64]
        z_t = t[:, None, None, None] * z1 + (1. - t[:, None, None, None]) * z0
        target = z1 - z0  # Lo que debe aprender
        return z_t, t, target  # t sigue siendo [batch_size, 1]

    @torch.no_grad()
    def sample_ode(self, z0=None, N=None):
        """
        z0: [batch_size, 3, 64, 64]
        N: número de pasos (si no se pasa, usa self.N)
        """
        if N is None:
            N = self.N

        dt = 1. / N
        traj = []
        z = z0.detach().clone()
        batch_size = z.shape[0]

        traj.append(z.clone())

        for i in range(N):
            t = torch.ones((batch_size, 1), device=z.device) * (i / N)
            pred = self.model(z, t)  # [batch_size, 3, 64, 64]
            z = z + pred * dt
            traj.append(z.clone())

        return traj  # lista de [batch_size, 3, 64, 64]

In [None]:
#Cargamos el modelo a revisar sus predicciones
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ConNet().to(device)
ruta = 'ruta' #POner ruta del modelo
model.load_state_dict(torch.load(ruta, map_location=device))
model.eval()
rectified_flow = Rectifiedflow(model=model, num_steps=100)

In [None]:
#////////////////////////////////////////////////////////////#
# No es necesario que esté dentro del ciclo for, para no gastar RAM
flat_pi1 = pi_1_np.reshape(pi_1_np.shape[0], -1) #Se aplanan las caricaturas
flat_pi0 = pi_0_np.reshape(pi_0_np.shape[0], -1) #Se aplanan los famosos
dic_Train = {}
#////////////////////////////////////////////////////////////#
# Concatenamos los conjuntos base
X_base = np.vstack((flat_pi1, flat_pi0))

# Entrenamos el PCA en esos datos
pca = PCA(n_components=2, random_state=42)
pca.fit(X_base)  # Aquí se define el espacio latente común

In [None]:
### Creación de diccionario de las trayectorias
def dicc_trayecto(Datos):
  trayetorias_dicc = {}
  for k in range(len(Datos)):
      z0 = Datos[k:k+1].to(device)
      with torch.no_grad():
           tray = rectified_flow.sample_ode(z0)

  # Convertir a numpy para graficar o analizar
      trajectory_imgs = []
      for step_img in tray:
          img = step_img.squeeze().cpu().permute(1, 2, 0).numpy()
          img = np.clip(img, 0, 1)  # Asegura valores válidos
          trajectory_imgs.append(img)
      trayetorias_dicc[str(k)] = trajectory_imgs
  return trayetorias_dicc


### Creación la definición del aplanamiento de las imagenes
def aplanamiento(DIccionario):
    flat_inter2_list = []  # Aquí guardamos las imagenes aplanados
    #Una vez que construimos los datos, aplanamos los datos del diccionario
    for j in range(0,5):
        # Accede a las imágenes de la clave j, aplana y agrega
        flatten_imgs = [img.flatten() for img in Diccionario[str(j)]]
        flat_inter2_list.extend(flatten_imgs)  # Agrega todos a la lista


    # Convierte la lista de arrays planos en un array de NumPy
    flat_inter2 = np.array(flat_inter2_list)
    # Combinamos todo verticalmente
    X_total = np.vstack((flat_pi1, flat_pi0, flat_inter2))

    return [X_total,flat_inter2.shape[0]]

#Proyectamos los datos en el espacio latente
X_pi0_proj = pca.transform(flat_pi0)
X_pi1_proj = pca.transform(flat_pi1)

In [None]:
dic_Train = {}  # Aquí guardarás las trayectorias proyectadas
Numero_imagenes = x0_train.shape[0]
m = int(Numero_imagenes/5)
for k in range(m):
    Datos = x0_train[5*k:5*(k+1),:,:,:]
    Diccionario = dicc_trayecto(Datos)  # Genera trayectorias de 5 imágenes

    # Aplana las trayectorias del diccionario
    flat_inter2_list = []
    for j in range(5):
        flatten_imgs = [img.flatten() for img in Diccionario[str(j)]]
        flat_inter2_list.extend(flatten_imgs)
    flat_inter2 = np.array(flat_inter2_list)

    # Proyecta con el PCA previamente entrenado
    flat_inter2_proj = pca.transform(flat_inter2)

    # Guarda las trayectorias proyectadas en el diccionario
    for j in range(5):
        start = j * 101
        end = (j + 1) * 101
        dic_Train[str(k*5 + j)] = flat_inter2_proj[start:end, :]

    del Diccionario, flat_inter2, flat_inter2_proj

In [None]:
fig, ax = plt.subplots(figsize=(10, 8))

# Dibujar los puntos de pi1 y pi0
ax.scatter(X_pi1_proj[:, 0], X_pi1_proj[:, 1], alpha=0.3,c='blue', label='$\pi_1$')
ax.scatter(X_pi0_proj[:, 0], X_pi0_proj[:, 1], alpha=0.3, c='red', label='$\pi_0$')
# Colores para diferenciar trayectorias si lo deseas
import matplotlib.cm as cm
colors = cm.viridis(np.linspace(0, 1, len(dic_Train)))

# Dibujar trayectorias
for i, (key, traj) in enumerate(dic_Train.items()):
    traj = np.array(traj)

    # Línea delgada de trayectoria
    ax.plot(traj[:, 0], traj[:, 1], '-', linewidth=1, color=colors[i], alpha=0.8)

    # Marcar el último punto más fuerte
    ax.scatter(traj[-1, 0], traj[-1, 1], color='black', s=30, label='Final' if i == 0 else "", zorder=5)

    # Opcional: flecha para dirección (último segmento)
    if traj.shape[0] >= 2:
        ax.annotate("",
                    xy=traj[-1], xytext=traj[-2],
                    arrowprops=dict(arrowstyle="->", color=colors[i], lw=1),
                    size=8)

# Estética del gráfico
plt.xlabel("PCA dimensión 1")
plt.ylabel("PCA dimensión 2")
plt.title("Interpolación en espacio PCA junto a los grupos reales reflow_model")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
#Vamos a visualizar las predicciones que más se acercan a nuestro conjunto objetivo
#Vamos a estudiar el conjunto de las caricaturas o nuestra distribución objetivo

azul_x = X_pi1_proj[:, 0]
azul_y = X_pi1_proj[:, 1]

min_x = np.min(azul_x)
min_y = np.min(azul_y)


#Vamos a recorrrer sobre las proyecciones de las trayectorias en el espacio y vamos a encontrarl
#las tryaectorias que están dentro del rango de de la distribución objetivo
Imagenes = []
for k in range(len(dic_Train)):
  #Tomamos la evolución de la k-esima imagen
  Evolucion = dic_Train[str(k)]
  Eje_x = Evolucion[:,0]
  Eje_y = Evolucion[:,1]
  A = Eje_y > min_y
  B = Eje_x > min_x
  Mascara = A & B
  if np.sum(Mascara) > 0:
    print('La imagen',k,'llega a la región objetivo')
    if len(np.where(Mascara)[0]) > 80:
       Imagenes.append(k)
  #np.where(Mascara)

In [None]:
'''
En caso de querer ver las trayectorias diferentes a las que
no se encuentran cerca de la región del conjunto objetivo \pi_1
Sólo pon los indices de las imagenes en la lista y quita el comentario
también es recomendable si es que la linea anterior no imprimió nada
indicativo de que ninguna imagen está cerca de la región objetivo
'''
#Imagenes = []

In [None]:
#Prediciremos estas trayectorias con la red
Trayecto_imagenes = {}
for k in Imagenes:
  Imagen_predecir = x0_train[k:k+1]
  z0 = Imagen_predecir.to(device)
  with torch.no_grad():
       tray = rectified_flow.sample_ode(z0)

  trajectory_imgs = []
  for step_img in tray:
      img = step_img.squeeze().cpu().permute(1, 2, 0).numpy()
      img = np.clip(img, 0, 1)  # Asegura valores válidos
      trajectory_imgs.append(img)
  Trayecto_imagenes[k] = trajectory_imgs

In [None]:
#Visualizamos las trayectorias que entran en la región
#de nuestro conjunto objetivo
for k in Imagenes:
  Coordenadas = dic_Train[str(k)]
  plt.plot(Coordenadas[:, 0], Coordenadas[:, 1])

plt.scatter(X_pi0_proj[:, 0], X_pi0_proj[:, 1], c='red', s=5, label='π₀')
plt.scatter(X_pi1_proj[:, 0], X_pi1_proj[:, 1], c='blue', s=5, label='π1')
plt.legend()
plt.ylim(min_y,100)
plt.xlim(min_x,150)
plt.title("Trayectorias proyectadas con PCA entrenado")
plt.show()

In [None]:
#Graficamos la imagen inicial y la final de las trayectorias
#que se encunetrna más cerca de la región del conjunto objetivo.
for k in Trayecto_imagenes:
    trayectoria = Trayecto_imagenes[k]
    imagen_inicial = trayectoria[0]     # Primer paso
    imagen_final = trayectoria[-1]      # Último paso

    fig, axs = plt.subplots(1, 2, figsize=(8, 4))

    axs[0].imshow(np.clip(imagen_inicial, 0, 1))
    axs[0].set_title(f"Imagen inicial (ID: {k})")
    axs[0].axis('off')

    axs[1].imshow(np.clip(imagen_final, 0, 1))
    axs[1].set_title("Imagen generada")
    axs[1].axis('off')

    plt.tight_layout()
    plt.show()
