<a href="https://colab.research.google.com/github/dshoe17/Deep-SER/blob/master/SER_Audio_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
## Imports
import os, sys
import glob
import numpy as np
import pandas as pd
import librosa
import librosa.display
import matplotlib.pyplot as plt
import IPython.display as ipd
import plotly.express as px
import statsmodels.api as sm
import math
from sklearn.preprocessing import scale
from matplotlib import cm
from PIL import Image

In [0]:
## Function to generate spectrograms from .wav files
def get_spectrogram(wav, n_fft=480, hop_length=160, win_length=2048, window='hamming'):
    '''
    This function computes the magnitude spectrogram of an audio time series using
    a short-time Fourier transform.

    Args:
        wav (np.ndarray): the audio time series to transform
        n_fft (int): FFT size; raised to win_length when it is smaller (see note below)
        hop_length (int): number of samples between successive frames
        win_length (int): length of the analysis window in samples; None lets librosa
                          choose its default
        window (str): name of the window function

    Returns:
        spect (np.ndarray): magnitudes of the STFT (phase is discarded)
    '''
    # librosa pads the window out to n_fft, so a window longer than the FFT size is
    # rejected. The previous hard-coded pair (n_fft=480, win_length=2048) could never
    # run; the requested window length is kept and the FFT size grown to fit it.
    # NOTE(review): the earlier configuration was n_fft=480, win_length=480 - confirm
    # which of the two resolutions is wanted.
    if win_length is not None and win_length > n_fft:
        n_fft = win_length
    D = librosa.stft(wav, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window)
    spect, _ = librosa.magphase(D)
    return spect

In [0]:
## Function to retrieve encoded emotion for given audio .wav file
def get_category(f):
    '''
    This function retrieves the encoded emotional class for a given .wav audio file.
    The class is read from the sixth character of the file name, which holds a
    one-letter emotion code.
    (NOTE(review): the letter codes look like the Berlin EMO-DB naming scheme - confirm.)

    Args:
        f (str): the basename of the audio file to decipher

    Returns:
        router[f[5]] (str): The corresponding emotional label for the given audio file

    Raises:
        IndexError: if f has fewer than six characters
        KeyError: if the sixth character is not a known emotion code
    '''
    router = {'W':'Anger', 'L':'Boredom', 'E':'Disgust', 'A':'Fear',
              'F':'Happiness', 'T':'Sadness', 'N':'Neutral'}

    return router[f[5]]

In [0]:
## All-encompassing audio graphing function

def graph_audio(f, opt, y=None, sr=None, show=True, shape=None, dest=None, ext=None, verbose=True):
    '''
    This function generates various audio representation graphs for specified .wav files
    (or given audio time series and sampling rate values). It also accepts an optional parameter
    to save the generated graphs to categorized directories based on the corresponding emotion
    conveyed in the audio sample.

    Args:
        f (str): the absolute path to the input .wav file; its basename also determines the
                 emotion sub-directory and the file name used when saving
        opt (str): the type of audio graph representation to be generated ("spect" => spectrogram,
                   "mp_spect" => mel-power spectrogram, "cqt" => constant-Q transform, "chrom" => chromagram,
                   "mfcc" => MFCC intensity values)
        y (np.ndarray): supplied audio time series; optional
        sr (int): supplied sampling rate of audio time series y; optional (the file is loaded
                  from f unless both y and sr are given)
        show (bool): specifies whether or not to show the resulting graph (default is True, which always
                     depicts the resulting graph)
        shape (tuple(int, int)): the dimensions (in inches) of the image to display
        dest (str): if a value is given, this will serve as the path of the root directory to write to (default
                    value is None, which does not save the resulting graph)
        ext (int): if supplied, adds "..._<ext>.png" to saved audio file
        verbose (bool): specifies whether or not to add axis labels, ticks, and colorbars to resulting plots
                        (default value is True, which adds the aforementioned details)

    Returns:
        None (function may display a graph and / or save resulting graph file to a specified directory)

    Raises:
        ValueError: if opt is not one of the supported representation names
    '''
    if opt not in ('spect', 'mp_spect', 'cqt', 'chrom', 'mfcc'):
        raise ValueError('unknown graph type: {!r}'.format(opt))

    if y is None or sr is None:
        y, sr = librosa.load(f)
    # pyplot.get_cmap is used because matplotlib.cm.get_cmap no longer exists in recent releases
    cmap = plt.get_cmap('viridis')

    # Each branch only decides WHAT to draw: the matrix, the axis annotations used in
    # verbose mode, and the colorbar tick format. Drawing happens once, further down.
    axis_kw = {'x_axis': 'time'}
    bar_format = None

    # Spectrogram
    if opt == 'spect':
        # amplitude_to_db gives real decibels (matching the colorbar label) and floors
        # zero magnitudes instead of producing -inf like a bare logarithm
        data = librosa.amplitude_to_db(get_spectrogram(y))
        axis_kw['y_axis'] = 'linear'
        bar_format = '%+2.0f dB'

    # Mel Power Spectrogram
    elif opt == 'mp_spect':
        S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
        data = librosa.power_to_db(S, ref=np.max)
        axis_kw['y_axis'] = 'mel'
        bar_format = '%+2.0f dB'

    # Constant-Q Transform
    elif opt == 'cqt':
        # The CQT is complex; take magnitudes before converting so the values are
        # amplitude decibels rather than the dB of a squared complex number
        data = librosa.amplitude_to_db(np.abs(librosa.cqt(y, sr=sr)))
        axis_kw['y_axis'] = 'cqt_note'
        bar_format = '%+2.0f dB'

    # Chromagram
    elif opt == 'chrom':
        C = np.abs(librosa.cqt(y, sr=sr))
        data = librosa.feature.chroma_cqt(C=C, sr=sr)
        axis_kw['y_axis'] = 'chroma'

    # MFCC Intensity
    else:
        raw_mfcc = librosa.feature.mfcc(y=y, sr=sr)
        # Standardise each coefficient across time so all rows share one colour scale
        data = scale(raw_mfcc, axis=1)

    if verbose:
        # Annotated view, drawn on the current axes
        librosa.display.specshow(data, sr=sr, cmap=cmap, **axis_kw)
        plt.colorbar(format=bar_format)
        fig = plt.gcf()
    else:
        # Bare image: the axes fill the whole figure and carry no decoration
        fig, ax = plt.subplots(1)
        fig.subplots_adjust(left=0, right=1, bottom=0, top=1)
        ax.axis('off')
        librosa.display.specshow(data, sr=sr, cmap=cmap)
        plt.axis('off')

    if shape:
        fig.set_size_inches(*shape)

    if dest:
        basename = os.path.basename(f)
        # "is not None" so that an explicit index of 0 still gets its suffix
        suffix = '_{0:02d}'.format(ext) if ext is not None else ''
        out_dir = os.path.join(dest, get_category(basename))
        os.makedirs(out_dir, exist_ok=True)
        out_file = os.path.join(out_dir, os.path.splitext(basename)[0] + suffix + '.png')
        # Saved before plt.show(), since showing may release the figure and leave nothing
        # to write. facecolor='none' replaces the removed savefig(frameon=False) keyword.
        fig.savefig(out_file, dpi=256, facecolor='none')

    if show:
        plt.show()

    if dest:
        plt.close(fig)

In [0]:
## Mount personal Google Drive to the colab notebook
from google.colab import drive
# Interactive step: the recorded output shows an authorization prompt before the
# drive is mounted at /content/gdrive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
## Unzips the audio dataset into the colab session; assumes the zipped dataset is
## stored at the path below in your personal Google Drive
zip_path = '/content/gdrive/My Drive/Audio.zip'
# IPython shell escape: $zip_path is substituted with the Python variable above and
# quoted because the path contains a space; -q keeps unzip quiet
!unzip -q '$zip_path'

In [0]:
## Specifies working directory paths
# Both values keep their trailing slash because later cells build file paths from
# them by plain string concatenation
root = '/content/'
path = root + 'Audio/'

In [0]:
## Tallies how many .wav clips the dataset holds for each emotional category
from collections import Counter

# Earlier variant that paired every clip's label with its duration instead of counting:
# asdf = [(get_category(os.path.basename(i)), librosa.get_duration(filename=i)) for i in glob.glob(path + '*.wav')]
asdf = Counter(map(get_category, map(os.path.basename, glob.glob(path + '*.wav'))))

In [0]:
## Creates list of durations (in s) for every audio clip in every emotion class
from collections import defaultdict

categories = defaultdict(list)

# The durations are measured straight from the files. Iterating the label counter
# instead would hand back bare label strings, which cannot be unpacked into a
# (category, duration) pair.
for wav_file in glob.glob(path + '*.wav'):
    # sr=None keeps the file's native sampling rate, so no resampling is done just
    # to measure the clip length
    samples, rate = librosa.load(wav_file, sr=None)
    categories[get_category(os.path.basename(wav_file))].append(len(samples) / rate)

In [0]:
## Shows, per emotion class, how many segments each clip must contribute on average
## for the class to reach 500 samples
{label: 500 / count for label, count in asdf.items()}

{'Anger': 3.937007874015748,
 'Boredom': 6.172839506172839,
 'Disgust': 10.869565217391305,
 'Fear': 7.246376811594203,
 'Happiness': 7.042253521126761,
 'Neutral': 6.329113924050633,
 'Sadness': 8.064516129032258}

In [0]:
## Procedure to populate resampled audio segments
# Figure rendering for each file is handed to a child process (multiprocess library)
# so that the memory used while saving figures is released when the child exits;
# this keeps RAM usage down.

import warnings
# Silences every warning for the remainder of the session
warnings.filterwarnings("ignore")


# Average number of segments to draw from each clip so that every class ends up
# with roughly 500 of them
resample_table = {label: 500 / len(clips) for label, clips in categories.items()}

dest = root + 'new_resamples/'

# One output folder per emotion class. exist_ok makes this safe to re-run and
# replaces the separate "does it exist yet?" checks.
os.makedirs(dest, exist_ok=True)
for label in categories:
    os.makedirs(dest + label + '/', exist_ok=True)

def new_splits(y, sr, n =5):
    '''
    This function draws n random, possibly overlapping, segments of sr samples each
    from an audio time series.

    Args:
        y (np.ndarray): the audio time series to sample from
        sr (int): the number of samples in every segment (callers pass the sampling
                  rate of y)
        n (int): how many segments to draw (default is 5)

    Returns:
        splits (list): n slices of y, each exactly sr samples long

    Raises:
        ValueError: if y holds fewer than sr samples
    '''
    last_start = len(y) - sr
    if last_start < 0:
        raise ValueError('clip has {} samples but each segment needs {}'.format(len(y), sr))
    # randint excludes its upper bound, hence the + 1: the final full-length window
    # (and a clip of exactly sr samples) must be reachable
    ixs = np.random.randint(0, high=last_start + 1, size=n)
    splits = [y[i:i+sr] for i in ixs]
    return splits

import multiprocess as mp

def worker():
    '''
    Process target: saves one bare mel-power spectrogram image per segment of the
    current file. Takes no arguments; the segments (splits), source path (f),
    sampling rate (sr) and output root (dest) are read from module-level names.
    '''
    image_size = (251/256, 128/256)
    # Segment numbers in the saved file names start at 1
    for seg_no, segment in enumerate(splits, start=1):
        graph_audio(f, 'mp_spect', y=segment, sr=sr, shape=image_size,
                    verbose=False, dest=dest, show=False, ext=seg_no)

# Only .wav files are visited, the same pattern used when the clips per class were counted
for i, f in enumerate(glob.glob(path + '*.wav')):
    cat = get_category(os.path.basename(f))
    print(i)
    # Stochastic rounding of the per-class factor: always take the whole part, and
    # one extra segment with probability equal to the fractional part
    frac, whole = math.modf(resample_table[cat])
    n_samp = int(whole + np.random.binomial(1,frac))
    y,sr = librosa.load(f)
    splits = new_splits(y,sr,n_samp)
    # worker takes no arguments: it reads f, splits, sr and dest from the module
    # globals set above (presumably relies on the child inheriting them - verify
    # if the process start method is ever changed)
    proc = mp.Process(target=worker)
    proc.daemon=True
    proc.start()
    # One file at a time: wait for the child so its memory is released before continuing
    proc.join()
    # Without this check a crash in the child would pass unnoticed and the class
    # would silently end up short of images
    if proc.exitcode != 0:
        print('worker failed for {} (exit code {})'.format(f, proc.exitcode))

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
27

In [0]:
## Sanity check: prints the number of files written to each class folder, which
## should come out at roughly 500 apiece
for class_dir in glob.glob(dest + '*'):
    written = glob.glob(class_dir + '/*')
    print(len(written))

514
498
500
501
502
501
505


In [0]:
## Zip the result to personal Google Drive
# IPython shell escape: -r recurses into the folder, -q keeps zip quiet. The folder
# is named relatively, so this presumably assumes the working directory is the one
# that contains new_resamples - verify before running from elsewhere.
!zip -rq '/content/gdrive/My Drive/new_resamples.zip' new_resamples

In [0]:
# import shutil
# shutil.rmtree(dest)

In [0]:
## Optionally download the compressed resamples to local computer
from google.colab import files
# Fetches the archive written to the mounted Drive by the zip step
files.download('/content/gdrive/My Drive/new_resamples.zip')