In [None]:
from locale import normalize
%load_ext autoreload
%autoreload 2

In [None]:
import math
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
import losses, datasets, ddpm, models, geometry

In [None]:
s_0 = torch.tensor([1.0])
z_0 = torch.tensor([1.0])

In [None]:
# Create grid
grid_res = 100
z_vals = torch.linspace(0, 2, grid_res)
s_vals = torch.linspace(0, 2, grid_res)

loss_grid = torch.zeros(grid_res, grid_res)
for i, z in enumerate(z_vals):
    for j, s in enumerate(s_vals):
        loss_grid[j, i] = losses.circle_loss(z, s, z_0, s_0)

# Plot
plt.imshow(loss_grid.numpy(), extent=[0, 2, 0, 2], origin='lower', aspect='auto', cmap='viridis')
plt.colorbar(label="Loss")
plt.xlabel("z")
plt.ylabel("s")
plt.title("Loss surface for fwd_loss(z, s)")
plt.show()

# Plain regression

# Supervised diffusion

In [None]:
import torch
import torch.optim as optim
import models, geometry, datasets, losses, ddpm, example_2Dsimple
from torch.utils.data import Dataset, DataLoader

def main():
    # Linear
    T = 100
    bs = 1000

    betas = ddpm.linear_beta_schedule(T)
    a, abar = ddpm.alpha_bar(betas)  # \bar{α}_t for t=1..T

    plt.plot(range(1, T + 1), a, label=r"$'\alpha'_t$ (Linear Beta)")
    plt.plot(range(1, T + 1), betas, label=r"$'\beta'_t$ (Linear Beta)")
    plt.plot(range(1, T + 1), abar, label=r"$\bar{\alpha}_t$ (Linear Beta)")
    plt.xlabel("Timestep")
    plt.ylabel(r"$\bar{\alpha}_t$")
    plt.title("Alpha Bar vs Timestep (Linear Beta Schedule)")
    plt.grid(True)
    plt.legend()
    plt.show()

    model_diff = models.SimpleNN(IO=2, N=10, C=1+1, bound=True)

    # Loss & optimizer
    optimizer = optim.Adam(model_diff.parameters(), lr=0.01)

    dataset = datasets.Dataset2DSphere(length=20000*bs, seed=8)
    loader = DataLoader(
        dataset,
        batch_size=bs,
        shuffle=True,  # reshuffle each epoch
        num_workers=0,  # >0 for parallel loading
        pin_memory=False  # True if training on GPU
    )

    # Minimalistic loop
    log = []
    for iter, batch in enumerate(loader):
        s_0, z_0, p_0, s_1, z_1, p_1 = batch['s_0'], batch['z_0'], batch['p_0'], batch['s_1'], batch['z_1'], batch['p_1']

        # warmup
        t = torch.ones((bs,), dtype=int) * (T - 1)
        #z_t = 2 * torch.randn(bs, 2)  # HACK 100 samples, 2 features
        x_1 = torch.cat([s_1, z_1], dim=-1)
        x_eps = model_diff(x_1, [p_0, t])  # diffusion
        x_t, x_var = ddpm.denoising_step(a, betas, abar, t, x_1, x_eps)

        # sample step times t and add noise up to step t
        t = torch.randint(0, T, (bs,))
        x_t, eps_gt = ddpm.q_sample(abar, x_t, t)
        x_eps = model_diff(x_t, [p_0, t])  # diffusion
        x_t, x_var = ddpm.denoising_step(a, betas, abar, t, x_t, x_eps)

        if 1:  # deterministic process
            diff = losses.circle_loss(x_t[:, 0], x_t[:, 1], z_0, s_0)
        else:  # random process (TODO: run a few steps of this before computing the loss)
            N_std_noise = torch.randn_like(x_t)
            print("X", z_var.shape, z_mean.shape, N_std_noise.shape)
            x_prev = x_t + torch.sqrt(x_var) * N_std_noise
            diff = losses.circle_loss(x_prev[:, 0], x_prev[:, 1], z_0, s_0)

        loss = diff.mean()
        optimizer.zero_grad()  # reset gradients
        loss.backward()  # backprop
        optimizer.step()  # update weights
        log.append(loss.item())
        reference = math.pow(10.0, math.floor(math.log10(iter + 1)))
        if (iter + 1) % reference == 0:
            print(f"Iter {iter + 1}, loss = {loss.item():.4f}")
    losses.plot_losses(log)

    # now sample from the learned (conditional) distribution
    s_0 = torch.FloatTensor([1.0]) #torch.rand(1)  # 100 samples, 2 features
    z_0 = torch.FloatTensor([1.0]) # torch.rand(1)  # class labels {0,1}
    p_0 = geometry.project_1D(z_0, s_0).expand(p_1.shape)

    x_1 = torch.randn(x_eps.shape)
    samples = ddpm.ddpm_sample(model_diff, x_1, [p_0], betas=betas, abar=abar)
    print(samples)

    plot_2d_samples(samples, p_0)
    
    
    example_2Dsimple.plot_2d_samples(samples, p_0)
    example_2Dsimple.plot_2d_samples(samples.abs(), p_0)

    pass

if __name__ == '__main__':
    main()

In [None]:
# model_untrained = SimpleNN(IO=2, N=10, C=C, bound=True) # leads to a spread around 0, often with a slight shift to one side


samples

In [None]:
z = z_mean
num = 5
print(f" z={z[:num, 0]},\n s={z[:num, 1]} \n p={project_1D(z[:num, 0], z[:num, 1]).abs()},\n l={-circle_iou_concentric(project_1D(z[:num, 0], z[:num, 1]), s_0)},\n gt={s_0}")

In [None]:
z.shap    