In [None]:
import torch
import os
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from sklearn.model_selection import LeaveOneOut

In [None]:
import random

random.seed(10)
print(random.random())


0.5714025946899135


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
data_path = "/content/drive/MyDrive/ML CS433 Project 2/data"
#data_path = os.path.join(os.path.dirname(os.path.normpath(os.getcwd())), "data")
noise_path = os.path.join(data_path, "demand")
print(os.path.exists(data_path))
os.path.exists(noise_path)

True


True

In [None]:
# Noise addition function
import soundfile as sf
def set_global_snr(clean, noise, snr_db):    
    clean_pow = np.sum(clean**2)
    noise_pow = np.sum(noise**2)
    noise_scale = np.sqrt( clean_pow / (noise_pow * 10**(snr_db/10)) )
    return noise_scale

In [None]:
# Create a dictionary for noise categories and their environments
noise_categories = dict()
for env in os.listdir(noise_path):
  if env[0] in noise_categories:
    noise_categories[env[0]].append(env)
  else:
    noise_categories[env[0]] = [env]

print(noise_categories)

{'D': ['DLIVING', 'DKITCHEN', 'DWASHING'], 'N': ['NFIELD', 'NPARK', 'NRIVER'], 'O': ['OOFFICE', 'OHALLWAY', 'OMEETING'], 'S': ['SPSQUARE', 'SCAFE', 'STRAFFIC'], 'T': ['TMETRO', 'TBUS', 'TCAR'], 'P': ['PRESTO', 'PCAFETER', 'PSTATION']}


In [None]:
import random

#Due to disk space usage not implementable
def get_channels_():
  training_channels_paths = []
  val_environments_paths = []
  for category, envs in noise_categories.items():
    # Select 2 environments from each category for validation
    val_noise_set = random.sample(envs, 2)
    val_environments_paths.append(noise_path + '/' + val_noise_set[0] + '/' + 'ch05.wav') #maybe randomization of channel would make sense
    val_environments_paths.append(noise_path + '/' + val_noise_set[1] + '/' + 'ch05.wav')
    # Select the remaining environment from each category for training
    training_noise_set = set(envs) - set(val_noise_set)
    channels = os.listdir(noise_path + '/' + list(training_noise_set)[0])
    # Select random channel from each environment for training
    training_channels_paths.append(noise_path + '/' + list(training_noise_set)[0] + '/' + 'ch05.wav')
  return val_environments_paths, training_channels_paths



def get_channels():
  train = [os.path.join(noise_path,noise,'ch05.wav') for noise in ["DKITCHEN","OHALLWAY","SCAFE"]]
  test = [os.path.join(noise_path,noise,'ch05.wav') for noise in ["DLIVING","OOFFICE","SPSQUARE"]]
  return test, train

val_environments_paths, training_channels_paths = get_channels()
print(val_environments_paths)
print(training_channels_paths)


['/content/drive/MyDrive/ML CS433 Project 2/data/demand/DLIVING/ch05.wav', '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OOFFICE/ch05.wav', '/content/drive/MyDrive/ML CS433 Project 2/data/demand/SPSQUARE/ch05.wav']
['/content/drive/MyDrive/ML CS433 Project 2/data/demand/DKITCHEN/ch05.wav', '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OHALLWAY/ch05.wav', '/content/drive/MyDrive/ML CS433 Project 2/data/demand/SCAFE/ch05.wav']


In [None]:
training_SNR = [10]
testing_SNR = [0, 5, 10, 15, 20]

def rms(sig):
  return np.sqrt(np.mean(sig**2))

def get_noisy_signal(sig, noise, snr):
  required_rms = np.sqrt(rms(sig)/10**(snr/10))
  random_index = random.randint(0, len(noise)-len(sig))
  noisy_signal = sig + (required_rms/rms(noise)) * noise[random_index: random_index + len(sig)]
  return noisy_signal


In [None]:
training_channels_paths

['/content/drive/MyDrive/ML CS433 Project 2/data/demand/DKITCHEN/ch05.wav',
 '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OHALLWAY/ch05.wav',
 '/content/drive/MyDrive/ML CS433 Project 2/data/demand/SCAFE/ch05.wav']

In [None]:
val_environments_paths

['/content/drive/MyDrive/ML CS433 Project 2/data/demand/DLIVING/ch05.wav',
 '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OOFFICE/ch05.wav',
 '/content/drive/MyDrive/ML CS433 Project 2/data/demand/SPSQUARE/ch05.wav']

In [None]:
speaker_ID = []
speaker_file = []


def crawl_files(dir_name):
  for root, dirs, files in os.walk(os.path.join(data_path,dir_name)):
    for file in files:
      ID = file.split("_")[0]
      if file.split(".")[0].endswith("north"):
        speaker_file.append(os.path.join(root, file))
        speaker_ID.append(file.split("_")[0])
    
crawl_files("HC")
crawl_files("PD")

#filenames = {}
#for i, j in zip(speaker_ID, speaker_file)
#    filenames.setdefault(i, []).append(j)

filenames = dict(zip(speaker_ID, speaker_file))

filenames


{'ID01': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID01_hc_0_0_0_north.wav',
 'ID15': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID15_hc_0_0_0_north.wav',
 'ID03': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID03_hc_0_0_0_north.wav',
 'ID10': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID10_hc_0_0_0_north.wav',
 'ID00': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID00_hc_0_0_0_north.wav',
 'ID08': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID08_hc_0_0_0_north.wav',
 'ID05': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID05_hc_0_0_0_north.wav',
 'ID09': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID09_hc_0_0_0_north.wav',
 'ID36': '/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID36_hc_0_0_0_north.wav',
 'ID30': '/content/drive/MyDrive/ML CS433 Project 2/data/PD/ID30_pd_2_1_1_north.wav',
 'ID06': '/content/drive/MyDrive/ML CS433 Project 2/data/PD/ID06_pd_3_1_1_north.wav',
 'ID34': '/content/drive/MyDrive/ML CS433 Project 2/da

In [None]:
noisy_data_path = os.path.join(data_path, "noisy_data")
if not os.path.exists(noisy_data_path):
    os.mkdir(noisy_data_path)
    os.mkdir(os.path.join(noisy_data_path,"test"))
    os.mkdir(os.path.join(noisy_data_path,"train"))

In [None]:
## Functions from feature_extractor

import torchaudio.compliance.kaldi as ta_kaldi
from typing import Tuple
import soundfile as sf
import numpy as np
import torch

def get_waveform(audio_path: str, normalization=True) -> Tuple[np.ndarray, int]:
    """Get the waveform and sample rate of a 16-bit mono-channel WAV or FLAC.
    adapted from https://github.com/pytorch/fairseq/blob/master/fairseq/data/audio/audio_utils.py
    Args:
        path_or_fp (str): the path or file-like object
        normalization (bool): Normalize values to [-1, 1] (Default: True)
    Returns:
        (numpy.ndarray): [n,] waveform array, (int): sample rate
   """   
    waveform, sample_rate = sf.read(audio_path)
    if normalization:
        waveform = normalize_rms(waveform, rms_level = 0.1)
    return waveform, sample_rate


def normalize_rms(signal: np.ndarray, rms_level: float = 0.1) -> np.ndarray:
    a = np.sqrt( (len(signal) * rms_level**2) / np.sum(signal**2) )
    return signal * a


def compute_mel_spectrograms(waveform: np.ndarray,sample_rate: int, options:dict) -> np.ndarray:
    """Compute mel spectrogram from waveform
    Args:
        waveform (np.ndarray): input waveform array
        sample_rate (float): sample rate
    """    
    waveform = torch.from_numpy(waveform).unsqueeze(0)
    features = ta_kaldi.fbank(waveform, **options, sample_frequency=sample_rate)
    return features.numpy()

def compute_stft_spectrograms(waveform: np.ndarray,sample_rate: int, options:dict) -> np.ndarray:
    """Compute mel spectrogram from waveform
    Args:
        waveform (np.ndarray): input waveform array
        sample_rate (float): sample rate
    """    
    waveform = torch.from_numpy(waveform).unsqueeze(0)
    features = ta_kaldi.spectrogram(waveform, **options, sample_frequency=sample_rate)
    return features.numpy()


def compute_noisy_mel_f(audio_f: str, noise_f: str, options:dict, snr):
  signal, s_fs = get_waveform(audio_f)
  noise, n_fs = get_waveform(noise_f)
  if n_fs != s_fs:
    raise("Error: sampling frequency not the same")
  
  noisy_signal = get_noisy_signal(signal,noise,snr)
  features = compute_mel_spectrograms(noisy_signal,s_fs,options)
  return features.T

def compute_noisy_stft_f(audio_f: str, noise_f: str, options:dict, snr):
  signal, s_fs = get_waveform(audio_f)
  noise, n_fs = get_waveform(noise_f)
  if n_fs != s_fs:
    print(n_fs,s_fs)
    raise("Error: sampling frequency not the same")
  
  noisy_signal = get_noisy_signal(signal,noise,snr)
  features = compute_stft_spectrograms(noisy_signal,s_fs,options)
  return features.T


In [None]:
import os
import numpy as np
import csv


# Transform parameters
options_mel = {
    "frame_length" : 32, #ms
    "frame_shift" : 4, #ms
    "window_type" : "hamming",
    "num_mel_bins" : 129,
    "channel" : 0}

options_stft = {
    "frame_length" : 16, #ms
    "frame_shift" : 8, #ms
    "window_type" : "hamming",
    "channel" : 0}

# Paths to data
hc_path = "../../../data/HC/"
pd_path = "../../../data/PD/"
chunks_path = "../../../data/chunks/"

chunk_length = 500 #ms


def divide_audio(feats, options):
    """
    Divides spectrogram into 500 ms chunks

    :param feats: the spectrogram extraction
    :param options: the spectrogram options
    :return: a list of 500 ms chunks of the spectrogram
    """
    nb_frames_in_audio = feats.shape[1]
    nb_frames_per_chunk = int(chunk_length*nb_frames_in_audio / (options["frame_shift"]*(nb_frames_in_audio-1) + options["frame_length"]))
    
    i = 0
    chunks = []
    while ((i+1)*nb_frames_per_chunk < nb_frames_in_audio): # while there still exists another extra 500 ms chunk
        chunks.append(feats[:, i*nb_frames_per_chunk:(i+1)*nb_frames_per_chunk]) # append next chunk of 500 ms
        i += 1
        
    return chunks

def standardize(chunk):
    """
    Standardizes a chunk 

    :param chunk: a chunk of spectrogram
    :return: that chunk, standardized
    """ 
    return (chunk-chunk.mean())/(chunk.std())

def save_as_chunks(o_path, filename, feats, options, spec):
    """
    Saves all standardized chunks from spectrogram to filesystem as csv files

    :param speaker_path: path to speaker folder
    :param filename: name of original audio file
    :param feats: the spectrogram extraction
    :param options: the spectrogram options
    :param spec: the type spectrogram, 'mel' or 'stft'
    """ 
    chunks = divide_audio(feats, options) # Get spectrogram chunks of 500ms
    
    # Name each chunk, standardize and store in filesystem
    for i in range(len(chunks)):
        chunk_filename = f'{spec}_{filename[:-4]}_chunk_{i}.csv' # get rid of '.wav' in original filename
        stand = standardize(chunks[i])
        np.savetxt(f'{o_path}/{chunk_filename}', stand, delimiter=",")



In [None]:
from multiprocessing import Pool

#initial architecture for noisification of the data including randomization. Deprecated

random.seed(10)
def create_one_out(speaker):
  path = os.path.join(noisy_data_path,speaker)
  test_noises, train_noises = get_channels()
  if not os.path.exists(path):
    os.mkdir(path)
    os.mkdir(os.path.join(path,"test"))
    os.mkdir(os.path.join(path,"train"))

  
  for snr in testing_SNR:
    final_dir_m = os.path.join(path,"test", str(snr))
    if not os.path.exists(final_dir_m):
      os.makedirs(final_dir_m)

    for noise in test_noises:
      spec1 = "mel_" + os.path.basename(os.path.dirname(noise))
      for file_ in filenames[speaker]:
        #if not os.path.exists(os.path.join(final_dir_m,f'{spec1}_{os.path.basename(file_)[:-4]}_chunk_{0}.csv')):
          #print("Creating ", os.path.join(final_dir_m,f'{spec1}_{os.path.basename(file_)[:-4]}_chunk_{0}.csv'))
          mel_feats = compute_noisy_mel_f(file_,noise,options_mel,snr)
          save_as_chunks(final_dir_m,os.path.basename(file_),mel_feats,options_mel,spec1)
            


  other_speakers = list(filenames.keys())
  other_speakers.remove(speaker)
  for train_speaker in other_speakers:
    for tr_noise in train_noises:
      snr = 10
      dir_m = os.path.join(path,"train")
      if not os.path.exists(dir_m):
        os.makedirs(dir_m)
    
      spec1 = "mel_" + os.path.basename(os.path.dirname(tr_noise))
    
      for file_ in filenames[train_speaker]:
        
        #if not os.path.exists(os.path.join(dir_m,f'{spec1}_{os.path.basename(file_)[:-4]}_chunk_{0}.csv')):
          #print("Creating mel train chunks")
          mel_feats = compute_noisy_mel_f(file_,tr_noise,options_mel,snr)
          save_as_chunks(dir_m,os.path.basename(file_),mel_feats,options_mel,spec1)
        
      
  print("Finished with left out speaker ", speaker)

      
  
with Pool(8) as p:
  p.map(create_one_out,filenames.keys())


Finished with left out speaker  ID05
Finished with left out speaker  ID09
Finished with left out speaker  ID10
Finished with left out speaker  ID00
Finished with left out speaker  ID36
Finished with left out speaker  ID08
Finished with left out speaker  ID01
Finished with left out speaker  ID15
Finished with left out speaker  ID24
Finished with left out speaker  ID03
Finished with left out speaker  ID02
Finished with left out speaker  ID34
Finished with left out speaker  ID04
Finished with left out speaker  ID30
Finished with left out speaker  ID06
Finished with left out speaker  ID16
Finished with left out speaker  ID20
Finished with left out speaker  ID07


In [None]:
from multiprocessing import Pool

random.seed(10)

train_path = os.path.join(noisy_data_path, "train")
test_path = os.path.join(noisy_data_path, "test")



training_SNR = [10]
testing_SNR = [5, 10, 15]

test_noises_p, train_noises_p = get_channels()
test_noises = [{"noise_path": noise, "out_path": test_path, "SNRs": testing_SNR} for noise in test_noises_p]
train_noises = [{"noise_path": noise, "out_path": train_path, "SNRs": training_SNR} for noise in train_noises_p]

noises = test_noises + train_noises
noises

[{'noise_path': '/content/drive/MyDrive/ML CS433 Project 2/data/demand/DLIVING/ch05.wav',
  'out_path': '/content/drive/MyDrive/ML CS433 Project 2/data/noisy_data/test',
  'SNRs': [5, 10, 15]},
 {'noise_path': '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OOFFICE/ch05.wav',
  'out_path': '/content/drive/MyDrive/ML CS433 Project 2/data/noisy_data/test',
  'SNRs': [5, 10, 15]},
 {'noise_path': '/content/drive/MyDrive/ML CS433 Project 2/data/demand/SPSQUARE/ch05.wav',
  'out_path': '/content/drive/MyDrive/ML CS433 Project 2/data/noisy_data/test',
  'SNRs': [5, 10, 15]},
 {'noise_path': '/content/drive/MyDrive/ML CS433 Project 2/data/demand/DKITCHEN/ch05.wav',
  'out_path': '/content/drive/MyDrive/ML CS433 Project 2/data/noisy_data/train',
  'SNRs': [10]},
 {'noise_path': '/content/drive/MyDrive/ML CS433 Project 2/data/demand/OHALLWAY/ch05.wav',
  'out_path': '/content/drive/MyDrive/ML CS433 Project 2/data/noisy_data/train',
  'SNRs': [10]},
 {'noise_path': '/content/drive/MyDrive

In [None]:
from multiprocessing import Pool


def add_noisy_data(noise):
    spec1 = "mel_" + os.path.basename(os.path.dirname(noise["noise_path"]))
    for snr in noise["SNRs"]:
        out_path = os.path.join(noise["out_path"], str(snr))
        if not os.path.exists(out_path):
            os.mkdir(out_path)
        
        for file_ in filenames.values():
            if not os.path.exists(os.path.join(out_path,f'{spec1}_{os.path.basename(file_)[:-4]}_chunk_{0}.csv')):
                print("Creating chunks for ", file_)
                mel_feats = compute_noisy_mel_f(file_,noise["noise_path"],options_mel,snr)
                save_as_chunks(out_path,os.path.basename(file_),mel_feats,options_mel,spec1)
                
with Pool(2) as p:
    p.map(add_noisy_data,noises)

    

Creating chunks for Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID15_hc_0_0_0_north.wav 
/content/drive/MyDrive/ML CS433 Project 2/data/HC/ID08_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID05_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID03_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/PD/ID30_pd_2_1_1_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID10_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/PD/ID06_pd_3_1_1_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID00_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/PD/ID34_pd_2_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS433 Project 2/data/HC/ID05_hc_0_0_0_north.wav
Creating chunks for  /content/drive/MyDrive/ML CS4

In [None]:

os.path.exists(os.path.join(noisy_data_path, "test"))

True

In [None]:
os.path.join(noisy_data_path, "test")

'/home/alex/ml-project-2-masters_of_loss/data/noisy_data/test'

In [None]:
'/home/alex/ml-project-2-masters_of_loss/data/noisy_data/test' == '/home/alex/ml-project-2-masters_of_loss/data/noisy_data/test'

True