## Generative AI Project

### Diffusion Model

In [4]:
import os
import librosa
import soundfile as sf
from tqdm import tqdm
import random
import shutil

In [6]:
# Configuration
input_dir = "./VocalSet/FULL/"  # <- change this
output_dir = "VocalSet_processed"
sample_rate = 22050
duration = 4.0  # seconds
split_ratio = [0.8, 0.1, 0.1]  # train, val, test

# Create output folders
splits = ['train', 'val', 'test']
for split in splits:
    os.makedirs(os.path.join(output_dir, split), exist_ok=True)

# Gather all wav files
all_files = []
for root, _, files in os.walk(input_dir):
    for f in files:
        if f.endswith('.wav'):
            full_path = os.path.join(root, f)
            all_files.append(full_path)

random.shuffle(all_files)
n_total = len(all_files)
n_train = int(split_ratio[0] * n_total)
n_val = int(split_ratio[1] * n_total)

# Split files
train_files = all_files[:n_train]
val_files = all_files[n_train:n_train + n_val]
test_files = all_files[n_train + n_val:]

split_map = {
    "train": train_files,
    "val": val_files,
    "test": test_files
}

# Process files
for split in splits:
    for file_path in tqdm(split_map[split], desc=f"Processing {split}"):
        try:
            y, _ = librosa.load(file_path, sr=sample_rate)
            y = librosa.util.fix_length(data=y, size=int(sample_rate * duration))
            filename = os.path.basename(file_path)
            sf.write(os.path.join(output_dir, split, filename), y, sample_rate)
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

Processing train: 100%|██████████| 2890/2890 [00:40<00:00, 71.15it/s]
Processing val: 100%|██████████| 361/361 [00:05<00:00, 62.91it/s]
Processing test: 100%|██████████| 362/362 [00:05<00:00, 67.74it/s]


In [None]:
import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import Dataset, DataLoader
import os
from glob import glob
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ----------------------------
# Dataset Class
# ----------------------------
class VocalSetDataset(Dataset):
    def __init__(self, directory, sample_rate=22050, duration=4.0):
        self.files = glob(os.path.join(directory, "*.wav"))
        self.sample_rate = sample_rate
        self.target_len = int(sample_rate * duration)

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        audio, sr = torchaudio.load(self.files[idx])
        audio = torchaudio.functional.resample(audio, sr, self.sample_rate)
        audio = audio[:, :self.target_len]
        if audio.shape[1] < self.target_len:
            pad = self.target_len - audio.shape[1]
            audio = torch.nn.functional.pad(audio, (0, pad))
        return audio.squeeze(0)

# ----------------------------
# Encoder and Decoder
# ----------------------------
class Encoder(nn.Module):
    def __init__(self, input_dim=22050*4, latent_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, latent_dim)
        )

    def forward(self, x):
        return self.net(x)

class Decoder(nn.Module):
    def __init__(self, latent_dim=128, output_dim=22050*4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 2048),
            nn.ReLU(),
            nn.Linear(2048, output_dim),
            nn.Tanh()
        )

    def forward(self, x):
        return self.net(x)

# ----------------------------
# Training Loop
# ----------------------------
def train_autoencoder(train_loader, val_loader, epochs=30, lr=1e-4):
    encoder = Encoder().to(device)
    decoder = Decoder().to(device)
    optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=lr)
    loss_fn = nn.MSELoss()

    for epoch in range(epochs):
        encoder.train()
        decoder.train()
        train_loss = 0.0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            batch = batch.to(device)
            batch = batch.view(batch.size(0), -1)
            z = encoder(batch)
            recon = decoder(z)
            loss = loss_fn(recon, batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print(f"Train Loss: {train_loss / len(train_loader):.4f}")

    # Save models
    os.makedirs("saved_models", exist_ok=True)
    torch.save(encoder.state_dict(), "saved_models/encoder.pth")
    torch.save(decoder.state_dict(), "saved_models/decoder.pth")
    print("✅ Models saved!")

# ----------------------------
# Run Training
# ----------------------------
if __name__ == "__main__":
    train_dataset = VocalSetDataset("VocalSet_processed/train")
    val_dataset = VocalSetDataset("VocalSet_processed/val")

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

    train_autoencoder(train_loader, val_loader)


Epoch 1/30: 100%|██████████| 181/181 [16:24<00:00,  5.44s/it] 


Train Loss: 0.0025


Epoch 2/30: 100%|██████████| 181/181 [10:50<00:00,  3.59s/it]


Train Loss: 0.0025


Epoch 3/30: 100%|██████████| 181/181 [10:20<00:00,  3.43s/it]


Train Loss: 0.0025


Epoch 4/30: 100%|██████████| 181/181 [10:27<00:00,  3.47s/it]


Train Loss: 0.0025


Epoch 5/30: 100%|██████████| 181/181 [09:41<00:00,  3.21s/it]


Train Loss: 0.0025


Epoch 6/30: 100%|██████████| 181/181 [09:52<00:00,  3.28s/it]


Train Loss: 0.0025


Epoch 7/30: 100%|██████████| 181/181 [10:09<00:00,  3.36s/it]


Train Loss: 0.0024


Epoch 8/30: 100%|██████████| 181/181 [09:54<00:00,  3.29s/it]


Train Loss: 0.0024


Epoch 9/30: 100%|██████████| 181/181 [10:10<00:00,  3.37s/it]


Train Loss: 0.0023


Epoch 10/30: 100%|██████████| 181/181 [10:55<00:00,  3.62s/it]


Train Loss: 0.0023


Epoch 11/30: 100%|██████████| 181/181 [11:10<00:00,  3.70s/it]


Train Loss: 0.0022


Epoch 12/30: 100%|██████████| 181/181 [11:28<00:00,  3.80s/it]


Train Loss: 0.0022


Epoch 13/30: 100%|██████████| 181/181 [11:18<00:00,  3.75s/it]


Train Loss: 0.0021


Epoch 14/30: 100%|██████████| 181/181 [10:27<00:00,  3.47s/it]


Train Loss: 0.0020


Epoch 15/30: 100%|██████████| 181/181 [10:26<00:00,  3.46s/it]


Train Loss: 0.0019


Epoch 16/30: 100%|██████████| 181/181 [13:26<00:00,  4.45s/it]


Train Loss: 0.0019


Epoch 17/30: 100%|██████████| 181/181 [15:22<00:00,  5.10s/it]


Train Loss: 0.0018


Epoch 18/30: 100%|██████████| 181/181 [13:29<00:00,  4.47s/it]


Train Loss: 0.0017


Epoch 19/30: 100%|██████████| 181/181 [12:16<00:00,  4.07s/it]


Train Loss: 0.0017


Epoch 20/30: 100%|██████████| 181/181 [11:12<00:00,  3.72s/it]


Train Loss: 0.0016


Epoch 21/30: 100%|██████████| 181/181 [10:59<00:00,  3.64s/it]


Train Loss: 0.0016


Epoch 22/30: 100%|██████████| 181/181 [10:37<00:00,  3.52s/it]


Train Loss: 0.0015


Epoch 23/30: 100%|██████████| 181/181 [10:27<00:00,  3.47s/it]


Train Loss: 0.0015


Epoch 24/30: 100%|██████████| 181/181 [10:11<00:00,  3.38s/it]


Train Loss: 0.0015


Epoch 25/30: 100%|██████████| 181/181 [10:34<00:00,  3.50s/it]


Train Loss: 0.0014


Epoch 26/30: 100%|██████████| 181/181 [10:58<00:00,  3.64s/it]


Train Loss: 0.0014


Epoch 27/30: 100%|██████████| 181/181 [10:58<00:00,  3.64s/it]


Train Loss: 0.0014


Epoch 28/30: 100%|██████████| 181/181 [10:58<00:00,  3.64s/it]


Train Loss: 0.0014


Epoch 29/30: 100%|██████████| 181/181 [10:18<00:00,  3.42s/it]


Train Loss: 0.0014


Epoch 30/30: 100%|██████████| 181/181 [09:54<00:00,  3.28s/it]


Train Loss: 0.0013
✅ Models saved!


In [11]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import os
import numpy as np

# --------------------------
# DDPM Noise Scheduler
# --------------------------
class DiffusionScheduler:
    def __init__(self, timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.timesteps = timesteps
        self.beta = torch.linspace(beta_start, beta_end, timesteps)
        self.alpha = 1.0 - self.beta
        self.alpha_hat = torch.cumprod(self.alpha, dim=0)

    def q_sample(self, x_start, t, noise):
        sqrt_alpha_hat = self.alpha_hat[t].sqrt().unsqueeze(1)
        sqrt_one_minus_alpha_hat = (1 - self.alpha_hat[t]).sqrt().unsqueeze(1)
        return sqrt_alpha_hat * x_start + sqrt_one_minus_alpha_hat * noise

# --------------------------
# Simple UNet-style MLP
# --------------------------
class MLPDiffusion(nn.Module):
    def __init__(self, latent_dim=128, t_emb_dim=64):
        super().__init__()
        self.time_embed = nn.Sequential(
            nn.Linear(1, t_emb_dim),
            nn.ReLU(),
            nn.Linear(t_emb_dim, t_emb_dim),
        )
        self.net = nn.Sequential(
            nn.Linear(latent_dim + t_emb_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, latent_dim)
        )

    def forward(self, x, t):
        t = t.float().unsqueeze(1) / 1000.0
        t_emb = self.time_embed(t)
        x = torch.cat([x, t_emb], dim=1)
        return self.net(x)

# --------------------------
# Latent Dataset
# --------------------------
class LatentVocalSet(Dataset):
    def __init__(self, dataloader, encoder, device):
        self.latents = []
        encoder.eval()
        with torch.no_grad():
            for batch in tqdm(dataloader, desc="Extracting latents"):
                batch = batch.to(device).view(batch.size(0), -1)
                z = encoder(batch)
                self.latents.append(z.cpu())
        self.latents = torch.cat(self.latents)

    def __len__(self):
        return len(self.latents)

    def __getitem__(self, idx):
        return self.latents[idx]

# --------------------------
# Training Loop
# --------------------------
def train_diffusion(epochs=20, latent_dim=128):
    # Load encoder
    diff_encoder = Encoder().to(device)
    diff_encoder.load_state_dict(torch.load("saved_models/encoder.pth"))
    diff_encoder.eval()

    # Dataset
    audio_ds = VocalSetDataset("VocalSet_processed/train")
    audio_loader = DataLoader(audio_ds, batch_size=16, shuffle=False)

    latent_ds = LatentVocalSet(audio_loader, diff_encoder, device)
    latent_loader = DataLoader(latent_ds, batch_size=32, shuffle=True)

    # Model & scheduler
    model = MLPDiffusion(latent_dim).to(device)
    noise_scheduler = DiffusionScheduler()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    mse = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for x_0 in tqdm(latent_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            x_0 = x_0.to(device)
            noise = torch.randn_like(x_0)
            t = torch.randint(0, noise_scheduler.timesteps, (x_0.shape[0],), device=device)
            x_t = noise_scheduler.q_sample(x_0, t, noise)
            noise_pred = model(x_t, t)
            loss = mse(noise_pred, noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(latent_loader):.4f}")

    # Save the model
    os.makedirs("saved_models", exist_ok=True)
    torch.save(model.state_dict(), "saved_models/diffusion_model.pth")
    print("✅ Diffusion model saved!")

# --------------------------
# Run Training
# --------------------------
if __name__ == "__main__":
    train_diffusion()


Extracting latents: 100%|██████████| 181/181 [01:17<00:00,  2.34it/s]
Epoch 1/20: 100%|██████████| 91/91 [00:00<00:00, 159.23it/s]


Epoch 1, Loss: 0.9983


Epoch 2/20: 100%|██████████| 91/91 [00:00<00:00, 175.10it/s]


Epoch 2, Loss: 0.9690


Epoch 3/20: 100%|██████████| 91/91 [00:00<00:00, 182.26it/s]


Epoch 3, Loss: 0.8980


Epoch 4/20: 100%|██████████| 91/91 [00:00<00:00, 185.01it/s]


Epoch 4, Loss: 0.7953


Epoch 5/20: 100%|██████████| 91/91 [00:00<00:00, 173.11it/s]


Epoch 5, Loss: 0.7186


Epoch 6/20: 100%|██████████| 91/91 [00:00<00:00, 184.35it/s]


Epoch 6, Loss: 0.6582


Epoch 7/20: 100%|██████████| 91/91 [00:00<00:00, 185.17it/s]


Epoch 7, Loss: 0.6157


Epoch 8/20: 100%|██████████| 91/91 [00:00<00:00, 193.93it/s]


Epoch 8, Loss: 0.5710


Epoch 9/20: 100%|██████████| 91/91 [00:00<00:00, 189.95it/s]


Epoch 9, Loss: 0.5307


Epoch 10/20: 100%|██████████| 91/91 [00:00<00:00, 180.25it/s]


Epoch 10, Loss: 0.5008


Epoch 11/20: 100%|██████████| 91/91 [00:00<00:00, 184.11it/s]


Epoch 11, Loss: 0.4715


Epoch 12/20: 100%|██████████| 91/91 [00:00<00:00, 193.95it/s]


Epoch 12, Loss: 0.4297


Epoch 13/20: 100%|██████████| 91/91 [00:00<00:00, 189.10it/s]


Epoch 13, Loss: 0.4111


Epoch 14/20: 100%|██████████| 91/91 [00:00<00:00, 186.22it/s]


Epoch 14, Loss: 0.3866


Epoch 15/20: 100%|██████████| 91/91 [00:00<00:00, 182.26it/s]


Epoch 15, Loss: 0.3641


Epoch 16/20: 100%|██████████| 91/91 [00:00<00:00, 164.77it/s]


Epoch 16, Loss: 0.3414


Epoch 17/20: 100%|██████████| 91/91 [00:00<00:00, 177.57it/s]


Epoch 17, Loss: 0.3155


Epoch 18/20: 100%|██████████| 91/91 [00:00<00:00, 173.44it/s]


Epoch 18, Loss: 0.3076


Epoch 19/20: 100%|██████████| 91/91 [00:00<00:00, 161.87it/s]


Epoch 19, Loss: 0.2858


Epoch 20/20: 100%|██████████| 91/91 [00:00<00:00, 159.87it/s]

Epoch 20, Loss: 0.2735
✅ Diffusion model saved!





In [13]:
import torch
import torch.nn as nn
import torchaudio
import os
from tqdm import tqdm

# --------------------------
# Inference Function
# --------------------------
def generate_audio(
    output_dir="generated_audio",
    num_samples=5,
    latent_dim=128,
    timesteps=1000,
):
    os.makedirs(output_dir, exist_ok=True)

    # Load models
    decoder = Decoder().to(device)
    decoder.load_state_dict(torch.load("saved_models/decoder.pth"))
    decoder.eval()

    diffusion = MLPDiffusion(latent_dim).to(device)
    diffusion.load_state_dict(torch.load("saved_models/diffusion_model.pth"))
    diffusion.eval()

    scheduler = DiffusionScheduler(timesteps)

    with torch.no_grad():
        for i in range(num_samples):
            x_t = torch.randn(1, latent_dim).to(device)  # Start with noise

            for t in tqdm(reversed(range(timesteps)), desc=f"Generating sample {i+1}"):
                t_tensor = torch.tensor([t], device=device)
                z_pred = diffusion(x_t, t_tensor)
                beta_t = scheduler.beta[t].to(device)
                alpha_t = scheduler.alpha[t].to(device)
                alpha_hat_t = scheduler.alpha_hat[t].to(device)

                if t > 0:
                    noise = torch.randn_like(x_t)
                else:
                    noise = 0

                x_t = (
                    (1 / alpha_t.sqrt()) *
                    (x_t - ((1 - alpha_t) / (1 - alpha_hat_t).sqrt()) * z_pred)
                ) + beta_t.sqrt() * noise

            # Decode latent to waveform
            waveform = decoder(x_t).cpu().squeeze()
            if waveform.ndim == 1:
                waveform = waveform.unsqueeze(0)  # (1, samples)

            # Save to .wav
            out_path = os.path.join(output_dir, f"sample_{i+1}.wav")
            torchaudio.save(out_path, waveform, sample_rate=16000)
            print(f"✅ Saved: {out_path}")

# --------------------------
# Run
# --------------------------
generate_audio(num_samples=5)


Generating sample 1: 1000it [00:00, 1508.55it/s]


✅ Saved: generated_audio\sample_1.wav


Generating sample 2: 1000it [00:00, 1595.68it/s]


✅ Saved: generated_audio\sample_2.wav


Generating sample 3: 1000it [00:00, 1711.01it/s]


✅ Saved: generated_audio\sample_3.wav


Generating sample 4: 1000it [00:00, 1602.44it/s]


✅ Saved: generated_audio\sample_4.wav


Generating sample 5: 1000it [00:00, 1525.81it/s]


✅ Saved: generated_audio\sample_5.wav


In [7]:
import torch
import torchaudio
import torch.nn.functional as F
import torch.nn as nn
import os
from tqdm import tqdm
from scipy.signal import butter, lfilter

class DiffusionScheduler:
    def __init__(self, timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.timesteps = timesteps
        self.beta = torch.linspace(beta_start, beta_end, timesteps)
        self.alpha = 1.0 - self.beta
        self.alpha_hat = torch.cumprod(self.alpha, dim=0)

    def q_sample(self, x_start, t, noise):
        sqrt_alpha_hat = self.alpha_hat[t].sqrt().unsqueeze(1)
        sqrt_one_minus_alpha_hat = (1 - self.alpha_hat[t]).sqrt().unsqueeze(1)
        return sqrt_alpha_hat * x_start + sqrt_one_minus_alpha_hat * noise


class Decoder(nn.Module):
    def __init__(self, latent_dim=128, output_dim=22050*4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 2048),
            nn.ReLU(),
            nn.Linear(2048, output_dim),
            nn.Tanh()
        )

    def forward(self, x):
        return self.net(x)


class MLPDiffusion(nn.Module):
    def __init__(self, latent_dim=128, time_embed_dim=64):
        super().__init__()
        self.time_embed = nn.Sequential(
            nn.Linear(1, time_embed_dim),
            nn.ReLU(),
            nn.Linear(time_embed_dim, time_embed_dim),
            nn.ReLU()
        )

        self.net = nn.Sequential(
            nn.Linear(latent_dim + time_embed_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, latent_dim)
        )

    def forward(self, x, t):
        t = t.float().unsqueeze(1) / 1000  # Normalize timestep
        t_emb = self.time_embed(t)
        x_t = torch.cat([x, t_emb], dim=1)
        return self.net(x_t)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Re-initialize model objects
decoder = Decoder().to(device)
diffusion = MLPDiffusion().to(device)

# Load weights
decoder.load_state_dict(torch.load("saved_models/decoder.pth", map_location=device))
diffusion.load_state_dict(torch.load("saved_models/diffusion_model.pth", map_location=device))
scheduler = DiffusionScheduler(timesteps=1000)

# ----- Utility: Post-Processing -----
def high_pass_filter(waveform, cutoff=30, sr=16000):
    b, a = butter(1, cutoff / (sr / 2), btype='high', analog=False)
    return torch.tensor(lfilter(b, a, waveform.numpy()))

def normalize(wav):
    return wav / torch.max(torch.abs(wav))

def post_process_waveform(waveform, sr=16000):
    waveform = normalize(waveform)
    waveform = high_pass_filter(waveform, sr=sr)
    waveform = torch.clamp(waveform, -1.0, 1.0)
    return waveform

# ----- Improved Inference Function -----
def improved_generate_audio(
    decoder, diffusion, scheduler,
    num_samples=5, latent_dim=128, timesteps=1000,
    output_dir="generated_audio_improved"
):
    os.makedirs(output_dir, exist_ok=True)
    decoder.eval()
    diffusion.eval()

    with torch.no_grad():
        for i in range(num_samples):
            x = torch.randn(1, latent_dim).to(device)

            for t in tqdm(reversed(range(timesteps)), desc=f"Sample {i+1}"):
                t_tensor = torch.tensor([t], device=device)
                noise_pred = diffusion(x, t_tensor)

                beta_t = scheduler.beta[t].to(device)
                alpha_t = scheduler.alpha[t].to(device)
                alpha_hat_t = scheduler.alpha_hat[t].to(device)

                if t > 0:
                    z = torch.randn_like(x)
                else:
                    z = 0

                x = (
                    (1 / alpha_t.sqrt()) *
                    (x - ((1 - alpha_t) / (1 - alpha_hat_t).sqrt()) * noise_pred)
                ) + beta_t.sqrt() * z

            # Decode latent to waveform
            waveform = decoder(x).cpu().squeeze()
            if waveform.ndim == 1:
                waveform = waveform.unsqueeze(0)

            # Post-process
            waveform = post_process_waveform(waveform)

            # Save
            out_path = os.path.join(output_dir, f"sample_{i+1}.wav")
            torchaudio.save(out_path, waveform, sample_rate=16000)
            print(f"✅ Saved: {out_path}")

# Call this once everything is trained and loaded
improved_generate_audio(
    decoder=decoder,
    diffusion=diffusion,
    scheduler=scheduler,
    num_samples=5
)


Sample 1: 1000it [00:00, 1422.09it/s]


✅ Saved: generated_audio_improved\sample_1.wav


Sample 2: 1000it [00:00, 1601.44it/s]


✅ Saved: generated_audio_improved\sample_2.wav


Sample 3: 1000it [00:00, 1543.81it/s]


✅ Saved: generated_audio_improved\sample_3.wav


Sample 4: 1000it [00:00, 1408.98it/s]


✅ Saved: generated_audio_improved\sample_4.wav


Sample 5: 1000it [00:00, 1625.24it/s]


✅ Saved: generated_audio_improved\sample_5.wav
