In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Set device (use GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define a simple Normalizing Flow model using RealNVP
class RealNVP(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(RealNVP, self).__init__()

        self.t1 = nn.Sequential(
            nn.Linear(input_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim // 2),
        )
        self.s1 = nn.Sequential(
            nn.Linear(input_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim // 2),
            nn.Tanh(),  # To ensure scale is not too large
        )

        self.t2 = nn.Sequential(
            nn.Linear(input_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim // 2),
        )
        self.s2 = nn.Sequential(
            nn.Linear(input_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim // 2),
            nn.Tanh(),
        )

    def forward(self, x):
        x1, x2 = x[:, : x.shape[1] // 2], x[:, x.shape[1] // 2 :]

        # First transformation
        y1 = x1
        y2 = x2 * torch.exp(self.s1(x1)) + self.t1(x1)

        # Second transformation
        z2 = y2
        z1 = y1 * torch.exp(self.s2(y2)) + self.t2(y2)

        z = torch.cat([z1, z2], dim=1)
        return z

    def inverse(self, z):
        z1, z2 = z[:, : z.shape[1] // 2], z[:, z.shape[1] // 2 :]

        # Inverse of second transformation
        y2 = z2
        y1 = (z1 - self.t2(y2)) * torch.exp(-self.s2(y2))

        # Inverse of first transformation
        x1 = y1
        x2 = (y2 - self.t1(x1)) * torch.exp(-self.s1(x1))

        x = torch.cat([x1, x2], dim=1)
        return x

# Define the loss function (negative log-likelihood)
def loss_function(z, log_det_J):
    loss = 0.5 * torch.sum(z ** 2) - torch.sum(log_det_J)
    return loss

# Hyperparameters
input_dim = 784  # MNIST images are 28x28
hidden_dim = 256
epochs = 10
batch_size = 128
learning_rate = 1e-3

# Load MNIST dataset
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Initialize the flow model and optimizer
flow_model = RealNVP(input_dim, hidden_dim)
optimizer = optim.Adam(flow_model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    for i, (data, _) in enumerate(train_loader):
        # Flatten the image
        data = data.view(-1, 784)

        # Forward pass
        z = flow_model(data)

        # Calculate log determinant of Jacobian (not shown here for simplicity, 
        # but it involves tracking the scaling factors during the transformations)
        # For this example, we'll assume log_det_J = 0
        log_det_J = torch.zeros(data.shape[0]) 

        loss = loss_function(z, log_det_J)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')



Epoch [1/10], Step [100/469], Loss: 339.2137
Epoch [1/10], Step [200/469], Loss: 258.3534
Epoch [1/10], Step [300/469], Loss: 224.7767
Epoch [1/10], Step [400/469], Loss: 207.5470
Epoch [2/10], Step [100/469], Loss: 200.9799
Epoch [2/10], Step [200/469], Loss: 179.9624
Epoch [2/10], Step [300/469], Loss: 178.0738
Epoch [2/10], Step [400/469], Loss: 181.6971
Epoch [3/10], Step [100/469], Loss: 165.3713
Epoch [3/10], Step [200/469], Loss: 166.6384
Epoch [3/10], Step [300/469], Loss: 156.0230
Epoch [3/10], Step [400/469], Loss: 151.1497
Epoch [4/10], Step [100/469], Loss: 162.5576
Epoch [4/10], Step [200/469], Loss: 157.4131
Epoch [4/10], Step [300/469], Loss: 166.3839
Epoch [4/10], Step [400/469], Loss: 159.5267
Epoch [5/10], Step [100/469], Loss: 153.9683
Epoch [5/10], Step [200/469], Loss: 152.1700
Epoch [5/10], Step [300/469], Loss: 157.0360
Epoch [5/10], Step [400/469], Loss: 156.7315
Epoch [6/10], Step [100/469], Loss: 145.7701
Epoch [6/10], Step [200/469], Loss: 142.0586
Epoch [6/1

In [2]:
# Now you can use the trained flow model to estimate the probability density:
def estimate_pdf(x, flow_model):
    with torch.no_grad():
        z = flow_model(x.view(-1, 784))
        # Calculate the log probability density of z
        log_p_z = -0.5 * torch.sum(z ** 2, dim=1) - (input_dim / 2) * torch.log(torch.tensor(2 * torch.pi))  
        # If you need the actual probability, you can exponentiate log_p_z
        # p_z = torch.exp(log_p_z)  
        return log_p_z 

# Example usage:
test_image = train_dataset[0][0]  # Get a test image from the dataset
pdf_estimate = estimate_pdf(test_image, flow_model)
print(pdf_estimate)

tensor([-722.5181])
