# Assignment #2 Sound Detection

Tested and able to edit the notebook. Looks good! - Will LaForge

In [5]:
# Mount Google Drive so the collected recordings are reachable from this runtime.
# The drive is exposed under the '/content/drive' mount point passed below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# Root folder of the collected data; the loading loop reads one sub-folder per
# class name from it.
# NOTE(review): this path is relative ('./drive/...') while the drive is mounted
# at the absolute '/content/drive', so it presumably relies on the working
# directory being '/content' -- confirm.
root_dir='./drive/MyDrive/Colab Notebooks/COSC522/Assignment2/collected_data'

## Read in raw data files

In [397]:
from pandas.core.dtypes.common import classes_and_not_datetimelike
'''
Find the collected samples by class and store in lists related to each class
classes_data is a dictionary keyed by class name; each value holds the numpy arrays of audio data collected for that class
'''
import os
import librosa
import scipy.io.wavfile as wavfile
import numpy as np

# per-class containers for the raw audio clips (one 1-D float64 array per file)
raw_micro_data=[]
raw_blender_data=[]
raw_music_data=[]
raw_siren_data=[]
raw_vaccum_data=[]
raw_control_data=[]

classes=['microwave','blender','music','siren','vaccum_cleaner', 'control']

# dict to capture raw data by class (key = class/folder name, value = its clips)
classes_data = {
    'microwave':raw_micro_data,
    'blender':raw_blender_data,
    'music':raw_music_data,
    'siren':raw_siren_data,
    'vaccum_cleaner':raw_vaccum_data,
    'control': raw_control_data
    }
# integer label assigned to each class
class_dict={
    'microwave':1,
    'blender':2,
    'music':3,
    'siren':4,
    'vaccum_cleaner':5,
    'control': 0
}


def _as_clip_array(clips):
  '''
  Pack a list of 1-D clips into a numpy array
  --equal-length clips (or an empty list) become a regular numpy array
  --clips of differing lengths become a 1-D object array with one clip per slot,
    because recent numpy versions refuse to build ragged arrays implicitly
  '''
  if len({len(clip) for clip in clips}) <= 1:
    return np.array(clips)
  ragged = np.empty(len(clips), dtype=object)
  for idx, clip in enumerate(clips):
    ragged[idx] = clip
  return ragged


sample_rates = set()
fs = None

# iterate through classes (also the folder structure where raw data is stored)
for cls in classes:
  class_dir = os.path.join(root_dir, cls)
  # sorted() makes the sample order (and every later split) reproducible,
  # since os.listdir returns entries in arbitrary order
  for file_name in sorted(os.listdir(class_dir)):
    file_path = os.path.join(class_dir, file_name)
    # skip hidden entries and sub-folders; they are not recordings
    if file_name.startswith('.') or not os.path.isfile(file_path):
      continue
    # sr=None keeps the native sampling rate of the recording
    y,fs=librosa.load(file_path,sr=None,dtype=np.float64)
    sample_rates.add(fs)
    # add this file's data to the raw data of this class
    classes_data[cls].append(y)

if fs is None:
  raise FileNotFoundError(f"no audio files found under {root_dir}")
if len(sample_rates) > 1:
  # later cells use a single global `fs`, so mixed rates would silently skew features
  print(f"WARNING: recordings use different sampling rates: {sorted(sample_rates)}")

# convert the per-class lists to numpy arrays
for cls in classes_data:
  classes_data[cls]=_as_clip_array(classes_data[cls])

print(fs)

44100




In [541]:
# number of recordings loaded for each class
[{label: len(clips)} for label, clips in classes_data.items()]

[{'microwave': 5},
 {'blender': 5},
 {'music': 0},
 {'siren': 0},
 {'vaccum_cleaner': 5},
 {'control': 10}]

In [542]:
#len(classes_data['microwave'][0])

# Pre-processing
___

In [543]:
'''
FFT 
'''
# FFT example from Dr. Sai
from scipy import signal

def FFT(audio,fs,FFT_SIZE=1024):
  '''
  Spectrogram of an audio signal
  @parameters:
  --audio: the samples handed to scipy's signal.spectrogram
  --fs: sampling rate of the signal
  --FFT_SIZE: segment length (nperseg) used for each transform <default=1024>
  returns:
  --only the spectrogram values; the frequency and time axes that
    signal.spectrogram also returns are discarded
  '''
  # segment overlap is left at scipy's default
  spectrum = signal.spectrogram(audio, fs=fs, nperseg=FFT_SIZE)
  return spectrum[2]

In [544]:
# # FFT. Might need to use this to get a 2-D array
# from scipy.fftpack import fft

# def fft_method(audio, sampling_rate):
#     # variables
#     T = 1/sampling_rate
#     N = len(audio)
#     max_val = 1.0/(2.0*T) # max frequency spectrum of the FFT.
#     num_vals = N//2  
    
#     # calculate fft
#     yf_all = fft(audio)
    
#     xf = np.linspace(0.0, max_val, num_vals)
#     yf = 2.0/N * np.abs(yf_all[0:num_vals])
    
#     return xf, yf

In [545]:
# xf,yf=fft_method(classes_data['microwave'][0], fs)

# print(xf,yf)

# Data Analysis Pipeline
___

## 1. Feature Engineering



### 1.1  Binning

In [546]:
import cv2

# resample a signal's spectrogram onto a fixed-size grid
def bin_data(ppx,fs,num_time_bins,num_freq_bins):
  '''
  Bin the spectrogram of a signal into a fixed number of cells
  @parameters:
  --ppx: passed straight to FFT() as the signal to transform (despite the name)
  --fs: sampling rate forwarded to FFT()
  --num_time_bins: target width handed to cv2.resize
  --num_freq_bins: target height handed to cv2.resize
  returns:
  --the resized spectrogram produced by cv2.resize
  '''
  spectrogram = FFT(ppx, fs)
  # cv2.resize expects the target size as (width, height)
  target_size = (num_time_bins, num_freq_bins)
  return cv2.resize(spectrogram, target_size)

### 1.2 Extracting domain-specific features

In [547]:
'''
Functions for extracting domain-specific features
'''
import numpy as np

def microwave_hum(audio, freq_bins=50, frame_start=1000, frame_stop=2500):
  '''
  Median spectrogram value inside a low-frequency band over a range of frames
  --meant to capture the steady "hum" of the microwave
  --the window is expressed in spectrogram INDICES (rows = frequency bins,
    columns = time segments, following scipy's spectrogram layout), not in
    milliseconds
  @parameters:
  --audio: raw audio samples
  --freq_bins: number of lowest spectrogram rows to include <default=50>
  --frame_start: first spectrogram column of the window <default=1000>
  --frame_stop: column index where the window ends (exclusive) <default=2500>
  returns:
  --median of the selected spectrogram region
  '''
  # uses the notebook-level sampling rate `fs`
  ppx=FFT(audio,fs)

  # NOTE(review): a recording with fewer than frame_start segments gives an
  # empty window here -- confirm clips are always long enough
  return np.median(ppx[:freq_bins,frame_start:frame_stop])

def binned_microwave_hum(audio):
  '''
  Median of a small region of the binned (10 x 10) spectrogram of the audio
  @parameters:
  --audio: raw audio samples
  returns:
  --median of rows 0-2 in column 9 of the binned spectrogram
  '''
  # bin_data computes the spectrogram itself, so it must receive the raw audio;
  # handing it an already-computed spectrogram would transform the data twice.
  # Open CV's resize takes (columns,rows) as the input for desired size
  resized_pxx=bin_data(audio,fs,10,10)

  return np.median(resized_pxx[0:3,9:10])

def pitches(audio,fs):
  '''
  Mean of the pitch matrix returned by librosa.piptrack
  @parameters:
  --audio: raw audio samples
  --fs: sampling rate
  returns:
  --mean over every entry of the pitch matrix (magnitudes are ignored)
  '''
  pitch_track, _ = librosa.piptrack(y=audio, sr=fs, n_fft=1024)
  return np.mean(pitch_track)

### 1.3 Windowing Data

In [548]:
def trunc_audio(audio, fs, max_len=30):
  '''
  Function to truncate audio files to at most max_len seconds
  --must truncate the audio signal length for scaling and normalization to work
  --this function trims samples from the beginning of the file (the end is kept)
  --signals that are already max_len seconds or shorter are returned unchanged
  @parameters:
  --audio: raw audio samples (any sliceable sequence, e.g. a numpy array)
  --fs:  sampling rate
  --max_len: the maximum in seconds that you wish the file to be <default=30 seconds>
  returns:
  --truncated signal, or the original signal when no trimming is needed
  '''
  # work in whole samples to avoid float round-off from seconds arithmetic
  max_samples = int(round(max_len * fs))
  surplus = len(audio) - max_samples

  if surplus <= 0:
    # nothing to trim: hand the signal back instead of implicitly returning None
    return audio

  # drop the surplus samples from the front
  return audio[surplus:]


In [549]:

'''
Windowing and non-windowing
'''
from skimage.measure import block_reduce

# mean of every sample in the recording (no windowing)
def no_win_average(audio):
  '''
  Overall average of the entire recording
  @parameters:
  --audio: raw audio samples
  returns:
  --arithmetic mean over all samples
  '''
  samples = np.asanyarray(audio)
  return samples.mean()

# calculate the average of each consecutive window of the recording
def windowed_average(audio, window_size=10, fs=fs):
  '''
  Average of every consecutive window of the (length-limited) recording
  @parameters:
  --audio: raw audio samples
  --window_size: window length in seconds <default=10>
  --fs: sampling rate <default=the notebook-level fs at definition time>
  returns:
  --numpy array with one average per window; the last window may be shorter
    than the others and is averaged over the samples it actually contains
  '''
  samples_per_window = int(window_size * fs)

  # limit the recording length before windowing
  limited_aud=trunc_audio(audio,fs)
  if limited_aud is None:
    # guard against trunc_audio returning None (no trimming performed):
    # fall back to the untouched clip
    limited_aud = audio
  limited_aud = np.asarray(limited_aud)

  window_starts = np.arange(0, len(limited_aud), samples_per_window)
  # reduceat yields per-window SUMS; divide by the window sizes to get averages
  window_sums = np.add.reduceat(limited_aud, window_starts)
  window_counts = np.diff(np.append(window_starts, len(limited_aud)))

  return window_sums / window_counts



In [550]:
def featurize_input(audio):
  '''
  Function for extracting features
  --takes raw audio and extracts different features
  --features, in order: microwave hum, binned microwave hum, overall average,
    windowed averages (5 second windows), per-band mean of the mel spectrogram,
    per-bin mean of the chromagram
  @parameters:
  --audio: raw audio samples
  returns:
  --1-D numpy array holding the concatenated features
  '''
  # magnitude spectrogram, reused for the chroma features
  stft=np.abs(librosa.stft(audio))

  fv=[]
  # features go here
  fv.append(microwave_hum(audio))
  fv.append(binned_microwave_hum(audio))
  fv.append(no_win_average(audio))
  # NOTE(review): the number of windows depends on the clip length, so clips of
  # different lengths presumably give feature vectors of different sizes -- confirm
  fv.extend(windowed_average(audio, 5, fs))
  # zero crossing rate was tried and dropped: it upset normalization, scaling and model fit
  # mel spectrogram, averaged over time for each mel band
  # (`y` must be passed by keyword: recent librosa releases reject it positionally)
  fv.extend(np.mean(librosa.feature.melspectrogram(y=audio, sr=fs).T,axis=0))
  # chromagram computed from the magnitude spectrogram, averaged over time
  fv.extend(np.mean(librosa.feature.chroma_stft(S=stft, sr=fs).T,axis=0))

  return np.array(fv)

In [None]:

# one (feature_vector, label) pair per recording; classes without recordings
# simply contribute nothing
rows = [
    (featurize_input(clip), class_dict[cls])
    for cls in classes_data
    for clip in classes_data[cls]
]

# Fill an explicit (n_samples, 2) object array: column 0 holds the feature vector,
# column 1 the class label. Building it with np.array(rows) would be a ragged
# conversion, which recent numpy versions reject with a ValueError.
features = np.empty((len(rows), 2), dtype=object)
for row_idx, (feature_vector, label) in enumerate(rows):
  features[row_idx, 0] = feature_vector
  features[row_idx, 1] = label

#features

In [None]:
import pandas as pd

# one row per recording: feature vector in 'features', class id in 'labels';
# missing cells are replaced with 0
comb_data = pd.DataFrame(features, columns=['features', 'labels']).fillna(0)
comb_data.head()

In [None]:
# pull the feature matrix and the label vector back out of the dataframe
data, labels = (
    np.array(comb_data[column].tolist()) for column in ('features', 'labels')
)

In [None]:
# sanity check: shapes of the feature matrix and of the label vector
print(data.shape, labels.shape, sep='\n')

labels = np.array(labels)
# peek at the feature vector stored at index 20
data[20]

## 2. Feature Normalization / Visualization

In [None]:
import seaborn as sns

# overlay a kernel-density estimate for every feature column of `data`
for column in data.T:
  sns.kdeplot(column)

In [None]:
from sklearn.preprocessing import RobustScaler, MinMaxScaler

# min-max scaling is the scaler in use; RobustScaler is the commented alternative
#scaler = RobustScaler()
scaler = MinMaxScaler()

# fit the scaler on the feature matrix, then replace `data` with its scaled version
data = scaler.fit(data).transform(data)

# density plot of every feature column after scaling
for column in data.T:
  sns.kdeplot(column)

## 3. ML Models for classification
---

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import KFold, cross_val_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report, accuracy_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler

# splitting the data into train and test data sets (30% held out, fixed seed)
x_train, xtest, y_train, ytest = train_test_split(data, labels, test_size=0.30, random_state=42)

# optional: oversample the TRAINING set (never the test set) just before training the model
#sm = SMOTE(random_state=42)
#x_train,y_train=sm.fit_resample(x_train,y_train)
#print(f"Resampled Train Data {len(x_train)}, Labels {len(y_train)}", end='\n\n')

xtrain,ytrain=x_train,y_train

# training the model
# The scaler sits inside the pipeline so it is fitted on training samples only,
# both for the final fit and inside every cross-validation fold. `data` may
# already have been min-max scaled using ALL samples, which leaks test-set
# statistics into preprocessing; min-max scaling cancels any earlier affine
# rescaling, so re-fitting it here on the training portion removes that leak.
clf = make_pipeline(MinMaxScaler(), SVC(kernel='rbf'))
# clf = make_pipeline(MinMaxScaler(), KNeighborsClassifier(n_neighbors=3))

clf.fit(xtrain, ytrain)
# cross_val_score re-fits fresh clones of the pipeline on each training fold
cv_scores = cross_val_score(clf, xtrain, ytrain, cv=3)
print(f'Average Cross Validation Score from Training:  {cv_scores.mean()}', sep='\n', end='\n')
print(f'Average Cross Validation Score STD from Training:  {cv_scores.std()}', sep='\n', end='\n\n\n')

# testing the model on the held-out samples
ypred = clf.predict(xtest)
# cm = confusion_matrix(ytest, ypred) #using a crosstab table to display more info about the predictions
cm = pd.crosstab(ytest.ravel(), ypred.ravel(), rownames = ['True'], colnames = ['Predicted'], margins = True)
cr = classification_report(ytest, ypred)

print('Confusion Matrix:', cm, sep='\n', end='\n\n\n')
print('Test Statistics:', cr, sep='\n', end='\n\n\n')

#This is what we will be grading (>95 expected)
print('Testing Accuracy:', accuracy_score(ytest, ypred), end='\n\n\n')