In [None]:
# %%

" ai for source separation of percussions in urban sounds"
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import os
import time
import soundfile as sf
import matplotlib.pyplot as plt
import librosa
import librosa.display
from tqdm import tqdm
import random
import argparse
import pandas as pd
from tqdm import tqdm
# We have 7358 noise clips from different classes (dog bark, drilling, jackhammer, siren, children playing, engine idling, air conditioner, car horn); each lasts at most 4 seconds and some are shorter.
# We also have 387 files of the percussion class, which we want to separate from the other sounds (i.e. "hear" better).

# STFT analysis parameters shared by the librosa.stft / torch.istft calls in this notebook.
n_fft = 256  # FFT window length, in samples
hop_length = n_fft // 4  # step between consecutive frames: a quarter of the window

In [None]:
# %%

# Where to find the noise clips (one "fold<N>" sub-folder per fold) and their metadata
DATASET_NOISE_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\dataset"

metadata_noise = pd.read_csv(
    os.path.join(DATASET_NOISE_PATH, "UrbanSound8k.csv"))

# Number of clips available in each noise class, most frequent class first
noise_class_counts = metadata_noise['class'].value_counts()
print(noise_class_counts)

# Where to find the percussion recordings and their metadata
DATASET_PERCUSSION_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\J.Palmilha\\data"
metadata_perc = pd.read_csv(
    os.path.join(DATASET_PERCUSSION_PATH, "metadata.csv"))

# Per-class clip counts as a dataframe with a 'class' and a 'count' column.
# The counts are read from the noise metadata itself so they always match the
# files that are actually available.
noise_data = {
    'class': noise_class_counts.index.tolist(),
    'count': noise_class_counts.tolist(),
}
df = pd.DataFrame(noise_data)

# Folder containing the noise files
noise_folder = DATASET_NOISE_PATH

class
dog_bark            1000
children_playing    1000
air_conditioner     1000
engine_idling       1000
jackhammer          1000
drilling            1000
siren                929
car_horn             429
Name: count, dtype: int64


In [None]:
# %%

# Function to create mixture


def _peak_normalize(audio):
    """Return a copy of ``audio`` scaled so its largest absolute sample is 1.

    A signal whose peak is 0 (silence) is returned unscaled: dividing it by
    its peak would turn every sample into NaN.
    """
    audio = np.asarray(audio)
    peak = np.max(np.abs(audio))
    if peak > 0:
        return audio / peak
    return audio.copy()


def create_mixture(percussion_audio, noise_audio):
    """Mix a percussion signal with a noise signal and compute the mixture STFT.

    Both inputs are peak-normalised, summed, and the sum is peak-normalised
    again. The arrays passed in are left untouched.

    Args:
        percussion_audio: 1-D array with the percussion samples.
        noise_audio: 1-D array with the noise samples; it is added sample by
            sample to ``percussion_audio``, so the two must be broadcastable.

    Returns:
        A tuple ``(mixture_audio, stft)``: the normalised mixture waveform and
        the result of ``librosa.stft`` on it (using the notebook-level
        ``n_fft`` and ``hop_length``).
    """
    percussion = _peak_normalize(percussion_audio)
    noise = _peak_normalize(noise_audio)

    # Re-normalise after summing: the sum of two unit-peak signals can reach 2.
    mixture_audio = _peak_normalize(percussion + noise)

    stft = librosa.stft(mixture_audio, n_fft=n_fft, hop_length=hop_length)

    return mixture_audio, stft


def get_stft(audio):
    """Peak-normalise ``audio`` and return its STFT.

    The normalisation works on a copy, so the caller's array is not modified.
    A signal whose peak is 0 (silence) is transformed as-is rather than being
    divided by zero.

    Args:
        audio: 1-D array of audio samples.

    Returns:
        The result of ``librosa.stft`` on the normalised signal, computed with
        the notebook-level ``n_fft`` and ``hop_length``.
    """
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak

    return librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)


def audio_from_spectrogram(mag, phase, length=4 * 7812):
    """Rebuild a waveform from an STFT magnitude and a phase factor.

    Args:
        mag: tensor holding the STFT magnitude.
        phase: tensor multiplied element-wise with ``mag`` to form the STFT
            handed to ``torch.istft``.
            NOTE(review): presumably the unit-modulus complex phase returned
            by ``librosa.magphase``, converted to a tensor; confirm with the
            caller.
        length: number of samples of the returned signal. The default,
            ``4 * 7812``, is the clip length produced by ``pad_audio_center``.

    Returns:
        The tensor returned by ``torch.istft``.
    """
    stft = mag * phase
    # The synthesis window has to live on the same device as the STFT.
    window = torch.hann_window(n_fft, device=stft.device)
    audio = torch.istft(stft, n_fft=n_fft, hop_length=hop_length,
                        length=length, window=window)
    return audio

# calculate error reconstruction and original


def calculate_error(original, reconstruction):
    """Return the mean absolute difference between two tensors."""
    absolute_difference = torch.abs(original - reconstruction)
    return torch.mean(absolute_difference)

In [None]:
# %%

# Total number of percussion files
percussion_count = 387

# Total number of noise clips over all classes
total_noise_files = df['count'].sum()

# Share of the noise clips that belongs to each class
df['proportion'] = df['count'] / total_noise_files

# Noise-clip budget for each percussion file. The generation loop gives every
# class int(noise_files_per_percussion * proportion) clips, i.e. the budget is
# split in proportion to class size and rounded down. With the 8 class counts of
# this dataset (6 x 1000, 929 and 429) that is 5 clips for every class except
# car_horn (2): 37 mixtures per percussion file, 387 * 37 = 14319 overall.
noise_files_per_percussion = 40

# For each percussion file, the noise clips that were used are recorded in a CSV
# file with the columns: percussion_file, noise_file, mix_file, noise_class, fold


def pad_audio_center(audio_path, sr=7812, duration=4):
    """Load an audio file and force it to a fixed length.

    Clips shorter than the target are zero-padded on both sides so the
    original content ends up centred (the extra sample of an odd padding goes
    to the right). Clips longer than the target are cut after the target
    length.

    Args:
        audio_path: path of the audio file, passed to ``librosa.load``.
        sr: sampling rate requested from ``librosa.load``, in Hz.
        duration: length of the returned clip, in seconds.

    Returns:
        A 1-D array of ``int(duration * sr)`` samples.
    """
    audio, _ = librosa.load(path=audio_path, sr=sr)

    target_len = int(duration * sr)
    pad_len = target_len - len(audio)

    if pad_len <= 0:
        return audio[:target_len]

    pad_left = pad_len // 2
    pad_right = pad_len - pad_left
    return np.pad(audio, (pad_left, pad_right), 'constant')

In [None]:
# %%

# Output folders: mixture STFT magnitudes (plus the per-file info CSVs),
# mixture audio, percussion STFT magnitudes and noise STFT magnitudes.
DATASET_MIXTURE_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_stft"
DATASET_MIXTURE_AUDIO_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_audio"
DATASET_PERCUSSION_STFT_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\data\\percussion_stft"
DATASET_NOISE_STFT_PATH = "C:\\Users\\jejep\\Desktop\\STAGE\\data\\noise_stft"

# Create the folders that do not exist yet
for output_dir in (DATASET_MIXTURE_PATH, DATASET_MIXTURE_AUDIO_PATH,
                   DATASET_PERCUSSION_STFT_PATH, DATASET_NOISE_STFT_PATH):
    os.makedirs(output_dir, exist_ok=True)

# NOTE(review): no random seed is set in this cell, so every run draws a
# different selection of folds and noise clips.

# Iterate over each percussion file
for index, row in metadata_perc.iterrows():
    percussion_filename = row['name']
    percussion_file = os.path.join(
        DATASET_PERCUSSION_PATH, percussion_filename)

    # The percussion clip is the same for every mixture built from it, so it
    # is loaded, transformed and saved once per file.
    percussion_audio = pad_audio_center(percussion_file)
    percussion_stft = get_stft(percussion_audio)
    percussion_mag, _ = librosa.magphase(percussion_stft)
    np.save(os.path.join(DATASET_PERCUSSION_STFT_PATH,
                         f"{percussion_filename}.npy"), percussion_mag)

    # One entry per mixture created from this percussion file
    sound_info = []

    # Iterate over each noise class
    for _, noise_row in df.iterrows():
        noise_class = noise_row['class']

        # Share of the per-file budget given to this class, rounded down
        noise_files_to_select = int(
            noise_files_per_percussion * noise_row['proportion'])

        # All the clips of one class are drawn from a single random fold
        selected_fold = np.random.randint(1, 11)
        in_class_and_fold = ((metadata_noise['class'] == noise_class) &
                             (metadata_noise['fold'] == selected_fold))
        noise_files_in_fold = metadata_noise.loc[
            in_class_and_fold, 'slice_file_name'].tolist()

        if not noise_files_in_fold:
            continue

        # Sampling without replacement cannot return more clips than exist
        n_selected = min(noise_files_to_select, len(noise_files_in_fold))
        selected_noise_files = np.random.choice(
            noise_files_in_fold, n_selected, replace=False)

        for noise_file_name in selected_noise_files:
            noise_file = os.path.join(
                DATASET_NOISE_PATH, f"fold{selected_fold}", noise_file_name)
            noise_audio = pad_audio_center(noise_file)

            # Each signal keeps its own STFT: the noise and the mixture are
            # saved as separate arrays below.
            noise_stft = get_stft(noise_audio)
            mixture_audio, mixture_stft = create_mixture(
                percussion_audio, noise_audio)

            mix_name = f"{percussion_filename}_{noise_file_name}"

            # Save sound information
            sound_info.append({
                'percussion_file': percussion_filename,
                'noise_file': noise_file_name,
                # mixture file name
                'mix_file': mix_name,
                'noise_class': noise_class,
                'fold': selected_fold,
            })

            # Save mixture audio
            mixture_audio_path = os.path.join(
                DATASET_MIXTURE_AUDIO_PATH, f"{mix_name}.wav")
            sf.write(mixture_audio_path, mixture_audio, 7812)

            # Save mixture STFT magnitude
            mixture_mag, _ = librosa.magphase(mixture_stft)
            np.save(os.path.join(DATASET_MIXTURE_PATH,
                                 f"{mix_name}.npy"), mixture_mag)

            # Save noise STFT magnitude
            noise_mag, _ = librosa.magphase(noise_stft)
            np.save(os.path.join(DATASET_NOISE_STFT_PATH,
                                 f"{noise_file_name}.npy"), noise_mag)

    # Save sound information to a csv file
    sound_info_df = pd.DataFrame(sound_info)
    sound_info_df.to_csv(os.path.join(
        DATASET_MIXTURE_PATH, f"{percussion_filename}_info.csv"), index=False)

In [None]:
# %%

# Define the dataset class


class MixtureDataset(Dataset):
    """Mixture / noise / percussion STFT magnitudes listed in a metadata CSV.

    Every CSV row describes one mixture through the columns ``mix_file``,
    ``noise_file``, ``percussion_file`` and ``noise_class``. The matching
    arrays are read from ``<folder>/<name>.npy``.

    NOTE: this notebook defines ``MixtureDataset`` in more than one cell; the
    definition executed last is the one in effect.

    Args:
        metadata_file: path of the metadata CSV.
        transform: optional callable taking ``(mixture, noise, percussion)``
            tensors and returning the three transformed tensors.
        noise_class: when given, only the rows of that noise class are served;
            when ``None`` every row is served.
        mixture_dir: folder of the mixture arrays; ``None`` means the
            notebook-level ``DATASET_MIXTURE_PATH``.
        noise_dir: folder of the noise arrays; ``None`` means the
            notebook-level ``DATASET_NOISE_STFT_PATH``.
        percussion_dir: folder of the percussion arrays; ``None`` means the
            notebook-level ``DATASET_PERCUSSION_STFT_PATH``.
    """

    def __init__(self, metadata_file, transform=None, noise_class=None,
                 mixture_dir=None, noise_dir=None, percussion_dir=None):
        self.metadata = pd.read_csv(metadata_file)
        self.noise_class = noise_class
        self.transform = transform
        self.mixture_dir = mixture_dir
        self.noise_dir = noise_dir
        self.percussion_dir = percussion_dir

    def _rows(self):
        """Return the metadata rows served by this dataset."""
        if self.noise_class:
            return self.metadata[self.metadata['noise_class'] == self.noise_class]
        return self.metadata

    @staticmethod
    def _load(folder, name):
        """Read ``<folder>/<name>.npy`` as a float32 tensor."""
        array = np.load(os.path.join(folder, f"{name}.npy"))
        return torch.tensor(array, dtype=torch.float32)

    def __len__(self):
        return len(self._rows())

    def __getitem__(self, idx):
        row = self._rows().iloc[idx]

        # The notebook-level folders are looked up only when no folder was
        # passed to the constructor.
        mixture_dir = DATASET_MIXTURE_PATH if self.mixture_dir is None else self.mixture_dir
        noise_dir = DATASET_NOISE_STFT_PATH if self.noise_dir is None else self.noise_dir
        percussion_dir = (DATASET_PERCUSSION_STFT_PATH if self.percussion_dir is None
                          else self.percussion_dir)

        mixture_stft = self._load(mixture_dir, row['mix_file'])
        noise_stft = self._load(noise_dir, row['noise_file'])
        percussion_stft = self._load(percussion_dir, row['percussion_file'])

        # Apply transform if it exists
        if self.transform:
            mixture_stft, noise_stft, percussion_stft = self.transform(
                mixture_stft, noise_stft, percussion_stft)

        return {
            'mixture_stft': mixture_stft,
            'noise_stft': noise_stft,
            'percussion_stft': percussion_stft
        }

In [None]:
metadata_perc

Unnamed: 0,name,duration
0,data_2024_02_12-14_06_15.wav,4.0
1,data_2024_02_12-14_06_26.wav,4.0
2,data_2024_02_12-14_06_37.wav,4.0
3,data_2024_02_12-14_06_49.wav,4.0
4,data_2024_02_12-14_06_57.wav,4.0
...,...,...
382,hps_2024_03_13-16_08_59.wav,4.0
383,hps_2024_03_13-16_09_07.wav,4.0
384,hps_2024_03_13-16_09_13.wav,4.0
385,hps_2024_03_13-16_09_19.wav,4.0


In [None]:
sound_info_df

Unnamed: 0,percussion_file,noise_file,mix_file,noise_class,fold
0,hps_2024_03_13-16_09_34.wav,34771-3-0-10.wav,hps_2024_03_13-16_09_34.wav_34771-3-0-10.wav,dog_bark,5
1,hps_2024_03_13-16_09_34.wav,29937-3-0-14.wav,hps_2024_03_13-16_09_34.wav_29937-3-0-14.wav,dog_bark,5
2,hps_2024_03_13-16_09_34.wav,181725-3-0-22.wav,hps_2024_03_13-16_09_34.wav_181725-3-0-22.wav,dog_bark,5
3,hps_2024_03_13-16_09_34.wav,76566-3-0-8.wav,hps_2024_03_13-16_09_34.wav_76566-3-0-8.wav,dog_bark,5
4,hps_2024_03_13-16_09_34.wav,180052-3-0-0.wav,hps_2024_03_13-16_09_34.wav_180052-3-0-0.wav,dog_bark,5
5,hps_2024_03_13-16_09_34.wav,160011-2-0-9.wav,hps_2024_03_13-16_09_34.wav_160011-2-0-9.wav,children_playing,2
6,hps_2024_03_13-16_09_34.wav,60591-2-0-15.wav,hps_2024_03_13-16_09_34.wav_60591-2-0-15.wav,children_playing,2
7,hps_2024_03_13-16_09_34.wav,172593-2-0-28.wav,hps_2024_03_13-16_09_34.wav_172593-2-0-28.wav,children_playing,2
8,hps_2024_03_13-16_09_34.wav,155219-2-0-0.wav,hps_2024_03_13-16_09_34.wav_155219-2-0-0.wav,children_playing,2
9,hps_2024_03_13-16_09_34.wav,155219-2-0-26.wav,hps_2024_03_13-16_09_34.wav_155219-2-0-26.wav,children_playing,2


In [None]:
#%%

# The generation step wrote one "<percussion file>_info.csv" per percussion file
# (387 of them). They are gathered here into a single metadata file, which can
# then be used to:
#   - load the data through the dataset class,
#   - split the data into training, validation and test sets,
#   - keep only the mixtures of a specific noise class.

per_file_tables = [
    pd.read_csv(os.path.join(DATASET_MIXTURE_PATH, f"{percussion_name}_info.csv"))
    for percussion_name in metadata_perc['name']
]

metadata_info = pd.concat(per_file_tables)

merged_metadata_path = os.path.join(DATASET_MIXTURE_PATH, "metadata.csv")
metadata_info.to_csv(merged_metadata_path, index=False)

In [None]:
# %%

# Load the merged mixture metadata (the metadata.csv stored in the mixture_stft
# folder): one row per percussion/noise pair.
metadata = pd.read_csv(
    "C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_stft\\metadata.csv")

In [None]:
metadata

Unnamed: 0,percussion_file,noise_file,mix_file,noise_class,fold
0,data_2024_02_12-14_06_15.wav,18581-3-1-3.wav,data_2024_02_12-14_06_15.wav_18581-3-1-3.wav,dog_bark,2
1,data_2024_02_12-14_06_15.wav,204773-3-9-0.wav,data_2024_02_12-14_06_15.wav_204773-3-9-0.wav,dog_bark,2
2,data_2024_02_12-14_06_15.wav,179096-3-0-10.wav,data_2024_02_12-14_06_15.wav_179096-3-0-10.wav,dog_bark,2
3,data_2024_02_12-14_06_15.wav,63261-3-0-1.wav,data_2024_02_12-14_06_15.wav_63261-3-0-1.wav,dog_bark,2
4,data_2024_02_12-14_06_15.wav,97193-3-0-0.wav,data_2024_02_12-14_06_15.wav_97193-3-0-0.wav,dog_bark,2
...,...,...,...,...,...
14314,hps_2024_03_13-16_09_34.wav,156869-8-0-6.wav,hps_2024_03_13-16_09_34.wav_156869-8-0-6.wav,siren,5
14315,hps_2024_03_13-16_09_34.wav,111671-8-0-3.wav,hps_2024_03_13-16_09_34.wav_111671-8-0-3.wav,siren,5
14316,hps_2024_03_13-16_09_34.wav,203913-8-0-9.wav,hps_2024_03_13-16_09_34.wav_203913-8-0-9.wav,siren,5
14317,hps_2024_03_13-16_09_34.wav,72567-1-2-0.wav,hps_2024_03_13-16_09_34.wav_72567-1-2-0.wav,car_horn,2


In [None]:
# %%

# Load the merged mixture metadata (the metadata.csv stored in the mixture_stft
# folder): one row per percussion/noise pair.
metadata = pd.read_csv(
    "C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_stft\\metadata.csv")


# Define the dataset class

class MixtureDataset(Dataset):
    """Mixture / noise / percussion STFT magnitudes listed in a metadata CSV.

    Every CSV row describes one mixture through the columns ``mix_file``,
    ``noise_file``, ``percussion_file`` and ``noise_class``. The matching
    arrays are read from ``<folder>/<name>.npy``.

    NOTE: this notebook defines ``MixtureDataset`` in more than one cell; the
    definition executed last is the one in effect.

    Args:
        metadata_file: path of the metadata CSV.
        transform: optional callable taking ``(mixture, noise, percussion)``
            tensors and returning the three transformed tensors.
        noise_class: when given, only the rows of that noise class are served;
            when ``None`` every row is served.
        mixture_dir: folder of the mixture arrays; ``None`` means the
            notebook-level ``DATASET_MIXTURE_PATH``.
        noise_dir: folder of the noise arrays; ``None`` means the
            notebook-level ``DATASET_NOISE_STFT_PATH``.
        percussion_dir: folder of the percussion arrays; ``None`` means the
            notebook-level ``DATASET_PERCUSSION_STFT_PATH``.
    """

    def __init__(self, metadata_file, transform=None, noise_class=None,
                 mixture_dir=None, noise_dir=None, percussion_dir=None):
        self.metadata = pd.read_csv(metadata_file)
        self.noise_class = noise_class
        self.transform = transform
        self.mixture_dir = mixture_dir
        self.noise_dir = noise_dir
        self.percussion_dir = percussion_dir

    def _rows(self):
        """Return the metadata rows served by this dataset."""
        if self.noise_class:
            return self.metadata[self.metadata['noise_class'] == self.noise_class]
        return self.metadata

    @staticmethod
    def _load(folder, name):
        """Read ``<folder>/<name>.npy`` as a float32 tensor."""
        array = np.load(os.path.join(folder, f"{name}.npy"))
        return torch.tensor(array, dtype=torch.float32)

    def __len__(self):
        return len(self._rows())

    def __getitem__(self, idx):
        row = self._rows().iloc[idx]

        # The notebook-level folders are looked up only when no folder was
        # passed to the constructor.
        mixture_dir = DATASET_MIXTURE_PATH if self.mixture_dir is None else self.mixture_dir
        noise_dir = DATASET_NOISE_STFT_PATH if self.noise_dir is None else self.noise_dir
        percussion_dir = (DATASET_PERCUSSION_STFT_PATH if self.percussion_dir is None
                          else self.percussion_dir)

        mixture_stft = self._load(mixture_dir, row['mix_file'])
        noise_stft = self._load(noise_dir, row['noise_file'])
        percussion_stft = self._load(percussion_dir, row['percussion_file'])

        # Apply transform if it exists
        if self.transform:
            mixture_stft, noise_stft, percussion_stft = self.transform(
                mixture_stft, noise_stft, percussion_stft)

        return {
            'mixture_stft': mixture_stft,
            'noise_stft': noise_stft,
            'percussion_stft': percussion_stft
        }

In [None]:
# %%

# Load the merged mixture metadata (the metadata.csv stored in the mixture_stft
# folder): one row per percussion/noise pair.
metadata = pd.read_csv(
    "C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_stft\\metadata.csv")


# Define the dataset class

class MixtureDataset(Dataset):
    """Mixture / noise / percussion STFT magnitudes listed in a metadata CSV.

    Every CSV row describes one mixture through the columns ``mix_file``,
    ``noise_file``, ``percussion_file`` and ``noise_class``. The matching
    arrays are read from ``<folder>/<name>.npy``.

    Args:
        metadata_file: path of the metadata CSV.
        transform: optional callable taking ``(mixture, noise, percussion)``
            tensors and returning the three transformed tensors.
        noise_class: when given, only the rows of that noise class are served;
            when ``None`` every row is served, so a model can be trained on a
            single noise class or on all of them.
        mixture_dir: folder of the mixture arrays; ``None`` means the
            notebook-level ``DATASET_MIXTURE_PATH``.
        noise_dir: folder of the noise arrays; ``None`` means the
            notebook-level ``DATASET_NOISE_STFT_PATH``.
        percussion_dir: folder of the percussion arrays; ``None`` means the
            notebook-level ``DATASET_PERCUSSION_STFT_PATH``.
    """

    def __init__(self, metadata_file, transform=None, noise_class=None,
                 mixture_dir=None, noise_dir=None, percussion_dir=None):
        self.metadata = pd.read_csv(metadata_file)
        self.noise_class = noise_class
        self.transform = transform
        self.mixture_dir = mixture_dir
        self.noise_dir = noise_dir
        self.percussion_dir = percussion_dir

    def _rows(self):
        """Return the metadata rows served by this dataset."""
        if self.noise_class:
            return self.metadata[self.metadata['noise_class'] == self.noise_class]
        return self.metadata

    @staticmethod
    def _load(folder, name):
        """Read ``<folder>/<name>.npy`` as a tensor (dtype of the stored array)."""
        return torch.tensor(np.load(os.path.join(folder, f"{name}.npy")))

    def __len__(self):
        return len(self._rows())

    def __getitem__(self, idx):
        row = self._rows().iloc[idx]

        # The notebook-level folders are looked up only when no folder was
        # passed to the constructor.
        mixture_dir = DATASET_MIXTURE_PATH if self.mixture_dir is None else self.mixture_dir
        noise_dir = DATASET_NOISE_STFT_PATH if self.noise_dir is None else self.noise_dir
        percussion_dir = (DATASET_PERCUSSION_STFT_PATH if self.percussion_dir is None
                          else self.percussion_dir)

        mixture_stft = self._load(mixture_dir, row['mix_file'])
        noise_stft = self._load(noise_dir, row['noise_file'])
        percussion_stft = self._load(percussion_dir, row['percussion_file'])

        # Apply transform if it exists
        if self.transform:
            mixture_stft, noise_stft, percussion_stft = self.transform(
                mixture_stft, noise_stft, percussion_stft)

        return {
            'mixture_stft': mixture_stft,
            'noise_stft': noise_stft,
            'percussion_stft': percussion_stft
        }

In [None]:
# %%

# Build the dataset restricted to one noise class
dataset = MixtureDataset(
    metadata_file="C:\\Users\\jejep\\Desktop\\STAGE\\data\\mixture_stft\\metadata.csv", noise_class='siren')

# Split the dataset into train, validation and test sets
train_size = int(0.7 * len(dataset))  # 70% of the data for training
val_size = int(0.1 * len(dataset))  # 10% of the data for validation
# whatever is left (about 20%) for testing
test_size = len(dataset) - train_size - val_size

# A seeded generator makes the partition identical on every run, so the test
# set stays the same across kernel restarts.
# NOTE(review): rows are split individually, and the generation step mixes each
# percussion file with several noise clips. Mixtures that share a percussion
# file can therefore fall into different subsets; consider splitting by
# 'percussion_file' if that matters for the evaluation.
split_generator = torch.Generator().manual_seed(0)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator)

print(f"Train dataset size: {len(train_dataset)}")

# Define dataloaders (only the training loader shuffles)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

Train dataset size: 1354


In [None]:
# %%

# Smoke-test the training loader: fetch one batch, list its entries and show
# the shape of each batched tensor.
data = next(iter(train_loader))
print(data.keys())

for entry_name, batch_tensor in data.items():
    print(entry_name, batch_tensor.shape)

dict_keys(['mixture_stft', 'noise_stft', 'percussion_stft'])
mixture_stft torch.Size([4, 129, 489])
noise_stft torch.Size([4, 129, 489])
percussion_stft torch.Size([4, 129, 489])
