In [None]:
import json

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import MultivariateNormal
from tqdm import tqdm

In [None]:
# === Planar flow layer ===
class PlanarFlow(nn.Module):
    """Single planar flow layer: ``f(z) = z + scale * h(weight . z + bias)``.

    Args:
        dim: Size of the last (feature) dimension of the inputs.
        h: Element-wise activation applied to the scalar pre-activation.
        hp: Derivative of ``h``. It must match ``h`` for the log-determinant
            to be correct. The default is the derivative of ``tanh``.

    Note:
        Nothing in this class constrains ``weight`` and ``scale``, so the
        Jacobian determinant ``1 + h'(.) * (weight . scale)`` is free to reach
        zero or change sign during training; only its absolute value is used.
    """

    def __init__(self, dim, h=torch.tanh, hp=lambda x: 1 - torch.tanh(x) ** 2):
        super().__init__()
        # Small random init keeps the layer close to the identity map at start.
        self.weight = nn.Parameter(torch.randn(1, dim) * 0.01)
        self.scale = nn.Parameter(torch.randn(1, dim) * 0.01)
        self.bias = nn.Parameter(torch.zeros(1))
        self.h = h
        self.hp = hp

    def _pre_activation(self, z):
        """Return ``weight . z + bias`` with shape ``(..., 1)``."""
        return F.linear(z, self.weight, self.bias)

    def forward(self, z):
        """Apply the flow to ``z`` of shape ``(..., dim)``; the shape is preserved."""
        return z + self.scale * self.h(self._pre_activation(z))

    def log_abs_det_jacobian(self, z):
        """Return ``log|det(df/dz)|`` for each sample.

        Args:
            z: Input of shape ``(..., dim)``.

        Returns:
            Tensor of shape ``z.shape[:-1]`` (one value per sample).
        """
        psi = self.hp(self._pre_activation(z)) * self.weight  # (..., dim)
        # matmul (rather than mm) broadcasts over any number of leading dims.
        det_grad = 1 + torch.matmul(psi, self.scale.t())  # (..., 1)
        # Drop only the trailing singleton: a bare squeeze() would also
        # collapse a batch of size one down to a 0-d tensor.
        return torch.log(det_grad.abs() + 1e-9).squeeze(-1)


# === Normalizing Flow Model ===
class NormalizingFlowModel(nn.Module):
    """Sequence of flow layers paired with a base distribution.

    Args:
        flows: Iterable of modules exposing ``forward(x)`` and
            ``log_abs_det_jacobian(x)``; they are applied in the given order.
        base_dist: Distribution object, stored unchanged on the model. If it
            is not an ``nn.Module`` it is a plain attribute, so
            ``model.to(device)`` will not move it.
    """

    def __init__(self, flows, base_dist):
        super().__init__()
        self.flows = nn.ModuleList(flows)
        self.base_dist = base_dist

    def forward(self, x):
        """Push ``x`` through every flow in order.

        Args:
            x: Input of shape ``(..., dim)``.

        Returns:
            Tuple ``(out, log_det)``: the transformed tensor and the sum of
            the layers' log-abs-det Jacobians, each evaluated at the input of
            its own layer.
        """
        # Start from a tensor rather than the int 0 so the second return value
        # is always a tensor, even when the model holds no flows.
        log_det = torch.zeros(x.shape[:-1], dtype=x.dtype, device=x.device)
        for flow in self.flows:
            # The log-determinant is taken at the layer's input, so it must be
            # computed before x is overwritten with the layer's output.
            log_det = log_det + flow.log_abs_det_jacobian(x)
            x = flow(x)
        return x, log_det

    def inverse(self, z):
        """Map latent ``z`` back to data space (needed for sampling).

        Raises:
            NotImplementedError: Whenever the model holds at least one flow,
                because no analytical layer inverse is implemented.
        """
        if len(self.flows) > 0:
            # No analytical inverse – Planar flow not invertible algebraically
            raise NotImplementedError("Planar flow inverse is not defined.")
        return z

In [None]:
# Prefer the GPU when one is visible; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Load the point cloud stored under the "X" key of the JSON file.
# NOTE(review): absolute Colab path; adjust when running elsewhere.
filename = "/content/blobs.json"
with open(filename, 'r', encoding="utf-8") as file:
    data = json.load(file)

# Explicit float32: an all-integer JSON array would otherwise become a Long
# tensor, and .mean() is not defined for integer tensors.
Data = torch.tensor(data['X'], dtype=torch.float32)

# Compute the per-column mean (reduces over the sample axis)
mean_x = Data.mean(dim=0)

# Center the data
X_centered = Data - mean_x

In [None]:
# X_centered is already a tensor: copy it with clone().detach() instead of
# torch.tensor(...), which emits a UserWarning when handed a tensor.
x_data = X_centered.clone().detach().to(device=device, dtype=torch.float32)

In [None]:
# Plot the centred target distribution as a 2D histogram.
# The NumPy view gets its own name so the JSON dict held in `data` is not
# overwritten by this cell.
points_np = X_centered.numpy()

plt.figure(figsize=(6, 5))
# Column 0 goes on the x axis, column 1 on the y axis; 'bins' and 'cmap' can be tuned.
plt.hist2d(points_np[:, 0], points_np[:, 1], bins=100, cmap='plasma')
plt.colorbar(label='Counts')
plt.xlabel('X')
plt.ylabel('Y')
plt.title('2D Histogram of Target Distribution')
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Base distribution: zero-mean, identity-covariance Gaussian over the data
# dimension, with its parameters created directly on `device`.
dim = x_data.shape[1]
base_dist = MultivariateNormal(
    loc=torch.zeros(dim, device=device),
    covariance_matrix=torch.eye(dim, device=device),
)

In [None]:
# Stack eight planar layers and wrap them, together with the base
# distribution, in the flow model.
flows = []
for _ in range(8):
    flows.append(PlanarFlow(dim).to(device))
model = NormalizingFlowModel(flows, base_dist)
model = model.to(device)

In [None]:
# Fit the flow by minimising the mean negative log-likelihood of random
# mini-batches. Note that each "epoch" here is a single mini-batch step.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 5000
batch_size = 256

losses = []
for epoch in tqdm(range(num_epochs)):
    # Draw row indices with replacement.
    batch_rows = torch.randint(0, x_data.shape[0], (batch_size,))
    z, log_det = model(x_data[batch_rows])

    # Change of variables: log p(x) = log p_base(z) + log|det dz/dx|.
    log_likelihood = model.base_dist.log_prob(z) + log_det
    loss = (-log_likelihood).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses.append(loss.item())

    # Report progress every 500 steps.
    if epoch % 500 == 0:
        print(f"[{epoch}] Loss: {loss.item():.4f}")

In [None]:
import matplotlib.pyplot as plt

# Map the full dataset to latent space; no gradients are needed for this.
with torch.no_grad():
    z_latent, _ = model(x_data)

# Optionally check if z_latent ∼ N(0, I)
z_np = z_latent.cpu().numpy()

# The 2D histogram only makes sense for two-dimensional data.
if dim == 2:
    plt.figure(figsize=(6, 6))
    plt.hist2d(z_np[:, 0], z_np[:, 1], bins=100, density=True, cmap="viridis")
    plt.title("Transformed z (should look like N(0,I))")
    plt.colorbar()
    plt.show()

Let's now do the same thing with the `normflows` package.

In [None]:
import torch
import normflows as nf
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
num_flows = 8
# --- Define base distribution (diagonal Gaussian over `dim` dimensions) ---
q0 = nf.distributions.base.DiagGaussian(dim)

# --- Create list of Planar flows ---
flows = [nf.flows.Planar(dim) for _ in range(num_flows)]

# --- Create the flow model ---
# Move it to `device`: x_data already lives there, so a model left on the CPU
# would raise a device-mismatch error in the training loop when CUDA is used.
model = nf.NormalizingFlow(q0=q0, flows=flows).to(device)

In [None]:
# === Training parameters ===
num_iter = 5000
batch_size = 256
lr = 1e-3

optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# --- Training loop ---
losses = []
for it in tqdm(range(num_iter)):
    # Random mini-batch of rows, drawn with replacement.
    batch_rows = torch.randint(0, x_data.shape[0], (batch_size,))
    x = x_data[batch_rows]

    # Run the flows by hand, in list order, summing each layer's log-determinant.
    # NOTE(review): this uses every flow's forward pass as the data -> latent
    # map instead of normflows' own loss helpers; confirm that is intended.
    z, log_det = x, 0.
    for flow in model.flows:
        z, ld = flow(z)
        log_det = log_det + ld

    # Mean negative log-likelihood under the base distribution.
    log_q0 = model.q0.log_prob(z)
    loss = (-(log_q0 + log_det)).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    losses.append(loss.item())

    # Report progress every 500 iterations.
    if it % 500 == 0:
        print(f"[{it}] Loss: {loss.item():.4f}")

In [None]:
# Push the whole dataset through the trained flows without tracking gradients.
with torch.no_grad():
    z = x_data.clone()
    for flow in model.flows:
        # Each flow returns (output, log_det); only the output is kept.
        z = flow(z)[0]

In [None]:
import matplotlib.pyplot as plt

# matplotlib needs host-side NumPy arrays; x_data and z may live on the GPU,
# where passing the tensors directly raises a TypeError.
data_np = x_data.detach().cpu().numpy()
latent_np = z.detach().cpu().numpy()

fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# Original data
axs[0].scatter(data_np[:, 0], data_np[:, 1], s=5, alpha=0.6)
axs[0].set_title("Original data")

# Transformed to latent space
axs[1].scatter(latent_np[:, 0], latent_np[:, 1], s=5, alpha=0.6)
axs[1].set_title("Transformed to latent space")

plt.tight_layout()
plt.show()

In [None]:
# --- Configuration ---
dim, num_flows = 2, 8  # data dimensionality, number of planar layers
num_iter, batch_size = 5000, 256

# --- Flow model: diagonal-Gaussian base followed by `num_flows` planar layers ---
# Neither the model nor the data below is moved to `device` in this cell.
q0 = nf.distributions.base.DiagGaussian(dim)
flows = []
for _ in range(num_flows):
    flows.append(nf.flows.Planar(dim))
model = nf.NormalizingFlow(q0=q0, flows=flows)

# --- Optimizer ---
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# --- Dummy data: two Gaussian blobs of 1000 points each (std 0.5),
# --- centred at (2, 2) and (-2, -2). This replaces the earlier x_data.
blob_centers = [np.array([2, 2]), np.array([-2, -2])]
blobs = [np.random.randn(1000, 2) * 0.5 + center for center in blob_centers]
x_data = torch.tensor(np.concatenate(blobs, axis=0), dtype=torch.float32)

In [None]:
# --- Training loop ---
losses = []
for it in tqdm(range(num_iter)):
    # Random mini-batch of rows, drawn with replacement.
    batch_rows = torch.randint(0, x_data.shape[0], (batch_size,))
    x = x_data[batch_rows]

    # Run the flows by hand, in list order, summing each layer's log-determinant.
    # NOTE(review): this uses every flow's forward pass as the data -> latent
    # map instead of normflows' own loss helpers; confirm that is intended.
    z, log_det = x, 0.
    for flow in model.flows:
        z, ld = flow(z)
        log_det = log_det + ld

    # Mean negative log-likelihood under the base distribution.
    log_q0 = model.q0.log_prob(z)
    loss = (-(log_q0 + log_det)).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    losses.append(loss.item())

    # Report progress every 500 iterations.
    if it % 500 == 0:
        print(f"[{it}] Loss: {loss.item():.4f}")

In [None]:
# Push the whole dataset through the trained flows without tracking gradients.
with torch.no_grad():
    z = x_data.clone()
    for flow in model.flows:
        # Each flow returns (output, log_det); only the output is kept.
        z = flow(z)[0]

In [None]:
import matplotlib.pyplot as plt

# Convert explicitly to host-side NumPy arrays so the plot keeps working if
# x_data and the model are later moved to a GPU or carry gradients.
data_np = x_data.detach().cpu().numpy()
latent_np = z.detach().cpu().numpy()

fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# Original data
axs[0].scatter(data_np[:, 0], data_np[:, 1], s=5, alpha=0.6)
axs[0].set_title("Original data")

# Transformed to latent space
axs[1].scatter(latent_np[:, 0], latent_np[:, 1], s=5, alpha=0.6)
axs[1].set_title("Transformed to latent space")

plt.tight_layout()
plt.show()
