In [None]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from torchvision import transforms as ts

from IPython.display import Audio
import numpy as np
import pandas as pd
import glob
import os
from sklearn.model_selection import StratifiedKFold



class AudioUtil():
  """Stateless helpers for loading, reshaping and augmenting audio clips.

  Most methods take and return an ``aud`` tuple ``(sig, sr)``: ``sig`` is a
  tensor indexed as ``(channel, sample)`` and ``sr`` is the sample rate in Hz.
  """

  @staticmethod
  def _ms_to_samples(sr, ms):
    """Convert a duration in milliseconds into a whole number of samples."""
    # Multiply before dividing: ``sr // 1000 * ms`` drops the sub-kHz part of
    # the rate (44100 Hz would silently be treated as 44000 Hz).
    return sr * ms // 1000

  # ----------------------------
  # Load an audio file. Return the signal as a tensor and the sample rate
  # ----------------------------
  @staticmethod
  def open(audio_file):
    """Load ``audio_file`` with ``torchaudio.load`` and return ``(sig, sr)``."""
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)

  # ----------------------------
  # Convert the given audio to the desired number of channels
  # stereo = 2, new_channel = 2
  # mono = 1, new_channel = 1
  # ----------------------------
  @staticmethod
  def rechannel(aud, new_channel):
    """Return ``aud`` with exactly ``new_channel`` channels.

    * Same channel count: ``aud`` is returned untouched.
    * Too many channels: the first ``new_channel`` channels are kept.
    * Too few channels: the first channel is duplicated ``new_channel`` times.
    """
    sig, sr = aud
    cur_channel = sig.shape[0]

    if cur_channel == new_channel:
      # Nothing to do
      return aud

    if cur_channel > new_channel:
      # e.g. stereo -> mono keeps only the first channel
      resig = sig[:new_channel, :]
    else:
      # e.g. mono -> stereo duplicates the first channel. Repeating exactly
      # ``new_channel`` times guarantees the requested channel count.
      resig = sig[:1, :].repeat(new_channel, 1)

    return (resig, sr)

  # ----------------------------
  # Resample to a new sample rate
  # need to know what is the optimal value for human voice
  # ----------------------------
  @staticmethod
  def resample(aud, newsr):
    """Resample ``aud`` to ``newsr`` Hz; a no-op when the rate already matches."""
    sig, sr = aud

    if sr == newsr:
      # Nothing to do
      return aud

    # Resample operates along the last (time) dimension, so every channel is
    # converted in a single call.
    resig = torchaudio.transforms.Resample(sr, newsr)(sig)
    return (resig, newsr)

  # ----------------------------
  # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
  # This is for regular procedure.
  # not preferred since no silence included from competition audio files
  # ----------------------------
  @staticmethod
  def pad_trunc(aud, max_ms):
    """Force the signal to last exactly ``max_ms`` milliseconds.

    Longer signals are cut, keeping either the head or the tail (chosen at
    random). Shorter signals are zero-padded, with the padding split randomly
    between the beginning and the end.
    """
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = AudioUtil._ms_to_samples(sr, max_ms)

    if sig_len > max_len:
      # Truncate the signal to the given length, just cut to the direction.
      keep_head = random.randint(0, 1) == 0
      if keep_head:
        sig = sig[:, :max_len]
      else:
        sig = sig[:, sig_len - max_len:]
    elif sig_len < max_len:
      # Length of padding to add at the beginning and end of the signal
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len

      # Pad with 0s that match the signal's dtype/device so the concat is exact
      pad_begin = torch.zeros((num_rows, pad_begin_len), dtype=sig.dtype, device=sig.device)
      pad_end = torch.zeros((num_rows, pad_end_len), dtype=sig.dtype, device=sig.device)

      sig = torch.cat((pad_begin, sig, pad_end), 1)

    return (sig, sr)

  # ----------------------------
  # This applies for our purpose, time shift do not needed for this.
  # ----------------------------
  @staticmethod
  def time_sample(aud, max_ms):
    """Return a random window of exactly ``max_ms`` milliseconds.

    Signals longer than the window are cropped at a random offset. Signals
    that are not longer than the window are first looped (at least twice, and
    as often as needed to cover the window) and then cropped the same way.

    Raises:
      ValueError: if the signal contains no samples.
    """
    sig, sr = aud
    sig_len = sig.shape[1]
    max_len = AudioUtil._ms_to_samples(sr, max_ms)

    if sig_len == 0:
      # Looping an empty clip can never reach the requested length.
      raise ValueError("cannot sample from an empty signal")

    if sig_len <= max_len:
      repeats = max(2, -(-max_len // sig_len))  # ceil division, minimum 2 copies
      sig = sig.repeat(1, repeats)
      sig_len = sig.shape[1]

    # Upper bound is exclusive, hence the +1: the last valid start is
    # ``sig_len - max_len`` (also valid when both lengths are equal).
    start = np.random.randint(0, sig_len - max_len + 1)
    return (sig[:, start:start + max_len], sr)

  ###############################################################################
  # Raw Augmentation
  ###############################################################################
  # ----------------------------
  # noise, flip, others are implemented in torchaudio transformations.
  # noise_factor < 0.005 is preferred -> heuristic test
  # ----------------------------
  @staticmethod
  def noise(aud, noise_factor):
    """Add Gaussian noise scaled by ``noise_factor`` to every channel.

    One noise vector is drawn (from ``np.random``) and shared by all channels.
    """
    sig, sr = aud
    # Keep the noise in the signal's own floating dtype so the sum does not get
    # promoted to float64 by the numpy samples.
    noise_dtype = sig.dtype if sig.is_floating_point() else torch.get_default_dtype()
    noise = torch.from_numpy(np.random.randn(sig.shape[1])).to(dtype=noise_dtype, device=sig.device)
    # Broadcasting applies the same vector to each channel row.
    return (sig + noise_factor * noise, sr)

  @staticmethod
  def flip(aud):
    """Reverse the signal in time; channel order is left unchanged."""
    # if we consider it as speaking out different languages, this might be helpful.
    sig, sr = aud
    # Flip only the sample axis; flipping every axis would also swap channels.
    return torch.flip(sig, dims=[-1]), sr

  # Spectrogram
  # ----------------------------
  # Generate a Spectrogram , mel_spectrogram + dB
  # ----------------------------
  @staticmethod
  def spectro_gram(aud, n_mels=128, n_fft=1024, win_len = None, hop_len=512):
    """Return the mel spectrogram of ``aud`` converted to decibels.

    ``n_mels``, ``n_fft``, ``win_len`` and ``hop_len`` are forwarded to
    ``MelSpectrogram``; the dB conversion uses ``top_db=80``.
    """
    sig, sr = aud
    top_db = 80

    # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
    to_mel = transforms.MelSpectrogram(
      sr,
      n_fft=n_fft,
      win_length=win_len,
      hop_length=hop_len,
      center=True,
      onesided=True,
      n_mels=n_mels,
      mel_scale='htk',
    )
    spec = to_mel(sig)

    # Convert to decibels
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)

    return spec

  # ----------------------------
  # Generate MFCC
  # ----------------------------
  @staticmethod
  def mfcc(aud, n_mels=128, n_mfcc=20,n_fft=2048, hop_len =512):
    """Return ``n_mfcc`` MFCC coefficients of ``aud`` computed by ``transforms.MFCC``."""
    sig, sr = aud

    melkwargs = {'n_fft': n_fft, 'n_mels': n_mels, 'hop_length': hop_len, 'mel_scale': 'htk'}
    spec = transforms.MFCC(sr, n_mfcc, melkwargs=melkwargs)(sig)

    return spec

  # Spectrogram Augmentation

  # ----------------------------
  # Augment the Spectrogram by masking out some sections of it in both the frequency
  # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
  # overfitting and to help the model generalise better. The masked sections are
  # replaced with the mean value.
  # ----------------------------
  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    """Apply frequency and time masking to a ``(channel, n_mels, time)`` spectrogram.

    Each mask spans at most ``max_mask_pct`` of its axis and is filled with the
    mean of the input spectrogram. ``n_freq_masks`` / ``n_time_masks`` set how
    many masks of each kind are applied.
    """
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)
    return aug_spec
  
  #@staticmethod
  # suggest rate btw [0.8, 1.2]
 # def time_stretch(spec, rate=0.8,hop_len =512, n_freq=128):
    
  #  stretch = torchaudio.transforms.TimeStretch(hop_length =hop_len, n_freq = n_freq, fixed_rate = rate)
  #  streched = stretch(spec, 1.2)
  #  return streched
  

In [None]:
# one_hot_encoding
def to_one_hot(k, classes_num):
    """Return a float vector of length ``classes_num`` that is 1 at index ``k``.

    All other entries are 0.
    """
    one_hot = np.zeros(classes_num)
    one_hot[k] = 1
    return one_hot

# csv
def data_to_frame(data_dir='./dataset/train', min_files=5):
    """Index an audio directory tree into a DataFrame plus a label encoding.

    Every directory under ``data_dir`` that contains at least one non-hidden
    file becomes a class named after the directory. Classes with fewer than
    ``min_files`` files are oversampled by repeating randomly chosen files
    until they reach ``min_files`` rows.

    Args:
        data_dir: Root directory to walk.
        min_files: Minimum number of rows each class contributes.

    Returns:
        ``(df, encoded_label)`` where ``df`` has the columns ``'path'`` and
        ``'class'`` and ``encoded_label`` maps each class name to its one-hot
        vector. The vector length equals the number of classes found.
    """
    # First pass: collect the usable files of every directory. Directories
    # without files (typically ``data_dir`` itself) are skipped instead of
    # crashing on ``filenames[0]``.
    class_files = []
    for dirname, _, filenames in os.walk(data_dir):
        # Drop hidden entries such as macOS '.DS_Store' file by file, rather
        # than discarding the whole directory when one happens to come first.
        usable = [name for name in filenames if not name.startswith('.')]
        if not usable:
            continue
        label = os.path.basename(os.path.normpath(dirname))
        class_files.append((label, dirname, usable))

    # One-hot size is derived from the classes actually found, so an index can
    # never fall outside the vector.
    labels = list(dict.fromkeys(label for label, _, _ in class_files))
    encoded_label = {
        label: to_one_hot(k, len(labels)) for k, label in enumerate(labels)
    }

    data_path = []
    data_label = []
    for label, dirname, usable in class_files:
        missing = min_files - len(usable)
        if missing > 0:
            # Oversample small classes; build a new list so the list owned by
            # os.walk is not mutated.
            picks = np.random.randint(0, len(usable), size=missing)
            usable = usable + [usable[i] for i in picks]
        for filename in usable:
            data_label.append(label)
            data_path.append(os.path.join(dirname, filename))

    # TODO: decide how the encoded labels should be handled.
    # TODO: handle the validation split.
    # TODO: labels are processed last.
    df = pd.DataFrame({'path': data_path, 'class': data_label})

    return df, encoded_label



In [None]:
# Index the training set (every class contributes at least 10 rows), unpack the
# file table and the label encoding, then preview the first rows.
train_data = data_to_frame(min_files=10)
df, encoded_label = train_data
df.head()

In [None]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

# ----------------------------
# Sound Dataset
# ----------------------------
class voiceDS(Dataset):
  """Dataset yielding ``(mel spectrogram, one-hot label)`` pairs for audio files.

  Args:
    df_x: DataFrame with a ``'path'`` column holding the audio file paths.
    df_y: DataFrame with a ``'class'`` column holding the class names, in the
      same row order as ``df_x``.
    data_type: Split name; spectrogram augmentation is applied only when it
      equals ``'train'``.
    encode_label: Mapping from class name to its encoded (one-hot) vector.
    transform: Optional callable applied to the spectrogram before returning.
  """

  def __init__(self, df_x, df_y, data_type, encode_label, transform = None):
    self.df_x = df_x
    self.df_y = df_y
    self.data_type = str(data_type)
    # Keep the mapping that was passed in, not a notebook-level global.
    self.encoded_label = encode_label
    # transform
    self.transform = transform
    # hyperparameters
    self.duration = 5000  # clip length in milliseconds
    self.sr = 44100       # target sample rate in Hz
    self.channel = 2      # target number of channels

  # ----------------------------
  # Number of items in dataset
  # ----------------------------
  def __len__(self):
    return len(self.df_x)

  # ----------------------------
  # Get i'th item in dataset
  # ----------------------------
  def __getitem__(self, idx):
    """Load, preprocess and encode the ``idx``-th sample."""
    # Positional lookup: ``idx`` runs over range(len(self)), while the frame's
    # index labels may be arbitrary (e.g. a subset taken from a larger frame).
    audio_file = self.df_x['path'].iloc[idx]
    # Get the class name and turn it into its encoded target vector
    class_name = self.df_y['class'].iloc[idx]
    class_id = torch.Tensor(self.encoded_label[class_name])
    # open the file
    aud = AudioUtil.open(audio_file)
    # preprocessing: resample, rechannel, fix time
    reaud = AudioUtil.resample(aud, self.sr)
    rechan = AudioUtil.rechannel(reaud, self.channel)
    dur_aud = AudioUtil.time_sample(rechan, self.duration)
    # Convert mel spectrogram -> (num_channels, Mel freq_bands, time_steps) -> later needs to match power of 2.
    sgram = AudioUtil.spectro_gram(dur_aud, n_mels=64, n_fft=1024, hop_len=None)
    if self.data_type == 'train':
      # raw augmentation -> later
      # aug = AudioUtil.noise(aud,0.001)
      # Mel augmentation
      sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

    if self.transform:
      sgram = self.transform(sgram)

    return sgram, class_id
