In [1]:
import os
import numpy as np
import scipy
from scipy.io import wavfile
import scipy.fftpack as fft
from scipy.signal import get_window
import IPython.display as ipd
import matplotlib.pyplot as plt
import pathlib
import tensorflow as tf
import pandas as pd
import librosa
from IPython import display

from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.utils import to_categorical
from keras import optimizers

In [2]:
# Root folder of the training recordings. Later cells list its sub-folders to
# obtain the class names and glob "<root>/*/*" to collect the audio files.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
data_dir = pathlib.Path("D:\\speech recog\\train set2")

In [3]:
# Class names are the sub-folder names of the training directory. The position
# of a name in `words_list` is used as its integer label id throughout the
# notebook (`tf.argmax(label == words_list)`), so the order has to be
# reproducible. `tf.io.gfile.listdir` documents no ordering, hence the explicit
# sort (code-point order, which is the order shown in the recorded output).
words_list = np.sort(np.array(tf.io.gfile.listdir(str(data_dir))))
print('words_list:', words_list)

words_list: ['คอส' 'คูณ' 'บวก' 'ยกกำลัง' 'ยี่' 'ร้อย' 'ลบ' 'วาย' 'ศูนย์' 'สอง' 'สาม'
 'สิบ' 'สี่' 'ส่วน' 'หก' 'หนึ่ง' 'หาร' 'ห้า' 'เก้า' 'เจ็ด' 'เท่ากับ' 'เศษ'
 'เอ็กซ์' 'เอ็ด' 'แซด' 'แทน' 'แปด' 'ไซน์']


In [4]:
# Every training clip: one level of class folders, then the files inside them.
file = tf.io.gfile.glob(str(data_dir) + '/*/*')
num_samples = len(file)

# Only the first class folder is counted for the per-label figure.
first_label_dir = data_dir / words_list[0]
examples_in_first_label = len(tf.io.gfile.listdir(str(first_label_dir)))

print('Number of total examples:', num_samples)
print('Number of examples per label:', examples_in_first_label)
print('Example file tensor:', file[0])

Number of total examples: 4480
Number of examples per label: 160
Example file tensor: D:\speech recog\train set2\คอส\cos200cut100.wav


In [5]:
# Test and validation clips are collected with the same two-level glob pattern
# that is used for the training folder.
testfiles = tf.io.gfile.glob(str(pathlib.Path("D:\\speech recog\\test set")) + '/*/*')
valfiles = tf.io.gfile.glob(str(pathlib.Path("D:\\speech recog\\validation")) + '/*/*')

# Uniform names for the three splits used by the rest of the notebook.
train_files, val_files, test_files = file, valfiles, testfiles

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))

Training set size 4480
Validation set size 560
Test set size 840


In [6]:
def wav2mfcc(waveform, input_len=217413):
    """Convert a 1-D waveform into an MFCC matrix of fixed size.

    The clip is truncated, or right-padded with zeros, to exactly
    ``input_len`` samples so that every clip produces an MFCC array of the
    same shape.

    Args:
        waveform: 1-D eager tensor (or array) of audio samples.
        input_len: Number of samples kept per clip. Defaults to 217413, the
            length this notebook has always used.

    Returns:
        NumPy array with the MFCCs computed by ``librosa.feature.mfcc`` using
        librosa's default parameters.
    """
    waveform = waveform[:input_len]
    # Cast the waveform tensor's dtype to float32 so it can be concatenated
    # with the float32 padding.
    waveform = tf.cast(waveform, dtype=tf.float32)
    # Pad with as many zeros as are missing to reach `input_len` (previously
    # the length was repeated here as a second hard-coded literal).
    zero_padding = tf.zeros([input_len] - tf.shape(waveform), dtype=tf.float32)
    # Concatenate the waveform with `zero_padding`, which ensures all audio
    # clips are of the same length.
    equal_length = tf.concat([waveform, zero_padding], 0)
    equal_length = np.array(equal_length, np.float32)

    # The audio must be passed by keyword: librosa >= 0.10 made `y`
    # keyword-only, so the positional call raises a TypeError there.
    # NOTE(review): no `sr` is passed, so librosa's default sample rate is
    # assumed; confirm it matches the recordings.
    mfcc = librosa.feature.mfcc(y=equal_length)

    return mfcc

In [7]:
def decode_audio(audio_binary):
    """Decode WAV file contents and drop the trailing channel axis.

    Args:
        audio_binary: Raw contents of a WAV file.

    Returns:
        The decoded audio with its last axis squeezed away.
    """
    # decode_wav yields (audio, sample_rate); the sample rate is discarded.
    samples = tf.audio.decode_wav(audio_binary)[0]
    # Squeezing axis -1 only succeeds when that axis has size 1.
    return tf.squeeze(samples, axis=-1)

In [8]:
def get_label(file_path):
    """Return the label of a clip, i.e. the name of its parent directory.

    The path is split on the platform path separator and the second-to-last
    component is returned.
    """
    # Index rather than tuple-unpack so this also works inside a TensorFlow
    # graph.
    return tf.strings.split(file_path, os.path.sep)[-2]

In [9]:
def get_waveform_and_label(file_path):
    """Load one audio file and return ``(waveform, label)``.

    The label is derived from the path by `get_label`; the waveform is the
    file's contents decoded by `decode_audio`.
    """
    label = get_label(file_path)
    waveform = decode_audio(tf.io.read_file(file_path))
    return waveform, label

In [None]:
get_waveform_and_label(val_files[3])

In [10]:
# Let tf.data choose the degree of parallelism for the map calls.
AUTOTUNE = tf.data.AUTOTUNE

# Dataset of training file paths, mapped to (waveform, label) pairs.
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(
    map_func=get_waveform_and_label,
    num_parallel_calls=AUTOTUNE)

In [None]:
np.array(waveform_ds)

In [11]:
def get_mfcc_and_label_id(audio, label):
    """Return the MFCC features (with a trailing axis added) and label id.

    The label id is the position of `label` within the global `words_list`.
    """
    features = tf.expand_dims(wav2mfcc(audio), -1)
    class_index = tf.argmax(label == words_list)
    return features, class_index

In [None]:
waveform_ds

In [None]:
# Training features (MFCC matrices) and their label ids, filled clip by clip.
X = []
y = []


def append_X_Y(label, wave):
    """Append the label id and MFCC features of one clip to `y` and `X`."""
    y.append(tf.argmax(label == words_list))
    X.append(wav2mfcc(wave))


for waveform, label in waveform_ds:
    append_X_Y(label, waveform)

In [None]:
def append_X_Y_test(label, wave):
    """Append the label id and MFCC features of one clip to `y_test` and `X_test`."""
    y_test.append(tf.argmax(label == words_list))
    X_test.append(wav2mfcc(wave))


def append_X_Y_val(label, wave):
    """Append the label id and MFCC features of one clip to `y_val` and `X_val`."""
    y_val.append(tf.argmax(label == words_list))
    X_val.append(wav2mfcc(wave))

In [None]:
def processtest(files):
    """Turn a list of test file paths into a dataset of (waveform, label) pairs."""
    return tf.data.Dataset.from_tensor_slices(files).map(
        map_func=get_waveform_and_label,
        num_parallel_calls=AUTOTUNE)


test_dt = processtest(test_files)


def processval(files):
    """Turn a list of validation file paths into a dataset of (waveform, label) pairs."""
    return tf.data.Dataset.from_tensor_slices(files).map(
        map_func=get_waveform_and_label,
        num_parallel_calls=AUTOTUNE)


val_dt = processval(val_files)

In [12]:
def process_pred(files):
    """Map file paths to a dataset of (waveform, label) pairs for prediction."""
    paths_ds = tf.data.Dataset.from_tensor_slices(files)
    return paths_ds.map(map_func=get_waveform_and_label,
                        num_parallel_calls=AUTOTUNE)

#for_pred = process_pred()

In [None]:
# MFCC features and label ids of the test split, filled by append_X_Y_test.
X_test = []
y_test = []

for waveform, label in test_dt:
    append_X_Y_test(label, waveform)

# MFCC features and label ids of the validation split, filled by append_X_Y_val.
X_val = []
y_val = []

for waveform, label in val_dt:
    append_X_Y_val(label, waveform)

In [None]:
# Convert the per-clip lists into arrays.
X = np.array(X)
y = np.array(y)
# Displayed sanity check: there is one label per feature matrix.
X.shape[0] == len(y)

In [None]:
# Training-split aliases used by the model cells below.
X_train = X
y_train = y

In [None]:
# Cast the training features and label ids to float32.
X_train = np.asarray(X_train).astype(np.float32)
y_train = np.asarray(y_train).astype(np.float32)

In [None]:
# Convert the test and validation lists into arrays.
X_test = np.array(X_test)
y_test = np.array(y_test)

X_val = np.array(X_val)
y_val = np.array(y_val)

In [None]:
# Cast the test and validation features and label ids to float32.
X_test = np.asarray(X_test).astype(np.float32)
y_test = np.asarray(y_test).astype(np.float32)

X_val = np.asarray(X_val).astype(np.float32)
y_val = np.asarray(y_val).astype(np.float32)

In [None]:
X.shape

In [None]:
X_val.shape

In [None]:
# Inspection only: print the (MFCC, label id) pair, the raw label and the
# label id for the first 10 training clips.
#get_mfcc_and_label_id(audio, label)
for waveform, label in waveform_ds.take(10):
    #label = label.numpy().decode('utf-8')
    A = get_mfcc_and_label_id(waveform, label)
    label_id = tf.argmax(label == words_list)

    
    print(A)
    #print(len(A))
    print("Label: ",label)
    print("Label id: ",label_id)

In [13]:
# Training settings passed to `model.fit`.
batch_size = 100
epochs = 30
verbose = 1
# Size of the trailing channel axis added when the MFCC arrays are reshaped.
channel=1

# MFCC array dimensions, used for reshaping and as the Conv2D input shape.
# They match the (20, 425) per-clip shape shown in the `test.shape` output.
feature_dim_1 = 20
feature_dim_2 = 425

In [None]:
# `feature_dim_1`, `feature_dim_2` and `channel` come from the configuration
# cell directly above. They used to be copied into this cell as well, which
# allowed the two copies to drift apart unnoticed.

# Add the trailing channel axis that the Conv2D input layer expects.
X_train = X_train.reshape(X_train.shape[0], feature_dim_1, feature_dim_2, channel)
X_val = X_val.reshape(X_val.shape[0], feature_dim_1, feature_dim_2, channel)

In [None]:
X_train.shape

Model Training

In [None]:
# One output unit per word class.
num_labels = len(words_list)


def get_model():
    """Build the (uncompiled) CNN classifier.

    Three 2x2 convolutions, one max-pooling step and two dense layers with
    dropout in between, ending in a softmax over `num_labels` classes. The
    input shape is (feature_dim_1, feature_dim_2, channel).
    """
    input_shape = (feature_dim_1, feature_dim_2, channel)
    return Sequential([
        Conv2D(32, kernel_size=(2, 2), activation='relu', input_shape=input_shape),
        Conv2D(48, kernel_size=(2, 2), activation='relu'),
        Conv2D(120, kernel_size=(2, 2), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.25),
        Dense(64, activation='relu'),
        Dropout(0.4),
        Dense(num_labels, activation='softmax'),
    ])

In [None]:
model = get_model()

optimizer = tf.keras.optimizers.Adam()

# The last layer of `get_model` already applies softmax, so the loss receives
# probabilities rather than logits. `from_logits` must therefore be False;
# with True the loss would treat the probabilities as logits.
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              optimizer=optimizer,
              metrics=['accuracy'])
model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, verbose=verbose, validation_data=(X_val, y_val))

In [None]:
#model.save("model_MFCCfinal1.h5") 

In [14]:
# Replaces `model` with a network loaded from disk.
# NOTE(review): the cell that would write this file (`model.save(...)`) is
# commented out, so on a fresh run the file must already exist - confirm.
from keras.models import load_model
model = load_model('model_MFCCfinal1.h5')

In [None]:
model.summary()

In [None]:
# Add the trailing channel axis to the test features, as for train/validation.
X_test =X_test.reshape(X_test.shape[0], feature_dim_1, feature_dim_2, channel)

In [None]:
# Predicted class id = index of the highest score in each output row.
y_pred = np.argmax(model.predict(X_test), axis=1)
y_true = y_test

# Fraction of clips whose predicted id equals the true id. np.mean over the
# boolean array replaces the Python-level builtin sum()/len().
test_acc = np.mean(y_pred == y_true)
print(f'Test set accuracy: {test_acc:.0%}')

In [None]:
# Map every true label id back to its word with one vectorised lookup; the
# previous np.append loop copied the whole array on each iteration.
y_label = words_list[np.asarray(y_true).astype(int)]

In [None]:
# Map every predicted class id back to its word with one vectorised lookup; the
# previous np.append loop copied the whole array on each iteration.
ypred_label = words_list[np.asarray(y_pred).astype(int)]

In [None]:
ypred_label

In [None]:
# True words of all misclassified clips, selected with a boolean mask instead
# of growing an array element by element.
_true_words = np.asarray(y_label)
wrong_pred = _true_words[np.asarray(ypred_label) != _true_words]

In [33]:
# Single clip used for the prediction demo. Every backslash is escaped: the
# previous literal contained a lone backslash in front of the Thai folder
# name, which is an invalid escape sequence (a warning today, an error in
# future Python versions).
samples = tf.io.gfile.glob("D:\\speech recog\\train set2\\เก้า\\nine (1).wav")
# NOTE: this overwrites the `num_samples` set when the training files were
# listed; the cells below use it as the number of demo clips.
num_samples = len(samples)
print(samples)
print(num_samples)

['D:\\speech recog\\train set2\\เก้า\\nine (1).wav']
1


In [34]:
# MFCC features of the clips to predict: `test` is filled from `sample_ds`,
# `testCon` from `sampleCon_ds`.
# `process_pred` is already defined in an earlier cell; the identical second
# definition that used to live here has been removed.
test = []
testCon = []


def append_X(wave):
    """Append the MFCC features of `wave` to the global `test` list."""
    test.append(wav2mfcc(wave))


def append_Xcon(wave):
    """Append the MFCC features of `wave` to the global `testCon` list."""
    testCon.append(wav2mfcc(wave))

In [468]:
# pandas is already imported as `pd` in the first cell, so it is not imported
# again here.
df_test = pd.read_csv('testaccequation.csv')

# Clip paths and their expected words, as plain Python lists.
file_n = df_test['file_location'].tolist()
label_n = df_test['label'].tolist()

In [35]:
# Dataset of (waveform, label) pairs for the single demo clip.
sample_ds=process_pred(samples)
print(sample_ds)

<ParallelMapDataset shapes: ((None,), ()), types: (tf.float32, tf.string)>


In [469]:
# Dataset of (waveform, label) pairs for the clips listed in the CSV.
sampleCon_ds=process_pred(file_n)
print(sampleCon_ds)

<ParallelMapDataset shapes: ((None,), ()), types: (tf.float32, tf.string)>


In [36]:
# Compute MFCCs for the demo clip; the label from the dataset is not used.
for waveform, label in sample_ds:
    append_X(waveform)

In [None]:
# Compute MFCCs for the CSV clips; the label from the dataset is not used.
for waveform, label in sampleCon_ds:
    append_Xcon(waveform)

In [40]:
# `test` is still a Python list at this point when the notebook is run top to
# bottom, so `.shape` is not available; np.shape works for lists and arrays.
np.shape(test)

(1, 20, 425)

In [None]:
# Only one sample file is globbed above, so index 1 is out of range; inspect
# the first (and only) entry instead.
test[0].shape

In [37]:
# Convert the per-clip MFCC lists into arrays.
test = np.array(test)
testCon = np.array(testCon)

In [502]:
# Cast the prediction features to float32.
test = np.asarray(test).astype(np.float32)
testCon = np.asarray(testCon).astype(np.float32)

In [None]:
test.shape

In [None]:
type(test)

In [None]:
testCon.shape

In [30]:
# Add the trailing channel axis so the arrays match the model's input shape.
test = test.reshape(test.shape[0], feature_dim_1, feature_dim_2, channel)
testCon = testCon.reshape(testCon.shape[0], feature_dim_1, feature_dim_2, channel)

In [32]:
test.shape

(1, 20, 425, 1)

In [None]:
# Index of the highest-scoring class for each demo clip.
sampletest = np.argmax(model.predict(test), axis=1)
sampletest

In [None]:
# Full per-class model output for each demo clip (ranked in a later cell).
sampleretest = model.predict(test)
#sampleretest

In [474]:
# Full per-class model output for each clip listed in the CSV.
sampleCon = model.predict(testCon)

In [None]:
# Rank all words for every demo clip and keep the best one per clip.
finalre = []

for segment_idx, scores in enumerate(sampleretest):
    print("Candidate ranking of audio segment", segment_idx + 1)
    # Stable sort on the negated scores: highest score first, ties resolved in
    # favour of the lower class index. This is the order the former
    # argmax-and-delete loop produced, without shadowing the builtin `max`
    # or reusing the outer loop variable.
    ranked_ids = np.argsort(-np.asarray(scores), kind='stable')
    rank = words_list[ranked_ids]
    print(rank)
    finalre.append(rank[0])

finalre = np.array(finalre)

# Concatenate the top word of every clip. The sentence length now follows
# `finalre` itself instead of the unrelated global `num_samples`.
sen = "".join(finalre)
print("Predict",sen)

In [475]:
# Class ids of every CSV clip ordered from highest to lowest score. The stable
# sort on the negated scores resolves ties in favour of the lower class index,
# which is the order the former argmax-and-delete loop produced (that loop also
# shadowed the builtin `max` and reused the outer loop variable `i`).
ranked_ids = np.argsort(-np.asarray(sampleCon), axis=1, kind='stable')

# Words at rank 1, 2 and 3 for every clip.
finalCon = words_list[ranked_ids[:, 0]]
final_2nd = words_list[ranked_ids[:, 1]]
final_3rd = words_list[ranked_ids[:, 2]]

print("Predict",finalCon)
print("2nd rank",final_2nd)
print("3rd rank",final_3rd)

Predict ['เจ็ด' 'ร้อย' 'สอง' 'ลบ' 'สี่' 'สิบ' 'สาม' 'ลบ' 'เก้า' 'สิบ' 'เจ็ด' 'บวก'
 'สิบ' 'เจ็ด' 'แปด' 'สิบ' 'ห้า' 'บวก' 'สิบ' 'แปด' 'ลบ' 'เจ็ด' 'สิบ' 'เจ็ด'
 'ลบ' 'แปด' 'เจ็ด' 'สิบ' 'เก้า' 'ลบ' 'สี่' 'สิบ' 'สอง' 'ลบ' 'ห้า' 'สิบ'
 'แปด' 'บวก' 'หก' 'หก' 'ส่วน' 'สอง' 'ลบ' 'สิบ' 'หก' 'บวก' 'เศษ' 'ส่วน'
 'สิบ' 'หก' 'หนึ่ง' 'สอง' 'ลบ' 'สิบ' 'หก' 'บวก' 'เศษ' 'เจ็ด' 'สิบ' 'เจ็ด'
 'สิบ' 'แปด' 'บวก' 'เจ็ด' 'สิบ' 'เจ็ด' 'ลบ' 'ห้า' 'สิบ' 'บวก' 'ลบ' 'สิบ'
 'สาม' 'เจ็ด' 'สิบ' 'แปด' 'ลบ' 'ไซน์' 'สิบ' 'สี่' 'บวก' 'ส่วน' 'ห้า' 'แปด'
 'ร้อย' 'สิบ' 'เก้า' 'บวก' 'เก้า' 'สิบ' 'เจ็ด' 'บวก' 'เศษ' 'เจ็ด' 'สิบ'
 'ลบ' 'หก' 'สิบ' 'สิบ' 'แปด' 'ร้อย' 'สิบ' 'เก้า' 'บวก' 'เก้า' 'สิบ' 'สิบ'
 'บวก' 'เศษ' 'สิบ' 'สิบ' 'ลบ' 'หก' 'สิบ' 'เจ็ด' 'เจ็ด' 'ร้อย' 'ส่วน' 'สิบ'
 'เจ็ด' 'ลบ' 'หก' 'สิบ' 'หก' 'ลบ' 'เก้า' 'บวก' 'สิบ' 'สิบ' 'สี่' 'เอ็กซ์'
 'ยี่' 'สอง' 'หนึ่ง' 'สี่' 'สิบ' 'ไซน์' 'หาร' 'แซด' 'เท่ากับ' 'ส่วน' 'ห้า'
 'ไซน์' 'ยกกำลัง' 'สอง' 'หก' 'แปด' 'สิบ' 'เจ็ด' 'บวก' 'เอ็กซ์' 'หาร'
 'ไซน์' 'ยกกำลัง' 'สี่' 'เท่ากับ' 'สาม

In [None]:
# Word of the top-scoring class for every demo clip, looked up in one step
# instead of growing an array element by element.
final = words_list[np.asarray(sampletest).astype(int)]

# Concatenate the words into one string. The length follows `final` itself
# rather than the global `num_samples`.
sen = "".join(final)
print(sen)

In [476]:
#test only 1st rank
y_predEq = finalCon
y_trueEq = label_n

test_accEq = sum(y_predEq == y_trueEq) / len(y_trueEq)
print(f'Test set accuracy: {test_accEq:.02%}')

Test set accuracy: 77.27%


In [477]:
#test 1st+2nd rank
test_accEq2 = (sum(y_predEq == y_trueEq)+ sum(final_2nd == y_trueEq))/ len(y_trueEq)
print(f'Test set accuracy: {test_accEq2:.0%}')

Test set accuracy: 86%


In [478]:
#test 1st+2nd+3rd rank
test_accEq3 = (sum(y_predEq == y_trueEq) + sum(final_2nd == y_trueEq) + sum(final_3rd == y_trueEq))/ len(y_trueEq)
print(f'Test set accuracy: {test_accEq3:.0%}')

Test set accuracy: 89%
