<a href="https://colab.research.google.com/github/PebbleBuilds/acc-class/blob/data_augmenting/Data_Augmentation_Testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os 
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt 
import pylab
import torch
from PIL import Image
import datetime
import math

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [5]:
class Augment:
    def __init__(self, config):
        self.mfcc_data = []
        self.bands = 128
        self.frames = 128
        self.counts = {
            "english": 0,
            "hindi": 0,
            "mandarin": 0,
            "tagalog": 0,
            "other": 0
        }
        self.n_mfccs = config.get("n_mfccs", 5)
        self.pitch_shift_count = config.get("pitch_shift_count",0) #should be even
        self.gaussian_noise_count = config.get("gaussian_noise_count",0)
        self.gaussian_noise_stdev = config.get("gaussian_noise_stdev",1) #1 dB
        self.save_npys = config.get("save_npys",False)
        self.save_jpgs = config.get("save_jpgs",True)
        self.duration = config.get("duration", 1)
        self.sr = config.get("sr", 22050)
        self.noised_factor = config.get("noised_factor", 0.1)

        self.abs_dir = "/content/drive/My Drive/APS360 Group Project"

        # where the raw audio files are located
        self.audio_files_dir = "audio_split_2"
        self.audio_path = os.path.join(self.abs_dir, self.audio_files_dir)

        self.splits = ("train","validation", "test")
        self.labels = os.listdir(os.path.join(self.audio_path,"train"))

        # where to save the generated dataset
        if 'folder' in config:
          self.name = config['folder']
        else: 
          self.name = self.name_generator()
        self.save_path = os.path.join(self.abs_dir, self.name)

    def create_dataset(self):
        for split in self.splits:
            for label in self.labels:
                sub_dir = ("%s/%s"%(split,label))
                print("Processing %s..."%(sub_dir))
                self.create_mfccs(sub_dir)

    def name_generator(self):
        dt = str(datetime.datetime.now()).split(" ")
        return("mfcc_dur_%s_psc_%d_gnc_%d_std_%d_%s_%s"%(self.duration,self.pitch_shift_count,self.gaussian_noise_count, self.gaussian_noise_stdev, dt[0], dt[1]))

    def pitch_shifter(self, clip, sr):
        boundary = int(self.pitch_shift_count/2)
        pitch_shifted_mfccs = []
        for i in range(-boundary, boundary+1):
            if i != 0:
                shifted_clip = librosa.effects.pitch_shift(clip, sr, i)
                pitch_shifted_mfccs.append(self.get_MFCC(shifted_clip))
        return pitch_shifted_mfccs

    def pitch_shifter_audio(self, clip, sr):
        boundary = int(self.pitch_shift_count/2)
        pitch_shifted_clips = []
        for i in range(-boundary, boundary+1):
            if i != 0:
                shifted_clip = librosa.effects.pitch_shift(clip, sr, i)
                pitch_shifted_mfccs.append(shifted_clip)
        return pitch_shifted_mfccs

    def gaussian_noiser(self, mfcc):
        #np.random.seed(1)
        gaussian_noised_mfccs = []
        for i in range(0, self.gaussian_noise_count):
            noise = np.random.normal(0, self.gaussian_noise_stdev, mfcc.shape)
            noisy_mfcc = noise + mfcc
            gaussian_noised_mfccs.append(noisy_mfcc)
        return gaussian_noised_mfccs

    def gaussian_noise_audio(self, clip):
        RMS=math.sqrt(np.mean(clip**2))*self.noised_factor
        gaussian_noised_clips = []
        for i in range(0, self.gaussian_noise_count):
            noise = np.random.normal(0, RMS, clip.shape[0])
            noisy_audio = noise + clip
            gaussian_noised_clips.append(noisy_audio)
        return gaussian_noised_clips

    def get_MFCC(self,speech):
        mfcc = librosa.feature.mfcc(speech, n_mfcc = self.n_mfccs) 
        return mfcc

    def resize_mfcc(self,mfcc):
        resized_mfcc = librosa.util.fix_length(mfcc, self.bands, axis=1)
        resized_mfcc = np.vstack((np.zeros((0, self.bands)), resized_mfcc))
        return resized_mfcc
        
    def save_data(self,data,sub_dir, name_only):
                
        if self.save_npys: # deprecated, but still usable
            new_filename_npy = name_only + '.npy'
            path_npy = [self.save_path,sub_dir,new_filename_npy]
            save_path_npy = os.path.join(*path_npy)
            try:
                np.save(save_path_npy,data)
            except FileNotFoundError:
                save_dir = os.path.join(self.save_path, sub_dir)
                os.makedirs(save_dir)
                np.save(save_path_npy,data)

        if self.save_jpgs:
            new_filename_jpg = name_only + '.jpg'
            path_jpg = [self.save_path,sub_dir,new_filename_jpg]
            save_path_jpg = os.path.join(*path_jpg)
            save_dir_path_jpg = os.path.join(self.save_path, sub_dir)
            pylab.axis('off')
            pylab.axes([0., 0., 1., 1.], frameon=False, xticks=[], yticks=[])
            librosa.display.specshow(data)
            try:
                pylab.savefig(save_path_jpg, bbox_inches=None, pad_inches=0)
                pylab.close()
            except FileNotFoundError:
                save_dir = os.path.join(self.save_path, sub_dir)
                os.makedirs(save_dir)
                pylab.savefig(save_path_jpg, bbox_inches=None, pad_inches=0)
                pylab.close()
            print("Saved %s"%save_path_jpg)

        return True 
    
    def uniform_clip_split(self, sub_dir, file_name):
        path = [self.audio_path,sub_dir,file_name] 
        file_path = os.path.join(*path)
        print(file_path)

        # check if this is train, val, or test
        split, language = sub_dir.split("/")

        name_only = file_name.split(".mp3")[0]
        speech,self.sr = librosa.load(file_path)
        num_frames = int(self.duration*self.sr)
        for i in range(0, int(len(speech)/num_frames)):
            if language in self.counts.keys():
                num = self.counts[language]
                self.counts[language] += 1
            else:
                num = self.counts["other"]
                self.counts["other"] += 1 

            start = i*num_frames
            end = start + num_frames 
            if (len(speech[start:end]) == num_frames):
                clip = speech[start:end]
                clip_name = name_only + "_clip" + str(num)
                # generate and save unaltered MFCC
                raw_mfcc = self.get_MFCC(clip)
                print("Shape:",raw_mfcc.shape)
                cnt = 0
                self.save_data(raw_mfcc, sub_dir, clip_name)
                mfcc = raw_mfcc
                if split == "train":
                    # generate and save noisy, unshifted MFCCs (only for training)
                    noisy_clips = self.gaussian_noise_audio(clip)
                    for i, noisy_clip in enumerate(noisy_clips):
                        noisy_mfcc = self.get_MFCC(noisy_clip)
                        self.save_data(noisy_mfcc,sub_dir,(clip_name+"_noisy%d"%(i)+"_{}".format(cnt)))
                        print(i)

                        # generate and save pitch-shifted MFCCs - both noisy and non-noisy
                        shifted_clips = self.pitch_shifter(clip,self.sr)
                        lowest_shift = -1*int(self.pitch_shift_count/2)
                        for i, shifted_clip in enumerate(shifted_clips):
                            shift = lowest_shift + i
                            if shift >= 0:
                                shift += 1
                            shifted_name = clip_name+("_shifted%d"%(shift)) + "_{}".format(cnt)
                            shifted_mfcc = self.get_MFCC(shifted_clip)
                            self.save_data(shifted_mfcc,sub_dir,shifted_name)

                            noisy_shifted_clips = self.gaussian_noiser(shifted_clip)
                            for j, noisy_shifted_clip in enumerate(noisy_shifted_clips):
                                noisy_shifted_name = shifted_name + ("_noisy%d"%j) + "_{}".format(cnt)
                                noisy_shifted_mfcc = self.get_MFCC(noisy_shifted_clip)
                                self.save_data(noisy_shifted_mfcc,  sub_dir, noisy_shifted_name)

    def create_mfccs(self, sub_dir):
        for file_name in os.listdir(os.path.join(self.audio_path,sub_dir)):
            print(file_name)
            self.uniform_clip_split(sub_dir,file_name)
        return True

In [6]:
config = {
    "pitch_shift_count": 4,
    "gaussian_noise_count": 2,
    "gaussian_noise_stdev": 0,
    "save_jpgs": True,
    "save_npys": False,
    "duration": 10,
    "sr": 22050,
    "n_mfccs": 13,
    "folder": 'asdf',
    "noised_factor": 0
}

data = Augment(config)

data.create_dataset()

Processing train/india...
hindi10.mp3
/content/drive/My Drive/APS360 Group Project/audio_split_2/train/india/hindi10.mp3
Shape: (13, 431)
Saved /content/drive/My Drive/APS360 Group Project/asdf/train/india/hindi10_clip0.jpg
Shape: (13, 431)
Saved /content/drive/My Drive/APS360 Group Project/asdf/train/india/hindi10_clip1.jpg
hindi3.mp3
/content/drive/My Drive/APS360 Group Project/audio_split_2/train/india/hindi3.mp3
Shape: (13, 431)
Saved /content/drive/My Drive/APS360 Group Project/asdf/train/india/hindi3_clip2.jpg
Shape: (13, 431)
Saved /content/drive/My Drive/APS360 Group Project/asdf/train/india/hindi3_clip3.jpg
bengali14.mp3
/content/drive/My Drive/APS360 Group Project/audio_split_2/train/india/bengali14.mp3


KeyboardInterrupt: ignored

In [None]:
%%shell

pwd
!cd "drive/My Drive/APS360 Group Project"
!pwd