# Phonem Feature Extraction Detection 

This code have the purpose of perfomer phonem detection out of a voice recorder. 

In [None]:
!pip install hunga_bunga

In [1]:
import warnings

warnings.filterwarnings('ignore')

In [2]:
#Imports
import math
import librosa
import IPython.display as ipd

import os
import sklearn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier

import numpy as np
import matplotlib.pyplot as plt

from numpy import sin, cos, pi, linspace, arange, log10, absolute
from numpy.random import randn
from scipy.signal import lfilter, lfilter_zi, filtfilt, butter, freqz, welch
from scipy.io.wavfile import read , write
from scipy.signal import spectrogram

# Load audio file and present their information

In [None]:
#Load one specific audio

filename = 'A_lata_01.wav'
directory = 'C:/Users/Dasil/1. Processamento Digital do Sinal/Project/word Detection/training/' + filename

fs, audio1 = read(directory)
f,t,S1 = spectrogram(audio1, fs, window='flattop', nperseg=fs//10, noverlap=fs//20, scaling='spectrum', mode='magnitude')

#print information about the audio
print('filename: ', filename)
print('Data Length (s): ',t[-1])
print('Sampling frequency (samples/s): ', fs)

#Playing Audio (Reproduce Audio)
ipd.Audio(filename)

# Functions


In [13]:
#Cepstrum and Power Spectrum

def real_cepstrum(x, n=None):
# Compute the real cepstrum of a real sequence
#    x : ndarray
#        Real sequence to compute real cepstrum of.
#    n : {None, int}, optional
#        Length of the Fourier transform.
#    Returns
#    -------
#    ceps: ndarray
#        The real cepstrum.

    spectrum = np.abs(np.fft.fft(x, n=n))**2
    ceps = np.fft.ifft(np.log(spectrum))

    return ceps, spectrum

In [14]:
#Zero Crossing Rate

def ZCR(samples, frameSize, overlap):
    wlen = len(samples)
    step = frameSize - overlap
    frameNum = math.ceil(wlen/step)
    zcr = np.zeros((frameNum,1))
    for i in range(frameNum):
        curFrame = samples[np.arange(i*step,min(i*step+frameSize,wlen))]
        #To avoid DC bias, usually we need to perform mean substraction on each frame
        curFrame = curFrame - np.mean(curFrame) #Zero-Justified
        zcr[i] = sum(curFrame[0:-1]*curFrame[1::]<=0)
    return zcr

In [None]:
#Filtro ButterWord

wn = 1500/(fs/2)
b3,a3 = butter(4, wn)
#Apply the filter
audio1_filt = lfilter(b3,a3,audio1)

# First Audio Segmentation

Separate the word from the rest of the data

In [None]:
#Apply first threshold
results = [1 if item > 500 else 0 for item in audio1_filt]

# Variables
x1 = x2 = 0
flag = False
chunks = []
coordinates = []
zeros_list = []
silence_len = 10000
min_len = 10000

plt.plot(results)
for i, result in enumerate(results):
    # First signal rise
    if(flag == False and result == 1 and x1 == 0):
        flag = True
        x1 = i
    # Check if fall is permanent
    if(flag == True):
        if(result == 0):
            zeros_list.append(result)
        else:
            zeros_list = []
    # Signal fall
    if(flag == True and len(zeros_list) >= silence_len and x2 == 0):
        flag = False
        x2 = i - len(zeros_list)
        # If data is valid, save it in list
        if(not x2-x1 <= min_len):
            chunks.append(results[x1:x2+1])
            coordinates.append([x1, x2])
            plt.plot([x1, x1], [0, 1], [x2, x2], [0, 1], marker='o')
        # Reset to initial state
        x1 = x2 = 0

print(f'# of chunks detected: {len(chunks)}')

pt1 = coordinates[0]
try:
    pt2 = coordinates[1]
    div_ptn = ((pt2[0] - pt1[1]) / 2) + pt1[1]
except:
    div_ptn = ((pt1[1] + pt1[0]) / 2)
    
# Draw dividing line
plt.plot([div_ptn, div_ptn], [0, 1], marker='o', color="black")
print(pt1[0], pt1[1])
print(pt1[1] - pt1[0])

#Make segmentation point integer
div_ptn = round(div_ptn)

# Fix-sized segmentation (breaks a signal into non-overlapping segments)
signal = audio1 / (2**15)
signal_len = len(signal)
segment_size_t = 1 # segment size in seconds
segment_size = segment_size_t * fs  # segment size in samples

# Break signal into list of segments in a single-line Python code
segment1 = audio1[:div_ptn]
segment2 = audio1[div_ptn:]
segments = [segment1, segment2]

# Find out the number of Segments
n = len(segments)

# Acess the first chunk of the audio
samples = segments[0]
samples = np.array(samples)

# Process each chunk
# Save each segment in a seperate filename
for iS, s in enumerate(segments):
    write('Word Detection/PD_data/'+filename+'_{0:d}.wav'.format(segment_size_t * iS, segment_size_t * (iS + 1)), fs, (s))



# Word Selection for processing

In [None]:
#Load one specific word

filename = 'A_lata_01'
directory = 'C:/Users/Dasil/1. Processamento Digital do Sinal/Project/word Detection/training/' + filename + '_1.wav'

fs, word = read(directory)
f,t,S1 = spectrogram(word, fs, window='flattop', nperseg=fs//10, noverlap=fs//20, scaling='spectrum', mode='magnitude')

#print information about the word
print('filename: ', filename)
print('Data Length (s): ',t[-1])
print('Sampling frequency (samples/s): ', fs)

#Playing word (Reproduce word)
ipd.Audio(filename)

# Spectrogram 

In [None]:
#Graphic Spectrogram 

plt.rcParams['figure.figsize'] = 14,5
plt.pcolormesh(t, f[:450], S1[:450][:])
plt.title("Spectrogram")
plt.xlabel('time(s)')
plt.ylabel('frequency(Hz)')

In [None]:
#Filtro ButterWord

wn = 1500/(fs/2)
b3,a3 = butter(4, wn)
#Apply the filter
word_filt = lfilter(b3,a3,word)

plt.plot(word_filt)

# Show Word Filtered

In [None]:
#Plot word Filtered and only positive part

plt.rcParams['figure.figsize'] = 16,5
#plot(audio1,'r')
plt.plot(np.absolute(word_filt))

# Segment Audio

Do the first segmentation in order to isolate the desired word

In [None]:
#Apply Threshold for the word phonems

results = [1 if item > 0.001*1e8 else 0 for item in audio1_filt**2]
plt.plot(results)

In [None]:
# Variables
x1 = x2 = 0
flag = False
chunks = []
coordinates = []
zeros_list = []
silence_len = 2000
min_len = 5000

plt.plot(results)
for i, result in enumerate(results):
    # First signal rise
    if(flag == False and result == 1 and x1 == 0):
        flag = True
        x1 = i
    # Check if fall is permanent
    if(flag == True):
        if(result == 0):
            zeros_list.append(result)
        else:
            zeros_list = []
    # Signal fall
    if(flag == True and len(zeros_list) >= silence_len and x2 == 0):
        flag = False
        x2 = i - len(zeros_list)
        # If data is valid, save it in list
        if(not x2-x1 <= min_len):
            chunks.append(results[x1:x2+1])
            coordinates.append([x1, x2])
            plt.plot([x1, x1], [0, 1], [x2, x2], [0, 1], marker='o')
        # Reset to initial state
        x1 = x2 = 0

print(f'# of chunks detected: {len(chunks)}')

pt1 = coordinates[0]
try:
    pt2 = coordinates[1]
    div_ptn = ((pt2[0] - pt1[1]) / 2) + pt1[1]
except:
    div_ptn = ((pt1[1] + pt1[0]) / 2)
    
# Draw dividing line
plt.plot([div_ptn, div_ptn], [0, 1], marker='o', color="black")
print(pt1[0], pt1[1])
print(pt1[1] - pt1[0])

# Split Word

Split the word in two phonemes

In [None]:
#Make segmentation point integer
div_ptn = round(div_ptn)

# Fix-sized segmentation (breaks a signal into non-overlapping segments)
signal = audio1 / (2**15)
signal_len = len(signal)
segment_size_t = 1 # segment size in seconds
segment_size = segment_size_t * fs  # segment size in samples

# Break signal into list of segments in a single-line Python code
segment1 = audio1[:div_ptn]
segment2 = audio1[div_ptn:]
segments = [segment1, segment2]

# Find out the number of Segments
n = len(segments)

# Acess the first chunk of the audio
samples = segments[0]
samples = np.array(samples)

# Process each chunk
# Save each segment in a seperate filename
for iS, s in enumerate(segments):
    write('Word Detection/WD_data/'+filename+'_{0:d}.wav'.format(segment_size_t * iS, segment_size_t * (iS + 1)), fs, (s))


# MFCC's

In [None]:
signal = segments[0] #First phoneme 

#Play Audio
ipd.Audio(filename)

#Calculate MFCC'S
mfccs = librosa.feature.mfcc(signal, sr=fs, n_mfcc=42, n_fft = 1024, hop_length = 50,
                                n_mels = 1000,  fmin = 10, fmax = 4000)

#Normalize MFCC's Results (Y axis)
mfccs = sklearn.preprocessing.scale(mfccs, axis=1)

In [None]:
#Visualize MFCC 

plt.figure(figsize=(14,5))
sr = fs
librosa.display.specshow(mfccs[1:],x_axis='time',sr=sr)
plt.colorbar(format="%+2f")
plt.show()

# Training and Classificator

## Read and calculate MFCC for all segmented phonemes

In [31]:
def get_mfccs(signal, fs):
    
    #Calculate MFCCs
    mfccs = librosa.feature.mfcc(signal, sr=fs, n_mfcc=200, n_fft = 1024, hop_length = 50, n_mels = 1000,  fmin = 10, fmax = 4000)
    
    # Calculate average
    #mfccs = np.average(mfccs, axis=1)
    mfccs = np.average(np.average(mfccs, axis=1))
    # Make it into a one-dimensional array
    mfccs = mfccs.flatten()
    mfccs = mfccs.tolist()
    return mfccs
    

In [None]:
#directory = 'C:/Users/lemos/PDS/TP/Word Detection/WD_data/'
directory = 'C:/Users/Dasil/1. Processamento Digital do Sinal/Project/Word Detection/WD_data/'

hope = True
mfcc_data = []
phoneme_list = []

for file in os.listdir(directory):
    
    # Get audio file
    filename = os.path.join(directory, file)
    
    # load Audio for Signal
    signal, fs = librosa.load(filename)
    
    # Get mfccs from audio
    mfccs = get_mfccs(signal, fs)
    mfccs.insert(1,mfccs[0])
    #mfccs = mfccs[:18]

    if(hope):
        
        #Calculate Cepstrung and Power Spectrum
        ceps, spec = real_cepstrum(signal, n=None)
        N = signal.shape[0]

        #Power Specturm
        power_spec = np.abs(spec[:N//2])**2
        index = np.where(power_spec == np.max(power_spec))
        mfccs.append(index[0][0])
        #mfccs.insert(1, mfccs[0])

        #Calculate Zero Cross Rate
        frameSize = 256
        overlap = 0
        zcr = ZCR(signal, frameSize, overlap)
        mfccs.append(np.max(zcr))

        #Calculate Cepstrum
        abs_ceps = np.abs(ceps[:N//2])**2
        mfccs.append(round(np.max(abs_ceps)))
    else:
        # Remove the first coeficient
        mfccs.pop(0)

        # Delete the features of some mfcc coeficients because they are not needed.
        #mfccs = mfccs[:12]
        #mfccs = mfccs[25:]
        #mfccs = mfccs[:18]
    
    # Insert label
    file = file.replace('.wav', '')
    file_parts = file.split('_')
    word = file_parts[1]
    if (len(word) > 4):
        if(file_parts[-1] == '0'):
            phoneme = word[0:3]
        else:
            phoneme = word[3:]
    else:
        if(file_parts[-1] == '0'):
            phoneme = word[0:2]
        else:
            phoneme = word[2:]
    phoneme_list.append(phoneme)
    mfccs.insert(0, phoneme)
    mfcc_data.append(mfccs) 

#Calculate Cepstrung and Power Spectrum
ceps, spec = real_cepstrum(signal, n=None)
N = signal.shape[0]

#Power Specturm
power_spec = np.abs(spec[:N//2])**2
mfccs = power_spec
    
    
# Remove duplicates
phoneme_list = list(dict.fromkeys(phoneme_list))
print(f'Phonemes: {phoneme_list}')
print(f'# of phonemes: {len(phoneme_list)}')

# Loading the Dataset

In [34]:
# Dataset loading
df = pd.DataFrame(mfcc_data)

x = df.iloc[:,1:] # MFCC features
y = df.iloc[:,0] # Phoneme label

# Label changed to number once
label = set(y)
label_list = list(label)
label_list.sort()
for i in range(len(label_list)):
    y[y == label_list[i]] = i
y = np.array(y, dtype = "int")

# Preview dataframe
df

Unnamed: 0,0,1,2,3,4
0,0,-7.001454,391,77.0,6
1,6,-9.812004,456,68.0,61
2,0,-7.047333,326,70.0,4
3,6,-7.520754,447,43.0,10
4,0,-7.977703,845,152.0,2
...,...,...,...,...,...
223,6,-7.281468,438,34.0,8
224,7,-7.363895,232,111.0,2
225,9,-7.302172,478,102.0,4
226,7,-7.458501,246,105.0,2


# Train the classifier

In [35]:
# Determine the train data and test data
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 1)
print(f'Train: {x_train.shape}\nTest: {x_test.shape}\n')

# Data standardization
sc = StandardScaler()
sc.fit(x_train)
x_train_std = sc.transform(x_train)
x_test_std = sc.transform(x_test)

# Instantiate an SVM
model_linear = SVC(kernel='linear', random_state=1)
model_poly = SVC(kernel='poly', random_state= 1)
model_rbf = SVC(kernel='rbf', random_state=1)
model_knn = KNeighborsClassifier(n_neighbors=5)
# Fit models
model_linear.fit(x_train_std, y_train)
model_poly.fit(x_train_std, y_train)
model_rbf.fit(x_train_std, y_train)
model_knn.fit(x_train_std, y_train)

# ===== Train data =====
# Predictions
pred_linear_train = model_linear.predict(x_train_std)
pred_poly_train = model_poly.predict(x_train_std)
pred_rbf_train = model_rbf.predict(x_train_std)
pred_knn_train = model_knn.predict(x_train_std)
# Accuracy scores
accuracy_linear_train = accuracy_score(y_train, pred_linear_train)
accuracy_poly_train = accuracy_score(y_train, pred_poly_train)
accuracy_rbf_train = accuracy_score(y_train, pred_rbf_train)
accuracy_knn_train = accuracy_score(y_train, pred_knn_train)

#print('Train result')
#print(f'Linear : {int(accuracy_linear_train*100)}%')
#print(f'Poly : {int(accuracy_poly_train*100)}%')
#print(f'RBF : {int(accuracy_rbf_train*100)}%')
#print("-" * 15)

# ===== Test data =====
# Predictions
pred_linear_test = model_linear.predict(x_test_std)
pred_poly_test = model_poly.predict(x_test_std)
pred_rbf_test = model_rbf.predict(x_test_std)
pred_knn_test = model_knn.predict(x_test_std)
# Accuracy scores
accuracy_linear_test = accuracy_score(y_test, pred_linear_test)
accuracy_poly_test = accuracy_score(y_test, pred_poly_test)
accuracy_rbf_test = accuracy_score(y_test, pred_rbf_test)
accuracy_knn_test = accuracy_score(y_test, pred_knn_test)

#print('Test result')
#print(f'Linear : {int(accuracy_linear_test*100)}%')
#print(f'Poly : {int(accuracy_poly_test*100)}%')
#print(f'RBF : {int(accuracy_rbf_test*100)}%')

# Create dataframe
data = {'Train result':[accuracy_linear_train*100, accuracy_poly_train*100, accuracy_rbf_train*100, accuracy_knn_train*100],
        'Test result': [accuracy_linear_test*100, accuracy_poly_test*100, accuracy_rbf_test*100, accuracy_knn_test*100]}
df_results = pd.DataFrame(data)
# Change the row indexes
df_results.index = ['Linear', 'Poly', 'RBF', 'KNN']
df_results.round(decimals=2)

Train: (159, 4)
Test: (69, 4)



Unnamed: 0,Train result,Test result
Linear,33.33,37.68
Poly,34.59,30.43
RBF,38.36,36.23
KNN,52.2,33.33


In [37]:
filename = 'C:/Users/Dasil/1. Processamento Digital do Sinal/Project/word Detection/WD_data/A_chuta_02_0.wav' 

# load Audio for Signal
signal, fs = librosa.load(filename)

#Calculate MFCCs
mfccs = get_mfccs(signal, fs)

#mfccs.pop(0)

#mfccs = mfccs[:18]

#Calculate Cepstrung and Power Spectrum
ceps, spec = real_cepstrum(signal, n=None)
N = signal.shape[0]

#Power Specturm
power_spec = np.abs(spec[:N//2])**2
index = np.where(power_spec == np.max(power_spec))
mfccs.append(index[0][0])

#Calculate Zero Cross Rate
frameSize = 256
overlap = 0
zcr = ZCR(signal, frameSize, overlap)
mfccs.append(np.max(zcr))

#Calculate Cepstrum
abs_ceps = np.abs(ceps[:N//2])**2
mfccs.append(round(np.max(abs_ceps)))
    

df = pd.DataFrame(mfccs).T
#df

#Data standardization
sc = StandardScaler()
sc.fit(df)
x_std = sc.transform(df)

pred_linear_test = model_linear.predict(x_std)
pred_poly_test = model_poly.predict(x_std)
pred_rbf_test = model_rbf.predict(x_std)
pred_knn_test = model_knn.predict(x_std)

data = {'Linear':[phoneme_list[int(pred_linear_test)]],
        'Poly':[phoneme_list[int(pred_poly_test)]],
        'RBF': [phoneme_list[int(pred_rbf_test)]],
        'KNN': [phoneme_list[int(pred_knn_test)]]}

df_models = pd.DataFrame(data) 
df_models

Unnamed: 0,Linear,Poly,RBF,KNN
0,ca,ca,ca,ca


In [None]:
#Data standardization
sc = StandardScaler()
sc.fit(df)
x_std = sc.transform(df)

pred_linear_test = model_linear.predict(x_std)
pred_poly_test = model_poly.predict(x_std)
pred_rbf_test = model_rbf.predict(x_std)
pred_knn_test = model_knn.predict(x_std)


data = {'Linear':[phoneme_list[int(pred_linear_test)]],
        'Poly':[phoneme_list[int(pred_poly_test)]],
        'RBF': [phoneme_list[int(pred_rbf_test)]],
        'KNN': [phoneme_list[int(pred_knn_test)]]}

df_models = pd.DataFrame(data) 
df_models
#print(phoneme_list[int(pred_linear_test)])
#print(phoneme_list[int(pred_poly_test)])
#print(phoneme_list[int(pred_rbf_test)])

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV

models = {
        'LogisticRegression'     : LogisticRegression(),
        'SVM'                    : SVC(),
        'RandomForestClassifier' : RandomForestClassifier(),
        'KNN'                    : KNeighborsClassifier()}

hyper = {
        
        'LogisticRegression':{
                                    'penalty'     : ['l2'],
                                    'C'           : np.logspace(0, 4, 10),
                                    'solver'      : ['lbfgs', 'liblinear', 'saga'],
                                    'class_weight': ['balanced'],
                                    'random_state': [0]},
        
        'SVM':{
                                    'C'           : [0.01, 0.1, 1, 10, 100, 1000],
                                    'gamma'       : [1, 0.1, 0.01, 0.001, 0.0001],
                                    'kernel'      : ['rbf', 'linear'],
                                    'class_weight': ['balanced'],
                                    'random_state': [0]},
        
        'RandomForestClassifier':{
                                    'max_depth': [2, 3, 4],
                                    'max_features': [2, 3, 4, 'auto', 'sqrt'],
                                    'n_estimators': [10, 100, 500, 1000],
                                    'class_weight': ['balanced'],
                                    'random_state': [0]},
       
        'KNN':{
                                    'n_neighbors': [5, 10, 15, 20],
                                    'weights': ['uniform', 'distance']}
                              
                              
    }


for model_name in models.keys():

    # Model selection
    clf    = models[model_name]
    params = hyper[model_name]

    # Pipeline (standarization + classifier)
    pipe = Pipeline([ ( 'scaler', StandardScaler() ), ( 'clf', clf ) ])

    # Gridsearch cross-validation
    grid = GridSearchCV(estimator = clf, param_grid = params, cv = 5, return_train_score = True)
    grid.fit(x_train, y_train)

    # Gridsearch cross-validation results
    best_param                  = grid.best_params_
    best_param_test_score_mean  = grid.cv_results_['mean_test_score'][grid.best_index_]
    best_param_test_score_std   = grid.cv_results_['std_test_score'][grid.best_index_]
    best_param_train_score_mean = grid.cv_results_['mean_train_score'][grid.best_index_]
    best_param_train_score_std  = grid.cv_results_['std_train_score'][grid.best_index_]
    
print(best_param)
print(grid)

In [None]:
grid

In [None]:
from hunga_bunga import HungaBungaClassifier, HungaBungaRegressor

clf = HungaBungaClassifier()
clf.fit(x,y)
clf.prefit(x)