# **The Math Behind the Magic: Neural Networks, Theory and Practice**
## **Encuentro Nacional de Ingeniería Matemática 2024**
### ***Realizado por: Joaquín Fontbona, Javier Maass y Diego Olguín.***

# **Sesión 3: Todo es una red neuronal feedforward**

## **Parte 1: PINNs para EDO**

Primero importamos las librerías y configuramos la semilla aleatoria.

In [1]:
# Pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Numpy
import numpy as np

# Matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Plotly
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.offline as pyo

# Pandas
import pandas as pd

# Para mostrar el progreso del entrenamiento
from tqdm import tqdm

# Solo para dibujar el grafo de la red
from graphviz import Digraph

# Solo para las animaciones
from IPython.display import Image, HTML, display

In [2]:
# Semilla aleatoria
seed = 42

# Fijamos la semilla para Pytorch y Numpy
torch.manual_seed(seed)
np.random.seed(seed)

# Si lo desean, pueden "acelerar" la velocidad de entrenamiento de sus redes neuronales
# al utilizar la GPU de sus computadores (o cambiando el Runtime Type en Colab a uno con GPU)
# Si esto les presenta errores extraños ligados a "CUDA", les recomiendo settear esta variable
# como "cpu" (todo el notebook corre sin necesidad de usar GPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Ejemplificaremos el funcionamiento de una PINN con un sistema SIR simple

\begin{align*}
\frac{dS}{dt} &= -\beta S I \\
\frac{dI}{dt} &= \beta S I - \gamma I \\
\frac{dS}{dt} &= \gamma I
\end{align*}

Para ello primero trabajaremos con datos sintéticos.

In [3]:
# Clase para poder crear datos sintéticos de un modelo SIR

class SIR:
    def __init__(self, beta, gamma):
        # Guardamos los parámetros
        self.beta = beta
        self.gamma = gamma

    # Función de paso, pensando en integrar el sistema
    def step(self, S, I, R):
        dS = -self.beta * S * I
        dI = self.beta * S * I - self.gamma * I
        dR = self.gamma * I

        return dS, dI, dR

    # Simularemos con un método de Euler con paso constante igual a 1 día
    def simulate(self, S0, I0, R0, T):
        # Condición inicial
        S = [S0]
        I = [I0]
        R = [R0]

        # Evolución en cada día
        for t in range(T-1):
            dS, dI, dR = self.step(S[-1], I[-1], R[-1])
            S.append(S[-1] + dS)
            I.append(I[-1] + dI)
            R.append(R[-1] + dR)

        return np.array(S), np.array(I), np.array(R)

    # Gráfico de la simulación
    def plot_simulation(self, S, I, R, t):
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=t, y=S, name="Susceptibles"))
        fig.add_trace(go.Scatter(x=t, y=I, name="Infectados"))
        fig.add_trace(go.Scatter(x=t, y=R, name="Recuperados"))
        text_t = f"Ajuste de parámetros: beta = {self.beta}, gamma = {self.gamma}"
        fig.update_layout(title=text_t, xaxis_title="Tiempo", yaxis_title="Población normalizada")
        fig.show()
        return fig,

In [4]:
# Simulación modelo SIR

# Parámetros
true_beta = 1.1
true_gamma = 0.5

# Condiciones iniciales
S0, I0, R0 = 0.95, 0.05, 0.0

# Horizonte de tiempo
T = 30

# Instanciamos
SIR_model = SIR(beta=true_beta, gamma=true_gamma)
# Simulamos
S, I, R = SIR_model.simulate(S0, I0, R0, T)
# Graficamos
fig, = SIR_model.plot_simulation(S, I, R, np.arange(T))

Primero veamos que pasa si no integramos la dinámica del problema y solo intentamos minimizar la pérdida asociada a los datos.

In [5]:
# Dimensiones de entrada y salida
n_in, n_out = 1, 3

Creamos una red neuronal que será la PINN. Notar que en realidad una PINN no es una arquitectura nueva, es solo otra forma de llamarle a una FFNN, la diferencia vendrá al momento de declarar la función de pérdida.

In [6]:
# Una clase que creará una red neuronal del ancho y profundida que le demos
class PINN(nn.Module):
    def __init__(self, wide, deep):
        super(PINN, self).__init__()
        self.wide = wide # Ancho
        self.deep = deep # Profundidad
        # Capa de entrada
        self.layers = nn.ModuleList([nn.Linear(n_in, self.wide)])
        # Capas ocultas
        self.layers.extend([nn.Linear(self.wide, self.wide) for _ in range(self.deep - 1)])
        # Capa de salida
        self.layers.append(nn.Linear(self.wide, n_out))
        # Función de activación
        self.activation = nn.Sigmoid()

    # Forward pass
    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.activation(layer(x))
        return self.layers[-1](x)

Entrenaremos con ciertos datos y luego veremos cómo la red generaliza a tiempos futuros.

In [None]:
# Generamos datos sintéticos
T_data = 10
t_obs = np.arange(T_data)
S_obs, I_obs, R_obs = SIR_model.simulate(S0, I0, R0, T=T_data)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs, dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs, dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs, dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs, dtype=torch.float32).view(-1, 1)

# Definimos la función de pérdida, de una manera un poco diferente a cómo
# lo hemos hecho antes, pero sigue siendo válida
def loss_function(model, t_data, S_data, I_data, R_data):
    t_data.requires_grad_(True)
    SIR_pred_data = model(t_data)
    S_pred_data, I_pred_data, R_pred_data = SIR_pred_data[:, 0:1], SIR_pred_data[:, 1:2], SIR_pred_data[:, 2:3]

    # Pérdida de los datos observados
    loss = torch.mean((S_pred_data - S_data)**2) + torch.mean((I_pred_data - I_data)**2) + torch.mean((R_pred_data - R_data)**2)

    return loss

# Instanciamos la clase
wide, deep = 200, 1
model = PINN(wide, deep)

# Entrenamiento
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 3000
for epoch in range(epochs):
    optimizer.zero_grad()
    loss = loss_function(model, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")

Epoch 0, Loss: 0.553119


In [None]:
# Visualización de resultados junto con resultados de testeo
T_test = 30

t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)
SIR_pred = model(t_test).detach().numpy()

S, I, R = SIR_model.simulate(S0, I0, R0, T=T_test)

# Gráfico con plotly
fig = go.Figure()


# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

# Real
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=S, mode='lines', name='S Real', marker=dict(color='blue'), line=dict(dash='dash'))
    )
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=I, mode='lines', name='I Real', marker=dict(color='red'), line=dict(dash='dash'))
    )
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=R, mode='lines', name='R Real', marker=dict(color='orange'), line=dict(dash='dash'))
    )

fig.update_layout(
    title=f"Resultado red sin información de la física del problema",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada")

fig.show()

Ahora haremos exactamente lo mismo, pero ahora incluiremos en la pérdida un término que minimice el error de aproximar la ecuación diferencial.

In [None]:
# Generamos datos sintéticos
T_data, T_phys = 10, 30
t_obs = np.arange(T_data)
S_obs, I_obs, R_obs = SIR_model.simulate(S0, I0, R0, T=T_data)

# Puntos de colocación
t_phys = np.arange(0, T_phys, 2)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs, dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs, dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs, dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs, dtype=torch.float32).view(-1, 1)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs, dtype=torch.float32).view(-1, 1)
t_phys_tensor = torch.tensor(t_phys, dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs, dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs, dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs, dtype=torch.float32).view(-1, 1)

# Definimos la pérdida total, ahora incluyendo los términos de la dinámica
def loss_function(model, t_phys, t_data, S_data, I_data, R_data):
    t_data.requires_grad_(True)
    t_phys.requires_grad_(True)
    SIR_pred_data = model(t_data)
    SIR_pred_phys = model(t_phys)
    S_pred_data, I_pred_data, R_pred_data = SIR_pred_data[:, 0:1], SIR_pred_data[:, 1:2], SIR_pred_data[:, 2:3]
    S_pred_phys, I_pred_phys, R_pred_phys = SIR_pred_phys[:, 0:1], SIR_pred_phys[:, 1:2], SIR_pred_phys[:, 2:3]

    # Derivadas temporales
    dS_dt = torch.autograd.grad(S_pred_phys, t_phys, grad_outputs=torch.ones_like(S_pred_phys), create_graph=True)[0]
    dI_dt = torch.autograd.grad(I_pred_phys, t_phys, grad_outputs=torch.ones_like(I_pred_phys), create_graph=True)[0]
    dR_dt = torch.autograd.grad(R_pred_phys, t_phys, grad_outputs=torch.ones_like(R_pred_phys), create_graph=True)[0]

    # Ecuaciones diferenciales
    beta = true_beta
    gamma = true_gamma
    eq1 = dS_dt + beta * S_pred_phys * I_pred_phys
    eq2 = dI_dt - beta * S_pred_phys * I_pred_phys + gamma * I_pred_phys
    eq3 = dR_dt - gamma * I_pred_phys

    # Pérdida de las ecuaciones físicas
    physics_loss = torch.mean(eq1**2) + torch.mean(eq2**2) + torch.mean(eq3**2)

    # Pérdida de los datos observados
    data_loss = torch.mean((S_pred_data - S_data)**2) + torch.mean((I_pred_data - I_data)**2) + torch.mean((R_pred_data - R_data)**2)

    # Ponderadores de las pérdidas
    w_phys = 0.5
    w_data = 0.5

    return w_phys*physics_loss + w_data*data_loss, physics_loss, data_loss

# Instanciamos la clase
wide, deep = 200, 1
model = PINN(wide, deep)

# Entrenamiento
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 5000
for epoch in range(epochs):
    optimizer.zero_grad()
    loss, phys_loss, data_loss = loss_function(model, t_phys_tensor, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
        print(f"Loss Physics: {phys_loss.item():.6f}")
        print(f"Loss Data: {data_loss.item():.6f}")
        print("")

In [None]:
# Visualización de resultados junto con resultados de testeo
T_test = 30

t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)
SIR_pred = model(t_test).detach().numpy()

S, I, R = SIR_model.simulate(S0, I0, R0, T=T_test)

# Gráfico con plotly
fig = go.Figure()

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

# Real
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=S, mode='lines', name='S Real', marker=dict(color='blue'), line=dict(dash='dash'))
    )
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=I, mode='lines', name='I Real', marker=dict(color='red'), line=dict(dash='dash'))
    )
fig.add_trace(
    go.Scatter(x=np.arange(T_test), y=R, mode='lines', name='R Real', marker=dict(color='orange'), line=dict(dash='dash'))
    )

fig.update_layout(
    title=f"Resultado red con información de la física del problema",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada")

fig.show()

¡Vemos que el resultado de generalizar es mucho mejor que solo con los datos!

Pero podemos aprovecharnos de que tenemos física incluida en la función de pérdida, ya que ahora podemos estimar los parámetros $\beta$ y $\gamma$ del modelo SIR en base a datos. Esto dará paso a en realidad un modelo que ahora podemos simular con un solver de EDO, por ejemplo, también pueden ser parámetros que tienen un valor tangible, como el número básico reproductivo en epidemiología.

Esto se puede hacer en PyTorch de manera muy sencilla, agregándolos como parámetros sujetos a aprendizaje.

In [None]:
# Una clase que creará una red neuronal del ancho y profundida que le demos
class PINN_params(nn.Module):
    def __init__(self, wide, deep):
        super(PINN_params, self).__init__()
        self.wide = wide # Ancho
        self.deep = deep # Profundidad
        # Capa de entrada
        self.layers = nn.ModuleList([nn.Linear(n_in, self.wide)])
        # Capas ocultas
        self.layers.extend([nn.Linear(self.wide, self.wide) for _ in range(self.deep - 1)])
        # Capa de salida
        self.layers.append(nn.Linear(self.wide, n_out))
        # Función de activación
        self.activation = nn.Sigmoid()

        # Parámetros a aprender
        self.beta = nn.Parameter(torch.tensor(0.1, dtype=torch.float32))  # Valor inicial para beta
        self.gamma = nn.Parameter(torch.tensor(0.1, dtype=torch.float32))  # Valor inicial para gamma

    # Forward pass
    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.activation(layer(x))
        return self.layers[-1](x)

Notar que hay que colocar una restricción a los parámetros en este modelo, y es que $\beta$ y $\gamma$ deben ser positivos para que todo tenga sentido. Eso se logra creando un Clipper para aplicarlo a la red.

In [None]:
class WeightClipper(object):

    def __init__(self, frequency=5):
        self.frequency = frequency

    def __call__(self, module):
        # filter the variables to get the ones you want
        if hasattr(module, 'weight'):
            w = module.weight.data
            w = w.clamp(0,10)

Obtendremos los datos desde un archivo de datos en el GitHub del curso.

In [None]:
# Clonamos el GitHub del curso
!git clone https://github.com/diegoolguinw/Math_Behind_Magic.git

In [None]:
# Abrimos el archivo de datos
data = pd.read_csv('Math_Behind_Magic/Data/epi_data.csv')
data.head()

In [None]:
# Extraemos los arreglos
S_obs = data['Susceptibles'].values
I_obs = data['Infectados'].values
R_obs = data['Recuperados'].values

Supondremos que no nos llegan todos los datos a la vez, si no que nos van llegando por día, entonces tenemos que entrenar el model con una cierta cantidad de datos cada día. Haremos el setting inicial y luego vamos aplicando el entrenamiento día a día.

In [None]:
# Puntos de colocación
T_phys = 20
t_phys = np.arange(0, T_phys, 2)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs, dtype=torch.float32).view(-1, 1)
t_phys_tensor = torch.tensor(t_phys, dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs, dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs, dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs, dtype=torch.float32).view(-1, 1)

# Definimos la pérdida total, ahora incluyendo los términos de la dinámica
def loss_function(model, t_phys, t_data, S_data, I_data, R_data):
    t_data.requires_grad_(True)
    t_phys.requires_grad_(True)
    SIR_pred_data = model(t_data)
    SIR_pred_phys = model(t_phys)
    S_pred_data, I_pred_data, R_pred_data = SIR_pred_data[:, 0:1], SIR_pred_data[:, 1:2], SIR_pred_data[:, 2:3]
    S_pred_phys, I_pred_phys, R_pred_phys = SIR_pred_phys[:, 0:1], SIR_pred_phys[:, 1:2], SIR_pred_phys[:, 2:3]

    # Derivadas temporales
    dS_dt = torch.autograd.grad(S_pred_phys, t_phys, grad_outputs=torch.ones_like(S_pred_phys), create_graph=True)[0]
    dI_dt = torch.autograd.grad(I_pred_phys, t_phys, grad_outputs=torch.ones_like(I_pred_phys), create_graph=True)[0]
    dR_dt = torch.autograd.grad(R_pred_phys, t_phys, grad_outputs=torch.ones_like(R_pred_phys), create_graph=True)[0]

    # Solo cambian estas lineas al estimar parámetros!
    beta = model.beta
    gamma = model.gamma

    # Ecuaciones diferenciales
    eq1 = dS_dt + beta * S_pred_phys * I_pred_phys
    eq2 = dI_dt - beta * S_pred_phys * I_pred_phys + gamma * I_pred_phys
    eq3 = dR_dt - gamma * I_pred_phys

    # Pérdida de las ecuaciones físicas
    physics_loss = torch.mean(eq1**2) + torch.mean(eq2**2) + torch.mean(eq3**2)

    # Pérdida de los datos observados
    data_loss = torch.mean((S_pred_data - S_data)**2) + torch.mean((I_pred_data - I_data)**2) + torch.mean((R_pred_data - R_data)**2)

    # Ponderadores de las pérdidas
    w_phys = 0.5
    w_data = 0.5

    return w_phys*physics_loss + w_data*data_loss, physics_loss, data_loss

# Instanciamos la clase
wide, deep = 250, 1
model = PINN_params(wide, deep)

clipper = WeightClipper()
model.beta.register_hook(clipper)
model.gamma.register_hook(clipper)

# Entrenamiento
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Guardaremos los resultados por día
results_days = {}

In [None]:
# Entrenamiento con hasta el día 1
day = 1

# Tiempo en los datos
T_data = len(S_obs[:day])
t_obs = np.arange(T_data)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs[:day], dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs[:day], dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs[:day], dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs[:day], dtype=torch.float32).view(-1, 1)

# Épocas de entrenamiento
epochs = 10000

print(f"Entrenando en el día {day} con {epochs} épocas")
print("")

for epoch in range(epochs):
    optimizer.zero_grad()
    loss, phys_loss, data_loss = loss_function(model, t_phys_tensor, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
        print(f"Loss Physics: {phys_loss.item():.6f}")
        print(f"Loss Data: {data_loss.item():.6f}")
        print("")

In [None]:
# Visualización de resultados junto con resultados de testeo
T_test = 20

# Tiempo de testeo
t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)

# Colocamos el modelo en evaluación
model.eval()
SIR_pred = model(t_test).detach().numpy()
results_days[day] = SIR_pred

# Gráfico con plotly
fig = go.Figure()

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

fig.update_layout(
    title=f"Entrenamiento con datos hasta el día {day}",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada",
    )

fig.show()

In [None]:
# Entrenamiento con hasta el día 2
day = 2

# Tiempo en los datos
T_data = len(S_obs[:day])
t_obs = np.arange(T_data)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs[:day], dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs[:day], dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs[:day], dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs[:day], dtype=torch.float32).view(-1, 1)

# Épocas de entrenamiento
epochs = 2000

print(f"Entrenando en el día {day} con {epochs} épocas")
print("")

for epoch in range(epochs):
    optimizer.zero_grad()
    loss, phys_loss, data_loss = loss_function(model, t_phys_tensor, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
        print(f"Loss Physics: {phys_loss.item():.6f}")
        print(f"Loss Data: {data_loss.item():.6f}")
        print("")

In [None]:
# Visualización de resultados junto con resultados de testeo
T_test = 20

# Tiempo de testeo
t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)

# Colocamos el modelo en evaluación
model.eval()
SIR_pred = model(t_test).detach().numpy()
results_days[day] = SIR_pred

# Gráfico con plotly
fig = go.Figure()

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

fig.update_layout(
    title=f"Entrenamiento con datos hasta el día {day}",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada",
    )

fig.show()

In [None]:
# Entrenamiento con hasta el día 3
day = 3

# Tiempo en los datos
T_data = len(S_obs[:day])
t_obs = np.arange(T_data)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs[:day], dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs[:day], dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs[:day], dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs[:day], dtype=torch.float32).view(-1, 1)

# Épocas de entrenamiento
epochs = 2000

print(f"Entrenando en el día {day} con {epochs} épocas")
print("")

for epoch in range(epochs):
    optimizer.zero_grad()
    loss, phys_loss, data_loss = loss_function(model, t_phys_tensor, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
        print(f"Loss Physics: {phys_loss.item():.6f}")
        print(f"Loss Data: {data_loss.item():.6f}")
        print("")

In [None]:
# Visualización de resultados junto con resultados de testeo
T_test = 20

# Tiempo de testeo
t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)

# Colocamos el modelo en evaluación
model.eval()
SIR_pred = model(t_test).detach().numpy()
results_days[day] = SIR_pred

# Gráfico con plotly
fig = go.Figure()

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

fig.update_layout(
    title=f"Entrenamiento con datos hasta el día {day}",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada",
    )

fig.show()

In [23]:
# Entrenamiento con hasta el día 4
day = 4

# Tiempo en los datos
T_data = len(S_obs[:day])
t_obs = np.arange(T_data)

# Convertimos a tensores
t_data_tensor = torch.tensor(t_obs[:day], dtype=torch.float32).view(-1, 1)
S_tensor = torch.tensor(S_obs[:day], dtype=torch.float32).view(-1, 1)
I_tensor = torch.tensor(I_obs[:day], dtype=torch.float32).view(-1, 1)
R_tensor = torch.tensor(R_obs[:day], dtype=torch.float32).view(-1, 1)

# Épocas de entrenamiento
epochs = 5000

print(f"Entrenando en el día {day} con {epochs} épocas")
print("")

for epoch in range(epochs):
    optimizer.zero_grad()
    loss, phys_loss, data_loss = loss_function(model, t_phys_tensor, t_data_tensor, S_tensor, I_tensor, R_tensor)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
        print(f"Loss Physics: {phys_loss.item():.6f}")
        print(f"Loss Data: {data_loss.item():.6f}")
        print("")

Entrenando en el día 4 con 5000 épocas

Epoch 0, Loss: 0.000975
Loss Physics: 0.000015
Loss Data: 0.001935

Epoch 500, Loss: 0.000114
Loss Physics: 0.000031
Loss Data: 0.000198

Epoch 1000, Loss: 0.000073
Loss Physics: 0.000024
Loss Data: 0.000121

Epoch 1500, Loss: 0.000061
Loss Physics: 0.000031
Loss Data: 0.000091

Epoch 2000, Loss: 0.000051
Loss Physics: 0.000018
Loss Data: 0.000084

Epoch 2500, Loss: 0.000018
Loss Physics: 0.000011
Loss Data: 0.000026

Epoch 3000, Loss: 0.000450
Loss Physics: 0.000252
Loss Data: 0.000648

Epoch 3500, Loss: 0.000024
Loss Physics: 0.000010
Loss Data: 0.000038

Epoch 4000, Loss: 0.000005
Loss Physics: 0.000003
Loss Data: 0.000006

Epoch 4500, Loss: 0.000030
Loss Physics: 0.000016
Loss Data: 0.000045



In [24]:
# Visualización de resultados junto con resultados de testeo
T_test = 20

# Tiempo de testeo
t_test = torch.linspace(0, T_test-1, T_test).view(-1, 1)

# Colocamos el modelo en evaluación
model.eval()
SIR_pred = model(t_test).detach().numpy()
results_days[day] = SIR_pred

# Gráfico con plotly
fig = go.Figure()

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
    )

# Predicción
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 0], mode='lines', name='S Predicho', marker=dict(color='blue'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 1], mode='lines', name='I Predicho', marker=dict(color='red'))
    )
fig.add_trace(
    go.Scatter(
        x=t_test.numpy().squeeze(), y=SIR_pred[:, 2], mode='lines', name='R Predicho', marker=dict(color='orange'))
    )

fig.update_layout(
    title=f"Entrenamiento con datos hasta el día {day}",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada",
    )

fig.show()

In [25]:
# Comparamos los resultados por día

fig = go.Figure()
for day in results_days.keys():

    SIR_pred = results_days[day]

    # Predicción
    fig.add_trace(
        go.Scatter(
            x=t_test.squeeze(), y=SIR_pred[:,0], mode='lines', name=f'S Datos al día {day}', marker=dict(color='blue', opacity=0.9))
    )

    fig.add_trace(
        go.Scatter(
            x=t_test.squeeze(), y=SIR_pred[:,1], mode='lines', name=f'I Datos al día {day}', marker=dict(color='red'))
    )

    fig.add_trace(
        go.Scatter(
            x=t_test.squeeze(), y=SIR_pred[:,2], mode='lines', name=f'R Datos al día {day}', marker=dict(color='orange'))
    )

# Observaciones
fig.add_trace(
    go.Scatter(
        x=t_obs, y=S_obs, mode='markers', name='S Observado', marker=dict(color='blue', opacity=0.9))
)
fig.add_trace(
    go.Scatter(
        x=t_obs, y=I_obs, mode='markers', name='I Observado', marker=dict(color='red'))
)
fig.add_trace(
    go.Scatter(
        x=t_obs, y=R_obs, mode='markers', name='R Observado', marker=dict(color='orange'))
)

fig.update_layout(
    title=f"Comparación de resultados por día",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada")

fig.show()

Podemos ver el resultado de estimación de parámetros.

In [26]:
# Extraemos los valores de beta y gamma
beta_pred = model.beta.item()
gamma_pred = model.gamma.item()

# Valores verdaderos de beta y gamma utilizados
beta_true = 1.3
gamma_true = 0.4

# Printeamos
print(f"Beta Predicho: {beta_pred}")
print(f"Gamma Predicho: {gamma_pred}")
print("")
print(f"Beta Real: {beta_true}")
print(f"Gamma Real: {gamma_true}")

Beta Predicho: 1.3088319301605225
Gamma Predicho: 0.3205893635749817

Beta Real: 1.3
Gamma Real: 0.4


In [27]:
# Resolvemos con solve_ivp de Scipy
from scipy.integrate import solve_ivp

# Condición inicial
S0, I0, R0 = S_obs[0], I_obs[0], R_obs[0]

# Tiempo a evaluar
t_test = np.linspace(0, T_test-1, T_test)

def step_pred(t, y):
    return np.array([
        -beta_pred * y[0] * y[1],
        beta_pred * y[0] * y[1] - gamma_pred * y[1],
        gamma_pred * y[1]])

def step_true(t, y):
    return np.array([
        -beta_true * y[0] * y[1],
        beta_true * y[0] * y[1] - gamma_true * y[1],
        gamma_true * y[1]])


sol_pred = solve_ivp(step_pred, [0, T_test], [S0, I0, R0], t_eval=t_test)
sol_true = solve_ivp(step_true, [0, T_test], [S0, I0, R0], t_eval=t_test)

# Gráfico con Plotly
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_pred.y[0], mode='lines', name='S Predicho', marker=dict(color='blue'))
)

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_pred.y[1], mode='lines', name='I Predicho', marker=dict(color='red'))
)

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_pred.y[2], mode='lines', name='R Predicho', marker=dict(color='orange'))
)

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_true.y[0], mode='lines', name='S Real', marker=dict(color='blue'), line=dict(dash='dash'))
)

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_true.y[1], mode='lines', name='I Real', marker=dict(color='red'), line=dict(dash='dash'))
)

fig.add_trace(
    go.Scatter(
        x=t_test, y=sol_true.y[2], mode='lines', name='R Real', marker=dict(color='orange'), line=dict(dash='dash'))
)

fig.update_layout(
    title=f"Comparación de resultados",
    xaxis_title="Tiempo",
    yaxis_title="Población normalizada")

fig.show()

Incluso, podemos guardar el modelo para exportarlo y después guardarlo, por si viene otra pandemia en el futuro, para tener un red pre-entrenada!

In [28]:
# Guardamos el modelo
torch.save(model.state_dict(), "SIR_Net.pt")

In [30]:
# Y podemos cargarlo desde un GitHub
model_PATH = 'Math_Behind_Magic/Data/SIR_Net.pt'
model = PINN_params(wide, deep)
model.load_state_dict(torch.load(model_PATH, weights_only=True))

<All keys matched successfully>

## **Parte 2: Principio de Continuación Única con PINNs**

Para este ejemplo utilizaremos la librería [DeepXDE](https://deepxde.readthedocs.io/en/latest/), que facilitará la implementación de este problema al tener que involucrarnos con geometrías y EDP. Ocuparemos como dominio un cuadrado perforado y trataremos de aproximar la solución de una ecuación de Laplace.

In [None]:
# Instalar DeepXDE
try:
    import deepxde as dde
except:
    !pip install deepxde
    import deepxde as dde

In [None]:
# Solo porque DeepXDE utiliza por defecto TensorFlow
# lo importaremos, solo para este ejemplo!
import tensorflow as tf

In [None]:
# Parámetros
dim = 2

precision_train = 15
precision_test = 30

hx_train = 1 / precision_train
nx_train = int(1 / hx_train)

hx_test = 1 / precision_test
nx_test = int(1 / hx_test)

# Pesos de las pérdidas
weight_inner = 1
weight_outer = 1
weight_known = 1

iterations = 10000
learning_rate = 1e-3
num_dense_layers = 3
num_dense_nodes = 350
activation = "tanh"

In [None]:
# Parámetros de la geometría
n = 1
length = 1
R = 1 / 4

# Creación de la geometría
outer = dde.geometry.Rectangle([-length / 2, -length / 2], [length / 2, length / 2])
inner = dde.geometry.Disk([0, 0], R)

# Fronteras de la geometría
def boundary_outer(x, on_boundary):
    return on_boundary and outer.on_boundary(x)


def boundary_inner(x, on_boundary):
    return on_boundary and inner.on_boundary(x)


def boundary(_, on_boundary):
    return on_boundary

# Geometry as a difference
geom = outer - inner

In [None]:
# Máscara del dominio
def mask(x):
    return (x[0] < 0)*(x[1] < 0)


# Número de puntos considerados
meas_ratio = 1/4
sample_points = 20

points_int = geom.random_points(sample_points, ).T
points_bound = geom.random_boundary_points(1000).T

# Create the masked array
ma = mask(points_int)

masked_points = points_int.T[ma].T
obt_points = int(len(masked_points.T))
print("Número de puntos efectivo en el dominio: {}".format(obt_points))

In [None]:
# Gráfico con Plotly
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x=masked_points[0], y=masked_points[1], mode='markers', name='Puntos de solución conocida', marker=dict(color='blue', opacity=0.9))
    )

fig.add_trace(
    go.Scatter(
        x=points_bound[0], y=points_bound[1], mode='markers', name='Puntos de frontera', marker=dict(color='red', opacity=0.6))
    )

fig.update_layout(
    title=f"Puntos de dominio: {len(masked_points[0])}",
    height=700,
    width=900)

In [None]:
# Función conocida
def func_known(x):
    return np.sin(x[:,0:1]**2 + x[:,1:2]**2)

known_points = masked_points
known_u = func_known(masked_points.T)

# Establecer los puntos conocidos
observe_u = dde.icbc.PointSetBC(known_points.T, known_u)

# Se deja como una "condición de borde"
bcs = [observe_u]

In [None]:
# Matriz que representa al Laplaciano
def A(x1, x2):
    a11 = 1. + x1*0 + x2*0
    a12 = 0. + x1*0 + x2*0
    a21 = 0. + x1*0 + x2*0
    a22 = 1. + x1*0 + x2*0

    return [
        [a11, a12],
        [a21, a22]
    ]

# Lado derecho de la ecuación
def f(x, y):
    return -4*(-tf.cos(x[:,0:1]**2 + x[:,1:2]**2) + (x[:,0:1]**2 + x[:,1:2]**2)*tf.sin(x[:,0:1]**2 + x[:,1:2]**2))

In [None]:
# EDP
def pde(x, y):
    Dy = [dde.grad.jacobian(y, x, j=j) for j in range(dim)]
    D2y = []
    for i in range(dim):
        D2y.append([dde.grad.hessian(y, x, i=i, j=j) for j in range(dim)])

    A_eval = A(x[:,0:1], x[:,1:2])

    result = 0

    for i in range(dim):
        for j in range(dim):
            result += A_eval[i][j]*D2y[i][j]

    return result - f(x, y)

In [None]:
# Instanciamos la red neuronal y creamos el modelo de EDP

data = dde.data.PDE(
    geom,
    pde,
    bcs,
    num_domain=nx_train**2,
    num_boundary=16 * nx_train
)

net = dde.nn.FNN(
    [dim] + [num_dense_nodes] * num_dense_layers + [1], activation, "Glorot uniform"
)

model = dde.Model(data, net)

In [None]:
# Entrenamiento

loss_weights = [1, weight_known] # Seteamos como 1 el peso de la colocación

model.compile(
    "adam", lr=learning_rate, loss_weights=loss_weights
)

losshistory, train_state = model.train(iterations=iterations)
dde.saveplot(losshistory, train_state, issave=True, isplot=True)

In [None]:
# Librerías necesarias para este gráfico
import matplotlib.tri as tri
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.cm as cm
from functools import reduce

# Números de puntos para el gráfico
Nx = 60
Ny = Nx

# Puntos de grilla
xmin, xmax, ymin, ymax = [-length / 2, length / 2, -length / 2, length / 2]

# Predicción del modelo
x, y = np.linspace(xmin, xmax, Nx), np.linspace(ymin, ymax, Nx)
arrays = [np.vstack((x,np.ones(Nx)*y[i])) for i in range(Ny)]
X = np.hstack(arrays)
u = model.predict(X.T)

# Máscaras y triangulaciones
dom_mask = geom.inside(X.T)

X_dom = X.T[dom_mask]
x_dom, y_dom = X_dom[:,0], X_dom[:,1]
u_dom = u[dom_mask].reshape(len(X_dom[:,0]))

exact_dom = func_known(X.T)[dom_mask].flatten()

triang = tri.Triangulation(x_dom, y_dom)
x2 = x_dom[triang.triangles].mean(axis=1)
y2 = y_dom[triang.triangles].mean(axis=1)

dom_mask2 = geom.inside((np.vstack((x2, y2)).T))

triang.set_mask(~dom_mask2)

# Gráfico
fig, ax = plt.subplots(1, 3, figsize=(18,5))

c1 = ax[0].tricontourf(triang, u_dom, levels=20)
ax[0].set_xlabel("x")
ax[0].set_ylabel("y")
ax[0].set_title("PINN")

divider = make_axes_locatable(ax[0])
cax = divider.append_axes('right', size='3%', pad=0.02)
fig.colorbar(c1, cax=cax, orientation='vertical')

c2 = ax[1].tricontourf(triang, exact_dom, levels=20)
ax[1].set_xlabel("x")
#ax[1].set_ylabel("y")
ax[1].set_title("Exacta")

divider = make_axes_locatable(ax[1])
cax = divider.append_axes('right', size='3%', pad=0.02)
fig.colorbar(c2, cax=cax, orientation='vertical')

c3 = ax[2].tricontourf(triang, np.abs(u_dom-exact_dom), levels=20)
ax[2].scatter(known_points[0], known_points[1], s=10., c="red")
ax[2].set_xlabel("x")
ax[2].set_title("Error absoluto")

divider = make_axes_locatable(ax[2])
cax = divider.append_axes('right', size='3%', pad=0.02)
fig.colorbar(c3, cax=cax, orientation='vertical')

plt.show()

In [None]:
# Gráfico con Plotly

fig = make_subplots(
    rows=1, cols=3,
    subplot_titles=("PINN", "Exacta", "Error absoluto"),
    specs=[[{"type": "surface"}, {"type": "surface"}, {"type": "surface"}]])

c1 = go.Mesh3d(x=x_dom, y=y_dom, z=u_dom, intensity=u_dom, name='PINN', showscale=False)

c2 = go.Mesh3d(x=x_dom, y=y_dom, z=exact_dom, intensity=exact_dom, name='Exacta', showscale=False)

c3 = go.Mesh3d(x=x_dom, y=y_dom, z=np.abs(u_dom-exact_dom), intensity=np.abs(u_dom-exact_dom), name='Error absoluto', showscale=True)

fig.add_trace(c1, row=1, col=1)
fig.add_trace(c2, row=1, col=2)
fig.add_trace(c3, row=1, col=3)

fig

## **Parte 3: DeepONets**

Veremos una extensión de las PINNs, que son las DeepONets (Deep Operator Networks). Para ello utilizaremos DeepXDE, que implementa las DeepONets tal como el [artículo original](https://arxiv.org/abs/1910.03193).

Aplicaremos las DeepONets al problema de aprender el operador $G: C([0, 1]) \times [0, 1] \to \mathbb{R}$ definido por

$$ G(u, x) = \int_0^x u(s) ds $$

In [None]:
# Importaremos otras librerías solo para este ejemplo
import scipy.stats as stats
from scipy.integrate import cumulative_trapezoid
from sklearn.gaussian_process.kernels import RBF

Ahora debemos generar datos para poder entrenar la red, es decir, necesitamos funciones $u \in C([0,1])$ dadas para poder crear otras funciones $v$ definidas por

$$ v(x) = G(u,y) = \int_0^x u(s) ds $$

En otras palabras, necesitamos la función a integrar y el valor de la integral en cada punto. ¿Y cómo podemos generar muchas funciones de manera rápida? La solución más eficiente y que no tiene un sesgo significativo respecto a la forma de la función es samplear una función desde un proceso gaussiano.

Esto es lo siguiente, si tengo una discretización $X = \{ x_i \}_{i=1}^N$ del intervalo $[0, 1]$, entonces una función $u$ es sampleada desde un proceso gaussiano representado por los $X$ si

$$ u(X) \sim N( 0_{N}, k(X, X)) $$

Donde $k$ es lo que se llama una función *kernel*, en este caso $k: [0, 1] \times [0, 1] \to \mathbb{R}$ dada por

$$ k(x, y) = \text{exp} (- |x-y|^2/(2 \sigma^2)) $$

Que se denota por *kernel* gaussiano o RBF. Por tanto $k(X, X)$ actúa como matriz de covarianzas

$$ k(X, X)_{ij} = k(x_i, x_j) $$

Con esto veamos como samplear funciones aleatorias con Python.

In [None]:
# Sampleo desde un proceso gaussiano
Npoints = 100  # Cantidad de puntos en cada trayectoria
Ntraj_train = 150  # Cantidad de trayectorias de entrenamiento
Ntraj_test = 300  # Cantidad de trayectorias de testeo

# Puntos a samplear en [0, 1]
x0, xf = 0, 1
xdim = 1

# Arreglo sin expandir
x = np.linspace(0, 1, Npoints)

# Arreglo expandido
X = np.expand_dims(np.linspace(x0, xf, Npoints), xdim)

Utilizamos el kernel gaussiano creado por Sklearn y creamos la matriz de covarianzas.

In [None]:
# Kernel gaussiano
kernel = RBF(0.25)

# Matriz de covarianzas dada por el kernel
Sigma = kernel.__call__(X, X)

Veamos como se ve el kernel en el intervalo $[0,1]$.

In [None]:
# Punto fijo con respecto a que calcular el kernel, solo para visualizar
center = 0.5

# Imagen del kernel a través del punto fijo y todo el intervalo [0, 1]
ker_img = kernel.__call__(np.array([[center]]), X).reshape(len(x))

# Gráfico con Plotly
fig = go.Figure()
fig.add_trace(go.Scatter(x=x, y=ker_img, mode='lines'))
fig.update_layout(
    title=f"Visualización del kernel con un punto fijo",
    xaxis_title="x",
    yaxis_title=r"$k({}, x)$".format(center))
fig.show()

Veamos como se ve la matriz de covarianzas.

In [None]:
# Gráfico
X1, X2 = np.meshgrid(x, x)

fig = go.Figure(data=go.Contour(z=Sigma, x=x, y=x))
fig.update_layout(
    title=r"$k(x,x)$",
    xaxis_title="x",
    yaxis_title="x")
fig.show()

Ahora generaremos las funciones (o trayectorias) sampleadas desde un proceso gaussiano.

In [None]:
# Sampleo desde una normal multivariada para simular el proceso

# Entrenamiento
gpr_train = np.random.multivariate_normal(
    mean=np.zeros(Npoints),
    cov=Sigma,
    size=Ntraj_train)

# Testeo
gpr_test = np.random.multivariate_normal(
    mean=np.zeros(Npoints),
    cov=Sigma,
    size=Ntraj_test)

Veamos una de las trayectorias sampleadas.

In [None]:
# Gráfico con Plotly
fig = go.Figure()
fig.add_trace(go.Scatter(x=x, y=gpr_train[0], mode='lines', line=dict(dash='dashdot')))
fig.update_layout(
    title=f"Visualización de una trayectoria sampleada",
    xaxis_title="x",
    yaxis_title="y")
fig.show()

Ahora graficamos todas las trayectorias que se usarán para entrenar a la red.

In [None]:
# Gráfico
fig = go.Figure()
for i in range(Ntraj_train):
    fig.add_trace(
        go.Scatter(x=x, y=gpr_train[i], mode='lines', line=dict(dash='dashdot'), name=f'Trayectoria {i+1}'))
fig.update_layout(
    title=f"Visualización de las {Ntraj_train} trayectorias sampleadas",
    xaxis_title="x",
    yaxis_title="y")
fig.show()

Después calculamos las integrales en todos los puntos de la discretización, para todas las trayectorias.

In [None]:
# Trayectorias integradas, entrenamiento
int_traj_train = cumulative_trapezoid(gpr_train, x, axis=1, initial=0)

# Trayectorias integradas, testeo
int_traj_test = cumulative_trapezoid(gpr_test, x, axis=1, initial=0)

In [None]:
# Dataset para entrenar
X_train = (gpr_train, X)
y_train = int_traj_train

# Dataset para testear
X_test = (gpr_test, X)
y_test = int_traj_test

# Juntamos todos los dataset en un formato que DeepXDE dispone para las DeepONets
data = dde.data.TripleCartesianProd(
    X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test
)

In [None]:
# Armamos la red

# Dimensión para la branch net
n_branch = 20

# Dimensión para la trunk net
n_trunk = 20

# Red con Tanh e iniciación Glorot
net = dde.nn.DeepONetCartesianProd(
    [Npoints, n_branch, n_branch],
    [xdim, n_trunk, n_trunk],
    "tanh",
    "Glorot normal",
)

# Modelo
model = dde.Model(data, net)

# Compilación y entrenamiento
model.compile("adam", lr=0.001, metrics=["mean l2 relative error"])
losshistory, train_state = model.train(iterations=25000)

# Plot the loss trajectory
dde.utils.plot_loss_history(losshistory)
plt.show()

Ahora testeamos con funciones coseno de distinta frecuencia.

In [None]:
# Testeamos con una función conocida
f = lambda x, w: np.cos(w*x)
f_int = lambda x, w: np.sin(w*x)/w

In [None]:
# Cantidad de funciones a testear
n_test = 5
fun_test = np.zeros((n_test, Npoints))
fun_test_int = np.zeros((n_test, Npoints))

# Frecuencias a probar
omegas = np.linspace(1, 2*np.pi, n_test)

# Se calcula cada una
for i in range(n_test):
    fun_test[i] = f(x, omegas[i])
    fun_test_int[i] = f_int(x, omegas[i])

In [None]:
# Arreglo disminuido para poder evaluar
x_arr = np.expand_dims(x, axis=-1)

# Predicción del modelo
y_pred = model.predict((fun_test, x_arr)).reshape((n_test, Npoints))

In [None]:
# Gráfico
fig = go.Figure()
for i in range(n_test):
    fig.add_trace(
        go.Scatter(x=x,
                   y=y_pred[i],
                   mode='lines',
                   name=r"NN omega = {}".format(round(omegas[i], 2))))

    fig.add_trace(
        go.Scatter(x=x,
                   y=fun_test_int[i],
                   mode='lines',
                   name=r"Real omega = {}".format(round(omegas[i], 2))))

fig.update_layout(
    title=f"Visualización de las predicciones de la red",
    xaxis_title="x",
    yaxis_title="y")

fig.show()

## **Parte 3: Generative Adversarial Networks**

In [None]:
# Configuración de hiperparámetros
latent_dim = 100  # Dimensión del espacio latente
data_dim = 1     # Dimensión de los datos (Gaussiana 1D)
batch_size = 64
num_epochs = 5000
learning_rate = 1e-3
n_neurons = 64

# Definición del Generador
class Generator(nn.Module):
    def __init__(self, latent_dim, data_dim):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(latent_dim, n_neurons),
            nn.ReLU(),
            nn.Linear(n_neurons, n_neurons),
            nn.ReLU(),
            nn.Linear(n_neurons, data_dim)
        )

    def forward(self, z):
        return self.model(z)

# Definición del Discriminador
class Discriminator(nn.Module):
    def __init__(self, data_dim):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(data_dim, n_neurons),
            nn.ReLU(),
            nn.Linear(n_neurons, n_neurons),
            nn.ReLU(),
            nn.Linear(n_neurons, 1),
            nn.Sigmoid()  # Predicción de probabilidad
        )

    def forward(self, x):
        return self.model(x)

# Inicialización de los modelos
generator = Generator(latent_dim, data_dim)
discriminator = Discriminator(data_dim)

# Optimizadores
optimizer_G = optim.Adam(generator.parameters(), lr=learning_rate)
optimizer_D = optim.Adam(discriminator.parameters(), lr=learning_rate)

# Función para generar datos reales (distribución Gaussiana)
def sample_real_data(batch_size):
    return torch.tensor(np.random.normal(0, 1, (batch_size, data_dim)), dtype=torch.float32)

# Función para generar ruido latente
def sample_latent_noise(batch_size, latent_dim):
    return torch.randn(batch_size, latent_dim)

# Data real, para comparar histogramas
real_data_sample = sample_real_data(1000).numpy().flatten()

# Configuración de la animación
frames = []

# Lista para los datos generados
generated_data_list = []

# Entrenamiento de la GAN
for epoch in range(num_epochs):
    # Entrenamiento del Discriminador
    optimizer_D.zero_grad()

    # Datos reales
    real_data = sample_real_data(batch_size)
    real_labels = torch.ones(batch_size, 1)

    # Datos generados
    latent_noise = sample_latent_noise(batch_size, latent_dim)
    fake_data = generator(latent_noise)
    fake_labels = torch.zeros(batch_size, 1)

    # Pérdida del Discriminador
    real_loss = nn.BCELoss()(discriminator(real_data), real_labels)
    fake_loss = nn.BCELoss()(discriminator(fake_data.detach()), fake_labels)
    d_loss = real_loss + fake_loss

    d_loss.backward()
    optimizer_D.step()

    # Entrenamiento del Generador
    optimizer_G.zero_grad()

    # Datos generados (sin detach)
    fake_data = generator(latent_noise)
    fake_labels = torch.ones(batch_size, 1)  # El Generador quiere engañar al Discriminador

    # Pérdida del Generador
    g_loss = nn.BCELoss()(discriminator(fake_data), fake_labels)

    g_loss.backward()
    optimizer_G.step()

    # Impresión de progreso
    if epoch % 500 == 0:
        print(f"Epoch [{epoch}/{num_epochs}] | D_loss: {d_loss.item():.4f} | G_loss: {g_loss.item():.4f}")

    # Capturar datos para la animación
    if epoch % 50 == 0:
        with torch.no_grad():
            latent_noise = sample_latent_noise(1000, latent_dim)
            generated_data = generator(latent_noise).numpy().flatten()

            # Crear un frame para la animación
            frame = go.Frame(
                data=[
                    go.Histogram(x=generated_data, nbinsx=50, opacity=0.7, histnorm='probability density', name="Datos generados"),
                    go.Histogram(x=real_data_sample, nbinsx=50, opacity=0.7, histnorm='probability density', name="Datos reales")
                ],
                name=f"Epoch {epoch}"
            )
            frames.append(frame)
            generated_data_list.append(generated_data)

# Crear la figura de animación
fig = go.Figure(
    data=[
        go.Histogram(x=[], nbinsx=50, opacity=0.7, histnorm='probability density', name="Datos generados"),
        go.Histogram(x=real_data_sample, nbinsx=50, opacity=0.7, histnorm='probability density', name="Datos reales")
    ],
    layout=go.Layout(
        title="Evolución de la distribución generada vs. datos reales",
        xaxis=dict(title="Valor"),
        yaxis=dict(title="Frecuencia"),
        updatemenus=[dict(
            type="buttons",
            buttons=[dict(label="Reproducir", method="animate", args=[None])]
        )]
    ),
    frames=frames
)


# Mostrar la animación
fig.show()

In [None]:
# Visualización de los datos generados
with torch.no_grad():
    latent_noise = sample_latent_noise(5000, latent_dim)
    generated_data = generator(latent_noise).numpy().squeeze()

fig = go.Figure()

fig.add_trace(go.Histogram(x=generated_data, histnorm='probability density', name='Datos generados'))
fig.add_trace(go.Histogram(x=real_data_sample, histnorm='probability density', name='Datos reales'))

fig.update_traces(opacity=0.7)
fig.update_layout(
    title="Distribución de datos generados vs. datos reales",
    xaxis_title="Valor",
    yaxis_title="Frecuencia")
fig.show()

¡Podemos hacerle un test de normalidad a las muestras! Esto es, hacer un test de hipótesis para poder ver que tan probable es que la muestra provenga de una gaussiana estándar. Esto se puede hacer con el test de Shapiro-Wilk de normalidad, en donde, si su estadístico se acerca a 1, entonces no se rechaza la hipótesis nula, que es que los datos no provengan de una normal estándar.

In [None]:
# Test de normalidad
from scipy.stats import shapiro

# Testeamos en el resultado final
shapiro(generated_data)

Y podemos ver cómo evoluciona el estadístico en cada una de las iteraciones.

In [None]:
# Gráfico con Plotly
statistics = []

for i in range(len(generated_data_list)):
    statistics.append(shapiro(generated_data_list[i])[0])

fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(1, num_epochs+1, 50), y=statistics, mode='lines'))
fig.update_layout(
    title=f"Evolución del estadístico de Shapiro-Wilk",
    xaxis_title="Iteración",
    yaxis_title="Estadístico")
fig.show()

## **Parte 4: Generando Dígitos de MNIST usando GANs**

Intentaremos generar dígitos "sintéticos" usando una GAN muy simple. Para ello, consideraremos el formato de dígitos entre 0 y 9 del dataset MINST.

## Importar Librerías Necesarias

In [None]:
!pip install umap-learn

In [None]:
# Librerías extras para este ejemplo
import math
import pickle as pkl
from torchvision import datasets, transforms

## Cargamos los datos del MNIST

In [None]:
# Hiperparámetros
batch_size = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Preprocesamiento
transform = transforms.Compose([
    transforms.ToTensor(),
    #transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)

Construimos el DataLoader que usaremos para entrenar nuestra GAN

In [None]:
# Los loader de training y los de test
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

Podemos ver que nos da resultados consistentes:

In [None]:
# Vemos el "primer" batch disponible:
image_batch = next(iter(train_loader))
print(len(image_batch), type(image_batch))
print(image_batch[0].shape)
print(image_batch[1].shape)

In [None]:
## ----------------------------------------------------------------------------
## Visualización de un Batch
## ----------------------------------------------------------------------------

def display_images(images, n_cols=4, figsize=(12, 6)):
    """
    Función útil para mostrar un batch de imágenes en una grilla.

    Parameters
    ----------
    images: Tensor
            tensor of shape (batch_size, channel, height, width)
            containing images to be displayed
    n_cols: int
            number of columns in the grid

    Returns
    -------
    None
    """
    plt.style.use('ggplot')
    n_images = len(images)
    n_rows = math.ceil(n_images / n_cols)
    plt.figure(figsize=figsize)
    for idx in range(n_images):
        ax = plt.subplot(n_rows, n_cols, idx+1)
        image = images[idx]
        # make dims H x W x C
        image = image.permute(1, 2, 0)
        cmap = 'gray' if image.shape[2] == 1 else plt.cm.viridis
        ax.imshow(image, cmap=cmap)
        ax.set_xticks([])
        ax.set_yticks([])
    plt.tight_layout()
    plt.show()

display_images(images=image_batch[0], n_cols=8)

## Recordamos el problema de clasificación en MNIST

In [None]:
# Creamos una CNN simple para clasificar las clases del MNIST
class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2)
        self.conv2 = nn.Conv2d(64, 192, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(192, 384, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(384, 256)
        self.fc2 = nn.Linear(256, 10)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(2)

    def forward(self, x):
        x = self.maxpool(self.relu(self.conv1(x)))
        x = self.maxpool(self.relu(self.conv2(x)))
        x = self.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
learning_rate_classif = 0.01
n_epochs_classif = 10

classifier = MNISTClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.parameters(), lr=learning_rate_classif)

def train_epoch(classifier, train_loader, criterion, optimizer):
    classifier.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = classifier(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()

    return running_loss / len(train_loader), 100. * correct / total

def evaluate(classifier, test_loader):
    classifier.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = classifier(data)
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

    return 100. * correct / total

# Training loop
train_losses = []
train_accuracies = []
test_accuracies = []

for epoch in range(n_epochs_classif):
    train_loss, train_acc = train_epoch(classifier, train_loader, criterion, optimizer)
    test_acc = evaluate(classifier, test_loader)

    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    test_accuracies.append(test_acc)

    print(f'Epoch: {epoch+1}/{n_epochs_classif}')
    print(f'Training Loss: {train_loss:.4f}')
    print(f'Training Accuracy: {train_acc:.2f}%')
    print(f'Test Accuracy: {test_acc:.2f}%')
    print('-' * 50)

# Guardamos el clasificador
torch.save(classifier.state_dict(), 'mnist_classifier.pth')

# Graficar las curvas
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(train_losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')

plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Train')
plt.plot(test_accuracies, label='Test')
plt.title('Classification Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# Cargamos el Clasificador Pre-Entrenado
classifier = MNISTClassifier()
classifier.load_state_dict(torch.load('mnist_classifier.pth'))
classifier.eval()

Visualizamos cómo le va a nuestro modelo entrenado sobre un batch de datos

In [None]:
# Obtenemos un batch de imágenes
dataiter = iter(test_loader)
images, labels = next(dataiter)
images = images.to(device)
labels = labels.to(device)

classifier = classifier.to(device)
# Hacemos predicciones
with torch.no_grad():
    outputs = classifier(images)
    probabilities = torch.nn.functional.softmax(outputs, dim=1)
    _, predicted = torch.max(probabilities, 1)

# Visualizamos las imágenes y las predicciones
plt.figure(figsize=(12, 6))
for i in range(10):  # Mostramos las primeras 10 imágenes
    plt.subplot(2, 5, i+1)
    image = images[i].cpu().numpy().squeeze()
    true_label = labels[i].item()
    pred_label = predicted[i].item()
    plt.imshow(image, cmap='gray')
    plt.title(f'True: {true_label}\nPred: {pred_label}')
    plt.axis('off')
plt.tight_layout()
plt.show()

Genial! Pero no nos interesa clasificar estos datos, sino poder **generarlos**...

## Definición de la GAN

### Discriminator Network

Definimos un discriminador estándar... Esencialmente un "clasificador" de imágenes.

In [None]:
class Discriminator(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        # Usamos una FFNN con Leaky-ReLUs y Dropout.
        self.fc1 = nn.Linear(in_features=in_features, out_features=128)
        self.leaky_relu1 = nn.LeakyReLU(negative_slope=0.2)
        self.fc2 = nn.Linear(in_features=128, out_features=64)
        self.leaky_relu2 = nn.LeakyReLU(negative_slope=0.2)
        self.fc3 = nn.Linear(in_features=64, out_features=32)
        self.leaky_relu3 = nn.LeakyReLU(negative_slope=0.2)
        self.fc4 = nn.Linear(in_features=32, out_features=out_features)
        self.dropout = nn.Dropout(0.3)


    def forward(self, x):
        # Flatten a la entrada
        batch_size = x.shape[0]
        x = x.view(batch_size, -1)
        # Feed forward
        x = self.fc1(x)
        x = self.leaky_relu1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.leaky_relu2(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.leaky_relu3(x)
        x = self.dropout(x)
        logit_out = self.fc4(x)

        return logit_out

### Generator Network

Nuestra red generadora debe ser capaz de "crear" imágenes en el formato deseado, a partir de ruido en una dimensión menor

In [None]:
class Generator(nn.Module):
    def __init__(self, in_features, out_features):
        super(Generator, self).__init__()
        self.in_features = in_features
        self.out_features = out_features

        # El Generator va a up-samplear el input, produciendo datos listos para el discriminador
        self.fc1 = nn.Linear(in_features=in_features, out_features=32)
        self.relu1 = nn.LeakyReLU(negative_slope=0.2)
        self.fc2 = nn.Linear(in_features=32, out_features=64)
        self.relu2 = nn.LeakyReLU(negative_slope=0.2)
        self.fc3 = nn.Linear(in_features=64, out_features=128)
        self.relu3 = nn.LeakyReLU(negative_slope=0.2)
        self.fc4 = nn.Linear(in_features=128, out_features=out_features)
        self.dropout = nn.Dropout(0.3)
        self.tanh = nn.Tanh()


    def forward(self, x):
        # Feed forward
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.dropout(x)
        x = self.fc3(x)
        x = self.relu3(x)
        x = self.dropout(x)
        x = self.fc4(x)
        tanh_out = self.tanh(x)

        return tanh_out

## Definición de las Losses

In [None]:
def real_loss(predicted_outputs, loss_fn, device):
    """
    Función para calcular la loss cuando las muestras vienen del dataset real

    Parameters
    ----------
    predicted_outputs: Tensor
                       predicted logits

    Returns
    -------
    real_loss: int
    """
    batch_size = predicted_outputs.shape[0]
    # Los Targets se definen como 1, porque se espera que la predicción sea
    # 1 (o cercano a 1), ya que las muestras vienen del dataset real.
    targets = torch.ones(batch_size).to(device)
    real_loss = loss_fn(predicted_outputs.squeeze(), targets)

    return real_loss


def fake_loss(predicted_outputs, loss_fn, device):
    """
    Función para calcular la loss cuando las samples son las "generadas"

    Parameters
    ----------
    predicted_outputs: Tensor
                       predicted logits

    Returns
    -------
    fake_loss: int
    """
    batch_size = predicted_outputs.shape[0]
    # Los Targets se definen como 0, porque se espera que la predicción sea
    # 0 (o cercano a 0), ya que las muestras son falsas.
    targets = torch.zeros(batch_size).to(device)
    fake_loss = loss_fn(predicted_outputs.squeeze(), targets)

    return fake_loss

## Entrenando nuestras redes
El Discriminador y Generador se entrenan en conjunto, pero cada uno requiere su propio optimizador

In [None]:
#Visualización de un vector en el espacio latente (de donde samplearemos)
z_size = 100
z = np.random.uniform(-1, 1, size=(16, z_size))
plt.imshow(z, cmap='gray')
plt.xticks([])
plt.yticks([])
plt.show()

Definimos el Loop de Entrenamiento:

In [None]:
def train_minst_gan(d, g, d_optim, g_optim, loss_fn, dl, n_epochs, device, verbose=False):
    print(f'Training on [{device}]...')

    # Generate a batch (say 16) of latent image vector (z) of fixed size
    # (say 100 pix) to be as input to the Generator after each epoch of
    # training to generate a fake image. We'll visualise these fake images
    # to get a sense how generator improves as training progresses
    z_size = 100
    fixed_z = np.random.uniform(-1, 1, size=(16, z_size))
    fixed_z = torch.from_numpy(fixed_z).float().to(device)
    fixed_samples = []
    d_losses = []
    g_losses = []


    # Move discriminator and generator to available device
    d = d.to(device)
    g = g.to(device)

    for epoch in range(n_epochs):
        print(f'Epoch [{epoch+1}/{n_epochs}]:')
        # Switch the training mode on
        d.train()
        g.train()
        d_running_batch_loss = 0
        g_running_batch_loss = 0
        for curr_batch, (real_images, _) in enumerate(dl):
            # Move input batch to available device
            real_images = real_images.to(device)

            ## ----------------------------------------------------------------
            ## Train discriminator using real and then fake MNIST images,
            ## then compute the total-loss and back-propogate the total-loss
            ## ----------------------------------------------------------------

            # Reset gradients
            d_optim.zero_grad()

            # Real MNIST images
            # Convert real_images value range of 0 to 1 to -1 to 1
            # this is required because latter discriminator would be required
            # to consume generator's 'tanh' output which is of range -1 to 1
            real_images = (real_images * 2) - 1
            d_real_logits_out = d(real_images)
            d_real_loss = real_loss(d_real_logits_out, loss_fn, device)
            #d_real_loss = real_loss(d_real_logits_out, smooth=True)

            # Fake images
            with torch.no_grad():
                # Generate a batch of random latent vectors
                z = np.random.uniform(-1, 1, size=(dl.batch_size, z_size))
                z = torch.from_numpy(z).float().to(device)
                # Generate batch of fake images
                fake_images = g(z)
            # feed fake-images to discriminator and compute the
            # fake_loss (i.e. target label = 0)
            d_fake_logits_out = d(fake_images)
            d_fake_loss = fake_loss(d_fake_logits_out, loss_fn, device)
            #d_fake_loss = fake_loss(d_fake_logits_out)
            # Compute total discriminator loss
            d_loss = d_real_loss + d_fake_loss
            # Backpropogate through discriminator
            d_loss.backward()
            d_optim.step()
            # Save discriminator batch loss
            d_running_batch_loss += d_loss

            ## ----------------------------------------------------------------
            ## Train generator, compute the generator loss which is a measure
            ## of how successful the generator is in tricking the discriminator
            ## and finally back-propogate generator loss
            ## ----------------------------------------------------------------

            # Reset gradients
            g_optim.zero_grad()

            # Generate a batch of random latent vectors
            #z = torch.rand(size=(dl.batch_size, z_size)).to(device)
            z = np.random.uniform(-1, 1, size=(dl.batch_size, z_size))
            z = torch.from_numpy(z).float().to(device)
            # Generate a batch of fake images, feed them to discriminator
            # and compute the generator loss as real_loss
            # (i.e. target label = 1)
            fake_images = g(z)
            g_logits_out = d(fake_images)
            g_loss = real_loss(g_logits_out, loss_fn, device)
            #g_loss = real_loss(g_logits_out)
            # Backpropogate thorugh generator
            g_loss.backward()
            g_optim.step()
            # Save discriminator batch loss
            g_running_batch_loss += g_loss

            # Display training stats for every 200 batches
            if curr_batch % 400 == 0 and verbose:
                print(f'\tBatch [{curr_batch:>4}/{len(dl):>4}] - d_batch_loss: {d_loss.item():.6f}\tg_batch_loss: {g_loss.item():.6f}')

        # Compute epoch losses as total_batch_loss/number_of_batches
        d_epoch_loss = d_running_batch_loss.item()/len(dl)
        g_epoch_loss = g_running_batch_loss.item()/len(dl)
        d_losses.append(d_epoch_loss)
        g_losses.append(g_epoch_loss)

        # Display training stats for every 200 batches
        print(f'epoch_d_loss: {d_epoch_loss:.6f} \tepoch_g_loss: {g_epoch_loss:.6f}')

        # Generate fake images from fixed latent vector using the trained
        # generator so far and save images for latter viewing
        g.eval()
        fixed_samples.append(g(fixed_z).detach().cpu())

    # Finally write generated fake images from fixed latent vector to disk
    with open('fixed_samples.pkl', 'wb') as f:
        pkl.dump(fixed_samples, f)

    return d_losses, g_losses

### Preparar y Realizar el Entrenamiento


In [None]:
d = Discriminator(in_features=784, out_features=1)
g = Generator(in_features=100, out_features=784)

print(d)
print()
print(g)

# Definimos Optimizadores Separados
d_optim = optim.Adam(d.parameters(), lr=0.002)
g_optim = optim.Adam(g.parameters(), lr=0.002)

# Consideramos la loss BCE
loss_fn = nn.BCEWithLogitsLoss()


#Entrenamiento
n_epochs = 10
d_losses, g_losses = train_minst_gan(d, g, d_optim, g_optim,
                                     loss_fn, train_loader, n_epochs, device,
                                     verbose=False)

### Visualizando la Loss durante el entrenamiento

In [None]:
plt.plot(d_losses, label='Discriminator')
plt.plot(g_losses, label='Generator')
plt.legend()
plt.show()

### Visualicemos cómo mejora la generación de imágenes con las épocas de entrenamiento

In [None]:
def show_generated_images(epoch, n_cols=8):
    # Cargamos las imágenes que guardamos
    with open('fixed_samples.pkl', 'rb') as f:
        saved_data = pkl.load(f)
    epoch_data = saved_data[epoch-1]
    # re-escalamos a 0-1
    epoch_data = (epoch_data + 1) / 2
    # re-shapeamos a (batch_size, channel, height, width)
    batch_size, channel, height, width = len(epoch_data), 1, 28, 28
    image_batch = epoch_data.view(batch_size, channel, height, width)
    display_images(images=image_batch, n_cols=n_cols, figsize=(12, 4))

In [None]:
show_generated_images(epoch=1, n_cols=8)

In [None]:
show_generated_images(epoch=n_epochs//10, n_cols=8)

In [None]:
show_generated_images(epoch=n_epochs//2, n_cols=8)

In [None]:
show_generated_images(epoch=n_epochs, n_cols=8)

## Evaluar nuestras redes
El generador entrenado se puede usar para generar un dígito aleatorio (y random) que parecerá provenir del dataset MNIST. Para esta etapa, podemos olvidarnos del discriminator.

In [None]:
# Traemos el generador a cpu y lo ponemos en modo eval
g.to('cpu')
g.eval()
# Entregamos un vector latente random, de dimensión 100, para generar una imagen falsa.
z = np.random.uniform(-1, 1, size=(1, 100))
z = torch.from_numpy(z).float()
print("Semilla de la Generación", z)
fake_image = g(z)
# Reshapeamos y la mostramos
fake_image = fake_image.view(1, 1, 28, 28).detach()
display_images(fake_image, n_cols=1, figsize=(2, 2))

## Evaluar si el modelo está aprendiendo alguna estructura subyacente
En otras palabras, samplearemos aleatoriamente muchos datos, y veremos de qué clase son los dígitos generados (apoyándonos con nuestro clasificador previamente entrenado)

In [None]:
from sklearn.decomposition import PCA
import umap


def analyze_gan_distribution(generator, classifier, n_samples=100):
    # Generate latent vectors and images
    z_vectors = np.random.uniform(-1, 1, size=(n_samples, 100))
    z_tensors = torch.from_numpy(z_vectors).float()

    # Generate images
    generator.eval()
    with torch.no_grad():
        fake_images = generator(z_tensors)
        fake_images = fake_images.view(-1, 1, 28, 28)

        # Get predictions
        predictions = classifier(fake_images)
        predicted_labels = torch.argmax(predictions, dim=1).numpy()

    # Create a grid of subplots
    num_rows = int(np.ceil(n_samples / 5))
    fig, axes = plt.subplots(num_rows, 5, figsize=(15, num_rows*3))
    axes = axes.flatten()  # Flatten the axes array for easy indexing

    for j in range(n_samples):
        fake_image = fake_images[j].squeeze().detach()
        axes[j].imshow(fake_image, cmap='gray')
        axes[j].set_title(f'Predicted Label: {predicted_labels[j]}')
        axes[j].axis('off')  # Hide axes

    # Remove any empty subplots
    for j in range(n_samples, num_rows*5):
        axes[j].axis('off')

    plt.tight_layout()
    plt.show()

    # Reduce dimensions of latent vectors
    pca = PCA(n_components=2)
    z_pca = pca.fit_transform(z_vectors)

    reducer = umap.UMAP()
    z_umap = reducer.fit_transform(z_vectors)

    return z_pca, z_umap, predicted_labels

# Plot results
def plot_distributions(z_pca, z_umap, labels):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # PCA plot
    scatter1 = ax1.scatter(z_pca[:, 0], z_pca[:, 1], c=labels, cmap='tab10')
    ax1.set_title('PCA projection of latent space')
    plt.colorbar(scatter1, ax=ax1)

    # UMAP plot
    scatter2 = ax2.scatter(z_umap[:, 0], z_umap[:, 1], c=labels, cmap='tab10')
    ax2.set_title('UMAP projection of latent space')
    plt.colorbar(scatter2, ax=ax2)

    plt.tight_layout()
    plt.show()

In [None]:
# Cargamos el Clasificador Pre-Entrenado
classifier = MNISTClassifier()
classifier.load_state_dict(torch.load('mnist_classifier.pth'))
classifier.eval()

In [None]:
# Generate and analyze samples
z_pca, z_umap, labels = analyze_gan_distribution(g, classifier, n_samples=100)


print(50*"-")
# Plot results
plot_distributions(z_pca, z_umap, labels)
