In [None]:
import os
import subprocess

def git_repo_root():
    """Return the top-level directory of the enclosing Git repository.

    Runs ``git rev-parse --show-toplevel`` in the current working directory.

    Returns:
        str | None: The repository root path with surrounding whitespace
        stripped, or ``None`` when the path cannot be determined — either
        because ``git`` exited with a non-zero status (e.g. the current
        directory is not inside a repository) or because the ``git``
        executable could not be launched at all.
    """
    command = ['git', 'rev-parse', '--show-toplevel']
    try:
        output = subprocess.check_output(command, universal_newlines=True)
    except subprocess.CalledProcessError:
        # git ran but reported failure: not inside a Git repository.
        return None
    except OSError:
        # git could not be started (e.g. FileNotFoundError when it is not
        # installed or not on PATH). Treat it like "no repository" so the
        # calling cell can fall through to its else-branch instead of crashing.
        return None
    return output.strip()

# Locate the repository root so that project-relative imports and paths resolve
git_root = git_repo_root()

if not git_root:
    # No root could be determined; leave the working directory untouched
    print("Not inside a Git repository.")
else:
    # Make the repository root the working directory for the rest of the notebook
    os.chdir(git_root)
    print(f"Changed working directory to: {git_root}")

In [None]:
%load_ext autoreload
%autoreload 2

from diffusion import VPSDE
from data import generate_mixture_gaussians

# Hyper-parameters shared by the cells below
epochs = 10
num_steps = 250

# Sanity check: plot the forward process to confirm it actually diffuses the data
data = generate_mixture_gaussians()
# NOTE(review): 0.1 and 20 are presumably the bounds of the noise schedule
# (beta_min / beta_max) — confirm against the VPSDE constructor.
sde = VPSDE(
    num_steps,
    0.1,
    20,
    logarithmic_scheduling=True,
)
sde.plot_forward_diffusion(data)

In [None]:
from training import get_optimizer, loss_function
from diffusion import match_dim
from data import log_likelihood_mixture_gaussians_batch
from tqdm import tqdm
from matplotlib import pyplot as plt
import numpy as np

def train_score_network(dataloader, score_net, sde, epochs=epochs, bridge=False):
    """
    Trains the score network and periodically plots generated samples.

    Each epoch performs one optimisation pass over ``dataloader``, then makes
    a second pass that sums ``log_likelihood_mixture_gaussians_batch`` over
    every batch and logs the total. On every 1000th epoch (excluding epoch 0)
    and on the final epoch, the mean training loss since the previous report
    is logged and samples drawn with ``sde.backward_diffusion(score_net)`` are
    scatter-plotted against the most recently iterated data batch.

    Args:
        dataloader: Iterable yielding 1-tuples ``(x_batch,)``. It is iterated
            twice per epoch.
        score_net: Model handed to ``get_optimizer``, ``loss_function`` and
            ``sde.backward_diffusion``.
        sde: Object providing ``backward_diffusion(score_net)``; also
            forwarded to ``loss_function``.
        epochs: Number of training epochs. The default is the value of the
            module-level ``epochs`` at the time this function was defined.
        bridge: Forwarded unchanged to ``loss_function``.

    Returns:
        None. Progress is reported through ``tqdm.write`` and matplotlib.
    """

    optimizer = get_optimizer(score_net)
    running_loss = 0.0       # sum of per-step losses since the last report
    steps_since_report = 0   # number of optimisation steps in that sum
    for epoch in tqdm(range(epochs)):
        for x_batch, in dataloader:
            optimizer.zero_grad()
            loss = loss_function(score_net, x_batch, sde, bridge=bridge)
            loss.backward()
            # Gradient clipping is currently disabled:
            # nn.utils.clip_grad_norm_(score_net.parameters(), 1.0)
            optimizer.step()
            # Accumulate a plain Python float: summing the loss tensors
            # themselves keeps autograd history alive across iterations.
            running_loss += loss.item()
            steps_since_report += 1

        # Log the summed log likelihood of the data batches.
        # NOTE(review): this is evaluated on the data only and does not appear
        # to depend on score_net — confirm it is the intended diagnostic.
        ll = 0
        for x_batch, in dataloader:
            ll += log_likelihood_mixture_gaussians_batch(x_batch).sum()
        # tqdm.write keeps the message from interleaving with the progress bar.
        tqdm.write(f'Epoch: {epoch} and Log Likelihood: {ll}')

        is_report_epoch = (epoch % 1000 == 0 and epoch != 0) or epoch == epochs - 1
        if is_report_epoch:
            # Normalise by the number of steps actually taken rather than a
            # hard-coded batches-per-epoch * epochs-per-report constant.
            mean_loss = running_loss / max(steps_since_report, 1)
            tqdm.write(f'Epoch: {epoch} and Loss: {mean_loss}')
            running_loss = 0.0
            steps_since_report = 0

            samples = sde.backward_diffusion(score_net)
            # x_batch is the last batch yielded by the likelihood pass above.
            batch_np = x_batch.detach().numpy()
            samples_np = samples.detach().numpy()
            # Only the first two columns are plotted.
            plt.scatter(batch_np[:, 0], batch_np[:, 1], label='Original Data')
            plt.scatter(samples_np[:, 0], samples_np[:, 1], label='Generated Samples')
            plt.legend()
            plt.show()

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from model import MLP

# Draw 4000 training samples and serve them in shuffled mini-batches of 500
data = generate_mixture_gaussians(num_samples=4000)
dataloader = DataLoader(
    TensorDataset(data),
    batch_size=500,
    shuffle=True,
)

# Train a default-constructed MLP score model against the SDE from the earlier cell
score_net = MLP()
train_score_network(dataloader, score_net, sde)