In [None]:
# Mount Google Drive at /content/drive so the project folder stored under MyDrive can be read below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import shutil

# Copy the project folder from the mounted Drive into the local Colab filesystem.
source_folder = '/content/drive/MyDrive/Thesis_GitHub_Code'
destination_folder = '/content/Thesis_GitHub_Code'

# exist_ok keeps this cell safe to re-run once the destination has been created.
os.makedirs(destination_folder, exist_ok=True)

for item in os.listdir(source_folder):
    source_path = os.path.join(source_folder, item)
    destination_path = os.path.join(destination_folder, item)
    if os.path.isdir(source_path):
        # Without dirs_exist_ok, copytree raises FileExistsError as soon as the
        # cell is executed a second time; with it, the copy is refreshed in place.
        shutil.copytree(source_path, destination_path, dirs_exist_ok=True)
    else:
        shutil.copy2(source_path, destination_path)

In [None]:
# Install the dependencies listed in the copied project's requirements.txt.
!pip install -r /content/Thesis_GitHub_Code/requirements.txt

In [None]:
# Additional packages used by this notebook (datasets/transformers, EnCodec, audio file I/O).
# NOTE(review): transformers is installed from the unpinned `main` branch, so the
# installed code can differ between runs — consider pinning a released version.
!pip install -q datasets git+https://github.com/huggingface/transformers.git@main
!pip install -q encodec
!pip install -q soundfile

In [None]:
# importing modules for GPT2 and audio tokenization
# The two modules are copied into /content so they can be imported by name.
# NOTE(review): the next cell also imports from `dataset_splits`, which is not
# copied here — confirm that module is importable from the working directory.
!cp Thesis_GitHub_Code/model.py /content
!cp Thesis_GitHub_Code/utils.py /content

In [None]:
from dataset_splits import split_dataset_files, clas_dict, print_random_file_details
from model import AudioGPT2
from utils import augment, encode_audio, apply_delay_pattern, remove_delay_pattern

In [None]:
import os
import random
import torch
import librosa
import numpy as np
from datasets import load_dataset, Audio
from transformers import EncodecModel, AutoProcessor, GPT2Config, GPT2Model
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn.functional as F
# used to define neural network layers and setting up and training the network
from IPython.display import Audio as IPyAudio
import soundfile as sf
import csv
from typing import List
from tqdm import tqdm # for progress bars
import soundfile as sf
from audiomentations import Compose, AddGaussianNoise
import wandb

In [None]:
os.environ["WANDB_NOTEBOOK_NAME"] = "FoleyGen_Oct.ipynb"
# Never paste the API key into the notebook (it would be saved and shared with it).
# Export WANDB_API_KEY before running; when it is unset, key=None lets wandb fall
# back to its own credential lookup / interactive prompt.
wandb.login(key=os.environ.get("WANDB_API_KEY") or None)

In [None]:
# Root folder that is scanned for .wav files, and the CSV file the split is written to.
# Both paths are relative, so they resolve against the notebook's working directory.
dataset_path = "Thesis_GitHub_Code/DataSet"
output_csv_path = "Thesis_GitHub_Code/dataset_splits.csv"

In [None]:
# Foley category name -> integer class id; ids follow the order the names are listed in.
clas_dict = {
    category: class_id
    for class_id, category in enumerate(
        (
            "DogBark",
            "Footstep",
            "Gunshot",
            "Keyboard",
            "MovingMotorVehicle",
            "Rain",
            "SneezeCough",
        )
    )
}

# Splitting datasets into train, test and validation and printing them to a .csv
def split_dataset_files(dataset_path: str,output_csv_path: str, train_ratio: float = 0.8, test_ratio: float = 0.1, seed: int = None):
    """Split the labelled ``.wav`` files under ``dataset_path`` into train/validation/test.

    A file is kept only when the name of its parent folder is a key of the
    module-level ``clas_dict``; that entry provides the file's ``class_id``.

    Args:
        dataset_path: Folder that is walked recursively.
        output_csv_path: CSV file (re)written with one ``filepath, split`` row per file.
        train_ratio: Fraction of the files assigned to the training split.
        test_ratio: Fraction of the files assigned to the test split; whatever is
            left after train and test becomes the validation split.
        seed: When not ``None``, seeds the global ``random`` module before shuffling.

    Returns:
        ``(training_files, valid_files, test_files)``, each a list of
        ``{"class_id": int, "file_path": str}`` dicts.

    Raises:
        ValueError: If a ratio is negative or the two ratios sum to more than 1.
    """
    # Small tolerance so float sums such as 0.7 + 0.3 are not rejected.
    if train_ratio < 0 or test_ratio < 0 or train_ratio + test_ratio > 1.0 + 1e-9:
        raise ValueError(
            f"train_ratio and test_ratio must be non-negative and sum to at most 1 "
            f"(got train_ratio={train_ratio}, test_ratio={test_ratio})"
        )

    all_files: List[dict] = []
    for root, _dirs, files in os.walk(dataset_path):
        class_id = clas_dict.get(os.path.basename(root), None)
        if class_id is None:
            continue  # folder is not one of the known categories
        for file in files:
            if file.endswith(".wav"):
                all_files.append({"class_id": class_id, "file_path": os.path.join(root, file)})

    # os.walk yields entries in an arbitrary, filesystem-dependent order. Sorting
    # first makes the shuffle below depend only on the seed, not on that order.
    all_files.sort(key=lambda info: info["file_path"])

    if seed is not None:
        random.seed(seed)
    random.shuffle(all_files)

    total_files = len(all_files)
    num_train = int(total_files * train_ratio)
    num_test = int(total_files * test_ratio)

    training_files = all_files[:num_train]
    test_files = all_files[num_train:num_train + num_test]
    valid_files = all_files[num_train + num_test:]

    with open(output_csv_path, "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(["filepath", "split"])
        # Row order is kept as train, test, validation.
        for split_name, split_files in (
            ("train", training_files),
            ("test", test_files),
            ("validation", valid_files),
        ):
            for file_info in split_files:
                writer.writerow([file_info["file_path"], split_name])

    return training_files, valid_files, test_files

# Quick sanity check that the split produced sensible entries
def print_random_file_details(training_files, valid_files, test_files):
    """
    Pick one file at random across the three splits and print its path,
    class id and the name of the split it belongs to.
    """
    # Candidates are listed in the order train, validation, test.
    labelled = [
        (entry, split_name)
        for split_name, entries in (
            ("train", training_files),
            ("validation", valid_files),
            ("test", test_files),
        )
        for entry in entries
    ]

    chosen, chosen_split = random.choice(labelled)

    print("Randomly Selected File:")
    print(f"File Path: {chosen['file_path']}")
    print(f"Class ID: {chosen['class_id']}")
    print(f"Split: {chosen_split}")

In [None]:
# Split the dataset
# A fixed seed keeps the split stable across kernel restarts; with seed=None every
# run reshuffles, so a checkpoint from an earlier run could later be evaluated on
# files it was trained on.
training_files, valid_files, test_files = split_dataset_files(dataset_path, output_csv_path, train_ratio=0.8, test_ratio=0.1, seed=42)

In [None]:
# Sanity check: print the path, class id and split of one randomly chosen file.
print_random_file_details(training_files, valid_files, test_files)

Randomly Selected File:
File Path: Thesis_GitHub_Code/DataSet/SneezeCough/681.wav
Class ID: 6
Split: train


## Initializing EnCodec Model


In [None]:
# Initialize EnCodec model
# NOTE(review): this import rebinds the name `EncodecModel`, which an earlier cell
# imported from `transformers`; from here on it refers to the `encodec` package's class.
from encodec import EncodecModel

encodec_model = EncodecModel.encodec_model_24khz()
encodec_model.set_target_bandwidth(6.0)

# Define codebook_size and num_quantizers based on the actual model
# NOTE(review): both values are hard-coded rather than read from `encodec_model`;
# presumably they match the 6.0 target bandwidth set above — re-check them if the
# bandwidth or the model is changed.
codebook_size = 1024
num_quantizers = 8

## Create AudioDataset Class and Data Loaders

In [None]:
class AudioDataset(Dataset):
    """Dataset yielding fixed-length, delay-patterned token grids with their class id.

    Each item is ``(input_ids, class_id)``. ``input_ids`` always has exactly
    ``max_length`` rows: longer sequences are truncated, shorter ones are padded
    with the padding token, whose index is ``codebook_size``.

    Args:
        file_list: Sequence of dicts with ``"file_path"`` and ``"class_id"`` keys.
        encodec_model: Codec handed to ``encode_audio`` for tokenisation.
        max_length: Number of rows every returned ``input_ids`` has.
        codebook_size: Number of real codes per codebook; the padding token is
            stored at index ``codebook_size``.
        apply_augmentation: Forwarded to ``encode_audio``.
        num_quantizers: Forwarded to ``apply_delay_pattern``. Defaults to 8, the
            value that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, file_list, encodec_model, max_length=300, codebook_size=1024, apply_augmentation = False, num_quantizers=8):
        self.file_list = file_list
        self.encodec_model = encodec_model
        self.max_length = max_length
        self.codebook_size = codebook_size
        self.vocab_size = self.codebook_size + 1  # +1 for padding
        self.num_quantizers = num_quantizers
        self.apply_augmentation = apply_augmentation

    def __len__(self):
        """Return the number of files in the dataset."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Encode file ``idx`` and return ``(input_ids, class_id)``.

        ``input_ids`` is indexed ``[step, column]`` and is truncated or padded
        along the first axis to ``max_length`` rows.
        """
        file_info = self.file_list[idx]
        file_path = file_info["file_path"]
        class_id = file_info["class_id"]  # Retrieve the class ID
        codes = encode_audio(file_path, self.encodec_model, apply_augmentation=self.apply_augmentation)
        delayed_codes = apply_delay_pattern(codes,self.codebook_size, self.num_quantizers)

        # Truncate or pad sequences to max_length
        input_ids = delayed_codes
        padding_value = self.vocab_size - 1
        if input_ids.shape[0] > self.max_length:
            input_ids = input_ids[:self.max_length, :]
        else:
            pad_length = self.max_length - input_ids.shape[0]
            # Allocate the padding on the same device as the codes so the
            # concatenation cannot fail on a device mismatch.
            padding = torch.full(
                (pad_length, input_ids.shape[1]),
                padding_value,
                dtype=torch.long,
                device=input_ids.device,
            )
            input_ids = torch.cat([input_ids, padding], dim=0)

        return input_ids, class_id  # Return the class ID along with the input_ids




# Build the three datasets and the train/validation loaders
max_sequence_length = 300
batch_size = 4

# Only the training files are augmented; validation and test files are encoded as-is.
train_dataset, valid_dataset, test_dataset = (
    AudioDataset(
        split_files,
        encodec_model,
        max_length=max_sequence_length,
        apply_augmentation=use_augmentation,
    )
    for split_files, use_augmentation in (
        (training_files, True),
        (valid_files, False),
        (test_files, False),
    )
)

# Training batches are reshuffled every epoch; validation keeps the file order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=False)

## Define AudioGPT2 Model


## Train model

In [None]:
# Set up the device, the model, the loss and the optimizer
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = AudioGPT2(codebook_size=codebook_size, num_quantizers=num_quantizers)
model = model.to(device)
# Targets equal to the last vocabulary index (the padding token) are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=model.vocab_size - 1)
# Adam with a weight-decay term, i.e. L2 regularization on the parameters.
optimizer = optim.Adam(params=model.parameters(), weight_decay=1e-4, lr=5e-5)

#### For starting a new training run

In [None]:
num_epochs = 300


def _quantizer_loss(model, criterion, input_ids, class_id):
    """Next-step prediction loss for one batch, averaged over the quantizers.

    ``input_ids`` is indexed ``[batch, step, quantizer]``. Steps ``0..T-2`` are fed
    to the model and steps ``1..T-1`` are the targets. The model output is treated
    as ``num_quantizers`` consecutive blocks of ``vocab_size`` logits, one block
    per quantizer, and ``criterion`` is applied to each block separately.
    """
    inputs = input_ids[:, :-1, :]   # [batch, T-1, num_quantizers]
    targets = input_ids[:, 1:, :]   # [batch, T-1, num_quantizers]

    logits = model(input_ids=inputs, class_id=class_id)  # [batch, T-1, total_vocab_size]

    # Flatten batch and time so every row is one prediction step.
    n_rows = inputs.shape[0] * inputs.shape[1]
    logits = logits.reshape(n_rows, model.total_vocab_size)
    targets = targets.reshape(n_rows, model.num_quantizers)

    loss = 0
    for q in range(model.num_quantizers):
        q_offset = q * model.vocab_size
        q_logits = logits[:, q_offset : q_offset + model.vocab_size]  # [n_rows, vocab_size]
        loss = loss + criterion(q_logits, targets[:, q])
    return loss / model.num_quantizers  # Average over quantizers


# wandb.log() needs an active run; start one only if no earlier cell already did.
if wandb.run is None:
    wandb.init()

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
        input_ids, class_id = batch
        input_ids, class_id = input_ids.to(device), class_id.to(device)

        optimizer.zero_grad()

        # The loss helper keeps its shape variables local, so the notebook-level
        # `batch_size` used to build the loaders is no longer overwritten here.
        loss = _quantizer_loss(model, criterion, input_ids, class_id)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_loss:.4f}")
    wandb.log({"Training Loss": avg_loss, "Epoch": epoch + 1})

    # Validation loop
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in tqdm(valid_loader, desc=f"Validation Epoch {epoch+1}"):
            input_ids, class_id = batch
            input_ids, class_id = input_ids.to(device), class_id.to(device)
            val_loss += _quantizer_loss(model, criterion, input_ids, class_id).item()
    avg_val_loss = val_loss / len(valid_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Validation Loss: {avg_val_loss:.4f}")
    wandb.log({"Validation Loss": avg_val_loss, "Epoch": epoch + 1})

    # Save a checkpoint every 10 epochs. The file name uses the 1-based epoch
    # number while the stored 'epoch' field is the 0-based loop index.
    if (epoch + 1) % 10 == 0:
        checkpoint_path = f"augmodel_checkpoint_l2_{epoch + 1}.pth"
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }, checkpoint_path)
        print(f"Checkpoint saved for epoch {epoch + 1} at {checkpoint_path}")

wandb.finish()


### Generating audio

In [None]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
import soundfile as sf
from IPython.display import Audio as IPyAudio

# NOTE(review): this is an absolute, machine-specific path, and the assignment rebinds
# `checkpoint_path`, the same name the training cell uses for its periodic checkpoints.
checkpoint_path = "/scratch/ssr9055/my_env/BEST_CHECKPOINT_AUG_L2.pth" # Replace with the last saved checkpoint

# Function to remove delay pattern
# NOTE: this definition replaces the `remove_delay_pattern` imported from `utils`
# earlier in the notebook.
def remove_delay_pattern(delayed_codes, num_quantizers):
    """Undo the per-quantizer delay by shifting column ``q`` back by ``q`` steps.

    Args:
        delayed_codes: 2-D tensor indexed ``[step, quantizer]``.
        num_quantizers: Number of quantizer columns to realign.

    Returns:
        Tensor of shape ``[num_frames, num_quantizers]`` where
        ``num_frames = delayed_codes.shape[0] - (num_quantizers - 1)``, with the
        same dtype and device as ``delayed_codes``.
    """
    num_frames = delayed_codes.shape[0] - (num_quantizers - 1)
    # new_zeros inherits dtype AND device from the input; torch.zeros would always
    # allocate on the CPU and silently move GPU codes off the device.
    codes = delayed_codes.new_zeros((num_frames, num_quantizers))
    for q in range(num_quantizers):
        codes[:, q] = delayed_codes[q:q + num_frames, q]
    return codes  # Shape: [num_frames, num_quantizers]

# Function to generate audio
def generate_audio(model, encodec_model, class_id, num_quantizers=8, codebook_size=1024, max_length=600, temperature=1.0, device='cpu'):
    """Autoregressively sample codec tokens for ``class_id`` and decode them to audio.

    Generation starts from one step filled with the padding token
    (index ``codebook_size``) and samples ``max_length`` further steps, one token
    per quantizer per step. The delay pattern is then removed and the codes are
    decoded with ``encodec_model``.

    Args:
        model: Called as ``model(input_ids=..., class_id=...)``; its last-step
            output is read as ``num_quantizers`` consecutive blocks of
            ``codebook_size + 1`` logits.
        encodec_model: Codec whose ``decode`` turns the codes into a waveform.
        class_id: Integer class id used for conditioning.
        num_quantizers: Number of codebooks sampled per step.
        codebook_size: Number of real codes per codebook.
        max_length: Number of steps to sample; must be at least ``num_quantizers``
            so that at least one frame is left after the delay is removed.
        temperature: Positive softmax temperature.
        device: Device the model and the sampled tokens live on.

    Returns:
        The decoded audio as a NumPy array, or ``None`` when decoding fails
        (the error is printed).

    Raises:
        ValueError: If ``temperature`` is not positive or ``max_length`` is
            smaller than ``num_quantizers``.
    """
    if temperature <= 0:
        raise ValueError(f"temperature must be positive, got {temperature}")
    if max_length < num_quantizers:
        raise ValueError(
            f"max_length ({max_length}) must be at least num_quantizers ({num_quantizers})"
        )

    model.eval()  # Set model to evaluation mode
    encodec_model.eval()  # Set encodec_model to evaluation mode

    model.to(device)  # Move model to the specified device

    vocab_size = codebook_size + 1  # real codes plus the padding token
    pad_token = codebook_size       # padding token index, also used as start token

    # Start with padding tokens for each quantizer
    input_ids = torch.full((1, 1, num_quantizers), pad_token, dtype=torch.long, device=device)  # [1, 1, num_quantizers]

    # Convert the class_id to a tensor and move to device
    class_id_tensor = torch.tensor([class_id], device=device)

    generated = []  # one [num_quantizers] tensor per sampled step

    with torch.no_grad():  # Disable gradient calculations for generation
        for _ in tqdm(range(max_length), desc="Generating Audio"):
            logits = model(input_ids=input_ids, class_id=class_id_tensor)  # [1, seq_length, total_vocab_size]
            step_logits = logits[:, -1, :] / temperature  # [1, total_vocab_size], last time step

            # One row of logits per quantizer.
            per_quantizer = step_logits[:, :num_quantizers * vocab_size].reshape(num_quantizers, vocab_size)

            # Exclude the padding token from sampling. Previously a sampled padding
            # token was swapped for a uniformly random code, which injected noise
            # unrelated to the model's prediction; masking renormalises the
            # distribution over the real codes instead.
            per_quantizer[:, pad_token] = float('-inf')

            probs = F.softmax(per_quantizer, dim=-1)
            # Sample all quantizers in one call (one draw per row).
            next_tokens = torch.multinomial(probs, num_samples=1).reshape(1, num_quantizers)

            generated.append(next_tokens.squeeze(0))
            input_ids = torch.cat([input_ids, next_tokens.unsqueeze(0)], dim=1)  # extend the context by one step

    # Stack the generated tokens to form the final token sequence
    generated_tokens = torch.stack(generated, dim=0)  # [max_length, num_quantizers]

    # Remove the delay pattern from generated tokens
    codes = remove_delay_pattern(generated_tokens, num_quantizers)

    # Add batch dimension to match the input shape for decode
    codes = codes.unsqueeze(0)

    # Defensive: keep every code inside the valid codebook range
    codes = codes.clamp(0, codebook_size - 1)

    # Decode the audio using the EnCodec model. Decoding failures are reported and
    # signalled with None so the caller can continue with the next category.
    try:
        with torch.no_grad():
            codes = codes.permute(0, 2, 1)  # [batch_size, num_quantizers, num_frames]
            encoded_frames = [(codes.to(next(encodec_model.parameters()).device), None)]
            decoded_audio = encodec_model.decode(encoded_frames)
            audio = decoded_audio.squeeze().cpu().detach().numpy()  # Convert to numpy array
        return audio
    except Exception as e:
        print(f"Error during decoding: {str(e)}")
        return None


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
max_length = 350  # Adjust as needed
temperature = 0.7  # Adjust as needed

# Restore the trained weights into the already constructed `model`.
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])

# Write the files at the codec's own sample rate so the value cannot drift from the
# model in use; falls back to the previously hard-coded 24000 if the attribute is missing.
output_sample_rate = getattr(encodec_model, "sample_rate", 24000)

# Loop over each category in clas_dict
for class_name, class_id in clas_dict.items():
    print(f"Generating audio for category: {class_name}")

    generated_audio = generate_audio(
        model,
        encodec_model,
        class_id=class_id,  # Pass the class_id for each category
        num_quantizers=num_quantizers,
        codebook_size=codebook_size,
        max_length=max_length,
        temperature=temperature,
        device=device
    )

    if generated_audio is not None:
        # Save the generated audio to a file (optional)
        output_filename = f'generated_audio_{class_name}.wav'
        sf.write(output_filename, generated_audio, output_sample_rate)

        # Display the audio
        display(IPyAudio(output_filename))
    else:
        print(f"Audio generation failed for category: {class_name}")
