In [1]:
import torchaudio as ta
import torch
from torch.utils.data import Dataset, DataLoader

import os

import librosa
import pandas as pd
import IPython.display as ipd

import numpy as np

import glob

from random import shuffle
import gc

from tqdm.auto import tqdm

In [2]:
class Signal_Synthesis_DataGen(Dataset):
    def __init__(self, noise_dir, signal_dir, num_samples=200, noise_path_save=None,\
                 n_fft=400, win_length=400, hop_len=200, f_min=0, f_max=8000, \
                 perform_stft=True, normalize=True, default_sr=16000, sec=6, augment=False):

        self.noise_dir = noise_dir
        self.signal_dir = signal_dir
        self.noise_path_save = noise_path_save
        self.n_fft = n_fft
        self.win_length = win_length
        self.hop_len = hop_len
        self.f_min = f_min
        self.f_max = f_max
        self.perform_stft = perform_stft
        self.normalize = normalize
        self.default_sr = default_sr
        self.sec = sec
        self.augment = augment



        self.noise_nums = self.get_noise_paths(noise_dir)
        self.noise_nums = self.noise_nums[:num_samples]
        self.noise_suffix = ".wav"

        self.signal_nums = self.get_signal_paths(signal_dir)
        self.signal_prefix = "common_voice_en_"
        self.signal_suffix = ".mp3"


    def get_noise_paths(self, noise_dir):
        file_nums = []
        for file in tqdm(os.listdir(noise_dir)):
            num = int(file.split(".")[0])
            file_nums.append(num)
        file_nums = np.asarray(file_nums)
        return file_nums

    def get_signal_paths(self, clips_path):

        file_nums = []
        for file in tqdm(os.listdir(clips_path)):
            num = file.split("_")[3]
            num = int(num.split(".")[0])
            file_nums.append(num)
        file_nums = np.asarray(file_nums)
        return file_nums



    def get_noise_from_sound(self, signal, noise, SNR):

        RMS_s = np.sqrt(np.mean(signal**2))

        RMS_n = np.sqrt(RMS_s**2/pow(10., SNR/10))

        RMS_n_current = np.sqrt(np.mean(noise**2))
        noise = noise*(RMS_n/RMS_n_current)

        return noise



    def get_mixed_signal(self, signal: torch.Tensor, noise: torch.Tensor, default_sr, sec, SNR):

        snip_audio = np.random.randint(0, 2)
        # if snip_audio:
        #     signal = ta.transforms.Vad(sample_rate=default_sr)(signal)

        sig_length = int(default_sr * sec)

        if len(signal) > sig_length:
            signal = signal[: sig_length]
        elif len(signal) <= sig_length:
            zero_signal = np.zeros((signal.shape))
            while len(signal) < sig_length:
                signal = np.concatenate((signal, zero_signal))
                zero_signal = np.zeros(signal.shape)
            signal = signal[ : sig_length]


        noise_len = len(noise)
        signal_len = len(signal)

        if len(noise) > len(signal):
            noise = noise[0 : len(signal)]
        elif len(noise) <= len(signal):

            #noise = torch.cat((noise, torch.zeros((len(signal) - len(noise)))))
            for i in range(int(len(signal)/len(noise))+1):
                noise = np.concatenate((noise, noise))

            noise = noise[:len(signal)]

        noise = self.get_noise_from_sound(signal, noise, SNR)

        signal_noise = signal+noise
        return signal_noise, signal

    def construct_signal_path(self, signal_id):
        file_num = str(self.signal_nums[signal_id])
        file_name = self.signal_prefix + str(file_num) + self.signal_suffix
        path = os.path.join(self.signal_dir, file_name)
        if os.path.exists(path):
            return path
        else:
            raise FileExistsError(f"{path}")
            
    def construct_noise_path(self, noise_id):
        file_num = str(self.noise_nums[noise_id])
        file_name = file_num + self.noise_suffix
        path = os.path.join(self.noise_dir, file_name)
        if os.path.exists(path):
            return path
        else:
            raise FileExistsError(f"{path}")



    def get_ids(self, signal_paths, noise_paths, idx):

        signal_id = idx//len(noise_paths)
        noise_id = idx - signal_id*len(noise_paths)
#         print(signal_id, noise_id)

        signal_path, noise_path = self.construct_signal_path(signal_id), self.construct_noise_path(noise_id)

        signal_noise_add, signal = self.develop_data(signal_path, noise_path)

        return signal_noise_add, signal

    def develop_data(self, signal_path, noise_path):

        SNR = np.random.randint(0, np.random.randint(0, 50)+1)
#         print(SNR)

        noise, nsr = librosa.load(noise_path, sr=self.default_sr)
        signal, ssr = librosa.load(signal_path, sr=self.default_sr)
        # noise, nsr = ta.load(noise_path)
        # noise = ta.transforms.Resample(orig_freq=nsr, new_freq=self.default_sr)(noise)
        # signal, ssr = ta.load(signal_path)
        # signal = ta.transforms.Resample(orig_freq=ssr, new_freq=self.default_sr)(signal)
        # noise = torch.from_numpy(noise)
        # signal = torch.from_numpy(signal)

        signal_noise_add, signal = self.get_mixed_signal(signal, noise, self.default_sr, self.sec, SNR)
        if self.perform_stft:
            signal_noise_add = librosa.stft(signal_noise_add, n_fft=self.n_fft, hop_length=self.hop_len, win_length=self.win_length)
            signal = librosa.stft(signal, n_fft=self.n_fft, hop_length=self.hop_len, win_length=self.win_length)
            # (signal_noise_add, signal) = torch.stft(combined_signal, n_fft=self.n_fft, hop_length=self.hop_len, win_length=self.win_length, normalized=self.normalize)

        return signal_noise_add, signal



    def __len__(self):

        return len(self.signal_nums)*len(self.noise_nums)

    def __getitem__(self, idx):

        if torch.is_tensor(idx):
            idx = idx.tolist()

        signal_noise_add, signal = self.get_ids(self.signal_nums, self.noise_nums, idx)
        gc.collect()

#         signal_noise_add, signal = signal_noise_add/signal_noise_add.max(), signal/signal.max()
        # print("returning the values from getitem dataset")
        return signal_noise_add, signal
#         return signal_noise_add, signal

In [3]:
if __name__ == "__main__":
    noise_dir = "./dataset/UrbanSound8K/audio/"
    noise_metadata = "./dataset/UrbanSound8K/metadata/UrbanSound8K.csv"
    signal_dir = "./dataset/cv-corpus-5.1-2020-06-22/en/clips/"
    signal_metadata = "./dataset/cv-corpus-5.1-2020-06-22/en/train.tsv"
    num_samples = 1000
    use_df = True
    df_path = "./dataset/cv-corpus-5.1-2020-06-22/en/train.tsv"
    signal_save_path = "./signal_paths_save.npy"
    noise_save_path = "./noise_paths_save.npy"
    default_sr = 16000
    sec = 6
    augment=False


    signal_synthesis_dataset = Signal_Synthesis_DataGen(noise_dir, noise_metadata, signal_dir, signal_metadata, num_samples, use_df, df_path, signal_save_path, noise_save_path, default_sr, sec, augment)
    signal_mix, signal = signal_synthesis_dataset.__getitem__(4532)
    print(signal_mix.shape)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=11.0), HTML(value='')))




ValueError: invalid literal for int() with base 10: 'fold5'

In [None]:
noise_dir = "./dataset/UrbanSound8K/all_files/"
signal_dir = "./dataset/cv-corpus-5.1-2020-06-22/en/clips/"
num_samples=200, 
noise_path_save = "./noise_paths_save.npy"
default_sr = 16000
sec = 6
augment=False

signal_synthesis_dataset = Signal_Synthesis_DataGen(noise_dir, signal_dir, num_samples=200, noise_path_save=noise_path_save,\
                 n_fft=400, win_length=400, hop_len=200, f_min=0, f_max=8000, \
                 perform_stft=True, normalize=True, default_sr=16000, sec=6, augment=False)

In [None]:
signal_mix, signal = signal_synthesis_dataset.__getitem__(4532)

In [None]:
print(signal_mix.shape)

In [None]:
len(signal_paths)

In [None]:
type(signal_paths)