<a href="https://colab.research.google.com/github/godofwar1007/Cynaptics-inductionn/blob/main/Task_2_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Important libraries

In [None]:

# Install the audio-processing library and the Kaggle CLI into the notebook runtime.
!pip install librosa
!pip install kaggle
# Open Colab's upload widget; a later cell expects `kaggle.json` to have been uploaded here.
from google.colab import files
files.upload()

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import torchvision.utils as vutils

import pandas as pd
import librosa
import numpy as np
import os
import time
from pathlib import Path

Helper utilities: Kaggle data download and device-management helpers


In [None]:
# Move the uploaded Kaggle API token into ~/.kaggle and restrict it to
# owner read/write (chmod 600).
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the archive for the "the-frequency-quest" competition.
!kaggle competitions download -c the-frequency-quest


In [None]:
# Extract the downloaded competition archive into ./data.
!unzip the-frequency-quest.zip -d ./data

In [None]:
def get_default_device():
    """Return the CUDA device when a GPU is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

def to_device(data, device):
    """Move a tensor, or every tensor inside a (possibly nested) list/tuple, to `device`.

    Lists and tuples are both returned as lists. Any other object is expected
    to expose a `.to(device, non_blocking=...)` method.
    """
    if not isinstance(data, (list, tuple)):
        # Leaf case: a single tensor-like object.
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader:
    """Wrap a data loader so that each batch is moved to `device` as it is yielded."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches in the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Yield the wrapped loader's batches after transferring them with `to_device`."""
        yield from (to_device(batch, self.device) for batch in self.dl)


The audio dataset

In [None]:
class AudioDataset(Dataset):
    """Dataset that turns audio files into fixed-size, 3-channel log-mel spectrograms.

    Every item is a float32 tensor of shape (3, n_mels, fixed_width) with
    values clipped to [-1, 1]. The three channels are identical copies of the
    same spectrogram.

    Args:
        df: DataFrame with a 'filepath' column pointing at the audio files.
        n_mels: Number of mel bands (spectrogram height).
        fixed_width: Number of time frames each spectrogram is cropped/padded to.
    """

    # Shift/scale used for normalization: [-80, 0] dB maps onto [-1, 1].
    DB_OFFSET = 40.0
    # Floor of `power_to_db(ref=np.max)` with librosa's default top_db of 80,
    # i.e. the value that represents silence.
    SILENCE_DB = -80.0

    def __init__(self, df, n_mels=128, fixed_width=300):
        self.df = df
        self.n_mels = n_mels
        self.fixed_width = fixed_width

    def __len__(self):
        return len(self.df)

    def _fit_width(self, spec_db):
        """Crop or right-pad `spec_db` along the time axis to exactly `fixed_width` frames."""
        n_frames = spec_db.shape[1]
        if n_frames >= self.fixed_width:
            return spec_db[:, :self.fixed_width]
        # Pad with the silence floor. The previous zero padding meant 0 dB,
        # which is the *loudest* possible value when ref=np.max, so short
        # clips ended in a block of maximum energy.
        return np.pad(
            spec_db,
            ((0, 0), (0, self.fixed_width - n_frames)),
            mode='constant',
            constant_values=self.SILENCE_DB,
        )

    def __getitem__(self, index):
        """Return the normalized spectrogram tensor for row `index`.

        If the file cannot be loaded, the error is printed and an all-zero
        tensor of the expected shape is returned so one bad file does not
        abort a whole epoch.
        """
        filepath = self.df.iloc[index]['filepath']

        try:
            y, sr = librosa.load(filepath, sr=22050)
        except Exception as e:
            # Deliberate best effort: report and substitute a placeholder.
            print(f"Error loading {filepath}: {e}")
            return torch.zeros((3, self.n_mels, self.fixed_width))

        S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=self.n_mels)
        S_db = librosa.power_to_db(S, ref=np.max)  # roughly in [-80, 0] dB
        S_db = self._fit_width(S_db)

        # Normalize to [-1, 1] to match the generator's Tanh output range.
        normalized = np.clip((S_db + self.DB_OFFSET) / self.DB_OFFSET, -1.0, 1.0)

        # Replicate the single spectrogram across three channels.
        normalized_spec = np.stack([normalized, normalized, normalized], axis=0)
        return torch.tensor(normalized_spec, dtype=torch.float32)


The discriminator

In [None]:
def conv_block(in_channels, out_channels, pool=False):
    """Build a 3x3 Conv -> BatchNorm -> LeakyReLU(0.2) stack.

    When `pool` is true a 2x2 max-pool is appended, halving height and width.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
    norm = nn.BatchNorm2d(out_channels)
    activation = nn.LeakyReLU(0.2, inplace=True)
    if not pool:
        return nn.Sequential(conv, norm, activation)
    return nn.Sequential(conv, norm, activation, nn.MaxPool2d(2))

class Discriminator(nn.Module):
    """Residual CNN that scores an image batch with a probability in (0, 1).

    The training code uses 1 as the target for real samples and 0 for
    generated ones. The output has shape (batch, 1).
    """

    def __init__(self, in_channels=3):
        super().__init__()

        # Stage 1: lift to 128 feature maps, halve the resolution once,
        # then refine with a two-layer residual branch.
        self.conv1 = conv_block(in_channels, 64)
        self.conv2 = conv_block(64, 128, pool=True)
        self.res1 = nn.Sequential(conv_block(128, 128), conv_block(128, 128))

        # Stage 2: widen to 512 feature maps with two more 2x downsamplings,
        # followed by a second residual branch.
        self.conv3 = conv_block(128, 256, pool=True)
        self.conv4 = conv_block(256, 512, pool=True)
        self.res2 = nn.Sequential(conv_block(512, 512), conv_block(512, 512))

        # Head: global average pool -> flatten to 512 -> single sigmoid unit.
        head = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(512, 1),
            nn.Sigmoid(),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, xb):
        """Return the real/fake probability for every image in `xb`."""
        shallow = self.conv2(self.conv1(xb))
        shallow = self.res1(shallow) + shallow  # skip connection

        deep = self.conv4(self.conv3(shallow))
        deep = self.res2(deep) + deep  # skip connection

        return self.classifier(deep)


The generator

In [None]:
class Generator(nn.Module):
    """Map latent vectors to 3-channel images of size 128 x 300 with values in [-1, 1].

    Args:
        latent_size: Length of the input noise vector.
        ngf: Base number of feature maps; the network starts at `ngf * 8`
            channels and halves the channel count at each upsampling stage.
    """

    def __init__(self, latent_size=100, ngf=128):
        super().__init__()
        self.latent_size = latent_size

        seed_channels = ngf * 8
        # Size of the flattened 8 x 19 seed map (155,648 for the default ngf=128).
        seed_features = seed_channels * 8 * 19

        # Project the noise vector and reshape it into a (ngf*8, 8, 19) feature map.
        layers = [
            nn.Linear(latent_size, seed_features),
            nn.BatchNorm1d(seed_features),
            nn.ReLU(),
            nn.Unflatten(1, (seed_channels, 8, 19)),
        ]

        # Three upsampling stages; each doubles height and width:
        # 8x19 -> 16x38 -> 32x76 -> 64x152.
        stage_channels = ((ngf * 8, ngf * 4), (ngf * 4, ngf * 2), (ngf * 2, ngf))
        for c_in, c_out in stage_channels:
            layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1, bias=False)
            )
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(True))

        # Final upsampling to 3 channels at 128 x 304.
        layers.append(
            nn.ConvTranspose2d(ngf, 3, kernel_size=4, stride=2, padding=1, bias=False)
        )
        # Resample (adaptive average pooling, not a crop) to exactly 128 x 300.
        layers.append(nn.AdaptiveAvgPool2d((128, 300)))
        # Squash the output into [-1, 1].
        layers.append(nn.Tanh())

        self.model = nn.Sequential(*layers)

    def forward(self, z):
        """Generate a batch of images from latent vectors `z` of shape (batch, latent_size)."""
        return self.model(z)


Running the model


In [None]:
# List the extracted archive to check its top-level layout.
!ls -l ./data

In [None]:
# List the contents of the train directory.
!ls -l ./data/train

In [None]:
print("="*30)
print("Starting Audio GAN Training...")

device = get_default_device()
print(f"Using device: {device}")

# Output directory for generated preview images.
os.makedirs("gan_audio_images", exist_ok=True)

# Hyperparameters.
N_MELS = 128        # mel bands (spectrogram height)
FIXED_WIDTH = 300   # time frames (spectrogram width)
BATCH_SIZE = 32
lr = 0.0002
num_epochs = 50
latent_size = 100   # length of the generator's noise vector

print("Scanning audio files...")
data_dir = Path("./data")
train_audio_path = data_dir / "train" / "train"  # audio sub-folders sit in the nested train/train directory
filepaths = []  # the GAN needs no labels, so only file paths are collected

# Sorted at both levels so the file list is reproducible across runs.
for folder in sorted(train_audio_path.iterdir()):
    if not folder.is_dir():
        continue
    for ext in ['*.wav', '*.mp3', '*.ogg']:
        filepaths.extend(sorted(folder.glob(ext)))

if not filepaths:
    # Fail early with a clear message instead of letting the DataLoader
    # raise an unrelated error on an empty dataset.
    raise FileNotFoundError(
        f"No .wav/.mp3/.ogg files found in the sub-folders of {train_audio_path}"
    )

# The DataFrame holds only file paths, which is all the dataset reads.
df = pd.DataFrame({'filepath': filepaths})
train_ds = AudioDataset(df, n_mels=N_MELS, fixed_width=FIXED_WIDTH)
train_dl = DataLoader(
    train_ds,
    BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
    # A trailing batch of one sample makes BatchNorm fail in training mode,
    # so drop the incomplete final batch. Keep it when the dataset is smaller
    # than one batch, because dropping it would leave nothing to train on.
    drop_last=len(train_ds) >= BATCH_SIZE,
)
train_dl = DeviceDataLoader(train_dl, device)
print(f"DataLoaders are ready. Found {len(df)} real audio files.")

D = Discriminator(in_channels=3).to(device)
G = Generator(latent_size=latent_size).to(device)

# The discriminator ends in a Sigmoid, so plain binary cross-entropy applies.
criterion = nn.BCELoss()

# The discriminator learned too quickly in earlier runs (d_loss dropped very
# low and the GAN collapsed), so it is slowed down with a 10x smaller
# learning rate than the generator.
d_lr = lr / 10
g_lr = lr

print(f"Starting training with D_lr: {d_lr} and G_lr: {g_lr}")
d_optimizer = optim.Adam(D.parameters(), lr=d_lr, betas=(0.5, 0.999))
g_optimizer = optim.Adam(G.parameters(), lr=g_lr, betas=(0.5, 0.999))

# Fixed latent batch so per-epoch previews are comparable.
fixed_noise = torch.randn(BATCH_SIZE, latent_size, device=device)


Training loop

In [None]:
print("Starting GAN training! This will take a while...")
total_step = len(train_dl)
if total_step == 0:
    # Without this check the loop would do nothing and an untrained
    # generator would be saved as if training had succeeded.
    raise RuntimeError("train_dl has no batches; there is nothing to train on.")

for epoch in range(num_epochs):
    for i, real_images in enumerate(train_dl):

        current_batch_size = real_images.size(0)
        # Targets are created directly on the training device.
        real_labels = torch.ones(current_batch_size, 1, device=device)
        fake_labels = torch.zeros(current_batch_size, 1, device=device)

        # ---- Discriminator step: real -> 1, generated -> 0 ----
        D.zero_grad()
        d_loss_real = criterion(D(real_images), real_labels)

        z = torch.randn(current_batch_size, latent_size, device=device)
        fake_images = G(z)
        # detach() keeps the discriminator loss from back-propagating into G.
        d_loss_fake = criterion(D(fake_images.detach()), fake_labels)

        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()
        d_optimizer.step()

        # ---- Generator step: make D label the fakes as real ----
        G.zero_grad()
        g_loss = criterion(D(fake_images), real_labels)
        g_loss.backward()
        g_optimizer.step()

        # Log every 100 steps and on the last step of each epoch, so epochs
        # with fewer than 100 steps still report progress.
        if (i+1) % 100 == 0 or (i+1) == total_step:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], "
                  f"D_loss: {d_loss.item():.4f}, G_loss: {g_loss.item():.4f}")

    # Save a preview grid from the fixed noise batch. eval() makes BatchNorm
    # use its running statistics.
    G.eval()
    with torch.no_grad():
        # Map the Tanh output from [-1, 1] to [0, 1]. No further
        # `normalize=True` is applied: its min-max stretch would rescale every
        # grid differently and make epochs impossible to compare.
        fake_images_fixed = (G(fixed_noise) + 1) / 2
        vutils.save_image(fake_images_fixed, f"gan_audio_images/epoch_{epoch+1}.png")
    G.train()

print("TRAINING FINISHED!")

torch.save(G.state_dict(), 'audio_generator_model.pth')
print("Generator model saved to audio_generator_model.pth")

Generating audio

In [None]:

!pip install soundfile
import soundfile as sf

print("="*30)
print("Generating final audio samples...")

device = get_default_device()
latent_size = 100
model_path = 'audio_generator_model.pth'
G = Generator(latent_size=latent_size).to(device)
G.load_state_dict(torch.load(model_path))
G.eval()
print("Generator model loaded.")

os.makedirs("gan_audio_files", exist_ok=True)


num_samples = 5
sample_rate = 22050

with torch.no_grad():
    for i in range(num_samples):

        z = torch.randn(1, latent_size).to(device)
        fake_spec_tensor = G(z)
        fake_spec_norm = fake_spec_tensor.cpu().numpy()[0]
        fake_spec_1ch = fake_spec_norm[0, :, :]

        fake_spec_db = (fake_spec_1ch * 40.0) - 40.0

        S_power = librosa.db_to_power(fake_spec_db)

        y_fake = librosa.feature.inverse.mel_to_audio(
            S_power,
            sr=sample_rate,
            n_fft=2048,
            hop_length=512
        )

        filename = f"gan_audio_files/generated_audio_{i+1}.wav"
        sf.write(filename, y_fake, sample_rate)

print(f"Successfully generated {num_samples} audio files in 'gan_audio_files' folder!")
print("="*30)