In [None]:
pip install tensorflow

In [1]:
#documentation: https://librosa.github.io/librosa/generated/librosa.feature.mfcc.html

from sklearn.ensemble import RandomForestClassifier
from python_speech_features import mfcc
from python_speech_features import logfbank
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import scipy.io.wavfile as wav
import glob
import sys
import numpy as np
import math
import scipy
import matplotlib.pyplot as plt
import librosa

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from keras.models import Sequential

Using TensorFlow backend.


In [2]:
import os

if os.getcwd().endswith("Calculator") == True or \
   os.getcwd().endswith("Calculator\\") == True or \
   os.getcwd().endswith("Calculator/") == True :
    cwd = os.getcwd()

print(cwd)

/Users/Wengie/Documents/GitHub/Audio-Calculator


In [3]:
def join_features(mfcc, fbank):
    features = np.concatenate((mfcc, fbank), axis=1)
    return features

truncate_threshold = 0.1
truncate_size = 10000.0
pre_emphasis = 0.97

def normalize(signal):
    result = abs(signal).copy().astype('float64')
    xmin = np.min(result)
    xmax = np.max(result)
    for i in range(0, signal.size):
        result[i] = (result[i] - xmin) / (xmax - xmin)
        result[i] = "%.2f" % result[i]
    
    return result

def calculate_normalized_entropy(signal):
    #normalize
    signal = normalize(signal)
    
    p = np.zeros(101)
    for i in range(0, signal.size):
        p[(signal[i] / 0.01).astype('int64')] += 1
    
    p = p / signal.size
    max_entro = 0.0
    min_entro = 1.0
    entropy = np.zeros(signal.size)
    
    for i in range(0, signal.size):
        index = (signal[i] / 0.01).astype('int64')
        entropy[i] = -p[index]*math.log2(p[index])
        max_entro = max(max_entro, entropy[i])
        min_entro = min(min_entro, entropy[i])
        
    for i in range(0,100):
        if p[i] == 0:
            continue
        #print(-p[i]*math.log2(p[i]))
        
    lamb = (max_entro - min_entro) / 2.0
    return entropy, lamb
    
def plot_sig(sig, ylabel):
    plt.figure()
    plt.plot(sig)
    plt.xlabel("time [s]")
    plt.ylabel(ylabel)
    plt.show()

def preprocess(sig):
    #sig = np.append(sig[0], sig[1:] - pre_emphasis * sig[:-1])
    
    start_point = 0
    threshold = 0.1
    norm_sig = normalize(sig)
    np.set_printoptions(threshold=sys.maxsize)
    
    for i in range(0, sig.size):
        if norm_sig[i] >= truncate_threshold:
            start_point = i
            break
    
    stop_point = sig.size
    for i in reversed(range(0, sig.size)):
        if norm_sig[i] >= (truncate_threshold):
            stop_point = i
            break
    
    #print("start point", start_point)
    #print("stop point", stop_point)
    sig = sig[start_point:stop_point]
    #print(sig.size)
    
    #strech sig to 6000
    sig = librosa.core.resample(sig, sig.size, truncate_size)
    
    #sig = truncate_sig(sig)
    
    return sig
    
def read_file(files, label, enable_plot):
    features = []
    labels = []
    for file in files:
        (rate,sig) = wav.read(file)
        
        sig, sample_rate = librosa.core.load(file)
        #entropy, lamb = calculate_normalized_entropy(sig)
        
        if enable_plot == True:
            plot_sig(sig, "before")
        sig = preprocess(sig)
        
        mfcc_feat = mfcc(sig, rate, nfft=1536)
        fbank_feat = logfbank(sig, rate, nfft=1536)    
        acoustic_features = join_features(mfcc_feat, fbank_feat)
        acoustic_features = acoustic_features.flatten()
        features.append(acoustic_features)
        labels.append(label)
        if enable_plot == True:
            plot_sig(sig, "after")
        
    return features, labels
        
digit_feature_types = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight",\
                 "nine"]

feature_types = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight",\
                 "nine", "plus", "minus", "times", "over"]

op_feature_types = ["plus", "minus", "times", "over"]

In [4]:
digit_features = []
digit_labels = []
op_features = []
op_labels = []
for i in digit_feature_types:
    if sys.platform.startswith('win32'):
         os.chdir(cwd+"\\recording data\\" + i + "\\")
    elif sys.platform.startswith('darwin'):
         os.chdir(cwd+"/recording data/" + i + "/")
    
    files = glob.glob(os.path.join(os.getcwd(), '*.wav') )
    features, labels = read_file(files, i, False)
    digit_features.extend(features)
    digit_labels.extend(labels)
    print(i, "finished")

for i in op_feature_types:
    if sys.platform.startswith('win32'):
         os.chdir(cwd+"\\recording data\\" + i + "\\")
    elif sys.platform.startswith('darwin'):
         os.chdir(cwd+"/recording data/" + i + "/")
    
    files = glob.glob(os.path.join(os.getcwd(), '*.wav') )
    features, labels = read_file(files, i, False)
    op_features.extend(features)
    op_labels.extend(labels)
    
    print(i, "finished")
    
print("Input and preprocessing finished")

zero finished
one finished
two finished
three finished
four finished
five finished
six finished
seven finished
eight finished
nine finished




plus finished
minus finished
times finished
over finished
Input and preprocessing finished


In [5]:
digit_features = np.asarray(digit_features)
digit_labels = np. asarray(digit_labels)
op_features = np.asarray(op_features)
op_labels = np. asarray(op_labels)

digit_features_backup = digit_features.copy()
digit_labels_backup = digit_labels.copy()
op_features_backup = op_features.copy()
op_labels_backup = op_labels.copy()


In [6]:
digit_features = digit_features_backup.copy()
digit_labels = digit_labels_backup.copy()

print(digit_features.shape)
print(digit_labels.shape)

print(op_features.shape)
print(op_labels.shape)

(2000, 4836)
(2000,)
(389, 4836)
(389,)


In [7]:
#validate features and labels
for i in range(1, digit_features.shape[0]):
    if digit_features[i].size != digit_features[0].size:
        print("digit features", i)
        print(digit_features[i].size)
    if digit_labels[i].size != 1:
        print("digit labels", i)
        print(digit_labels[i].size)


digit_train_set, digit_test_set, digit_train_label, digit_test_label = train_test_split(digit_features, digit_labels, test_size=1.0/7.0, random_state=0, stratify=digit_labels)
op_train_set, op_test_set, op_train_label, op_test_label = train_test_split(op_features, op_labels, test_size=1.0/7.0, random_state=0, stratify=op_labels)

In [None]:
# digit_scaler = StandardScaler()
# # Fit on training set only.
# digit_scaler.fit(digit_train_set)

# digit_train_set = digit_scaler.transform(digit_train_set)
# digit_test_set = digit_scaler.transform(digit_test_set)

# digit_pca = PCA(.95)
# digit_pca.fit(digit_train_set)
# digit_train_set = digit_pca.transform(digit_train_set)
# digit_test_set = digit_pca.transform(digit_test_set)

# print(digit_train_set.shape)
# print(digit_test_set.shape)

# Logistic Regression

In [None]:
digit_logisticRegr = LogisticRegression(multi_class = 'multinomial', solver = 'lbfgs', penalty = 'l2', max_iter = 10000)

digit_logisticRegr.fit(digit_train_set, digit_train_label)

In [None]:
# Predict for One Observation (image)

# for i in range(0, digit_test_set.shape[0]):
#     if test_label[i] in digit_feature_types:
#         result = digit_logisticRegr.predict(digit_test_set[i].reshape(1,-1))
#         if (result != digit_test_label[i]):
#             print("Correct result:" + digit_test_label[i])
#             print(result)
    
print("test set accuracy: ", digit_logisticRegr.score(digit_test_set, digit_test_label) * 100, "%")
print("train set accuracy: ", digit_logisticRegr.score(digit_train_set, digit_train_label) * 100, "%")

In [None]:
op_logisticRegr = LogisticRegression(multi_class = 'multinomial', solver = 'lbfgs', penalty = 'l2')

op_logisticRegr.fit(op_train_set, op_train_label)

In [None]:
for i in range(0, op_test_set.shape[0]):
    if op_test_label[i] in op_feature_types:
        result = op_logisticRegr.predict(op_test_set[i].reshape(1,-1))
        if (result != op_test_label[i]):
            print("Correct result:" + op_test_label[i])
            print(result)
    
print("test set accuracy: ", op_logisticRegr.score(op_test_set, op_test_label) * 100, "%")
print("train set accuracy: ", op_logisticRegr.score(op_train_set, op_train_label) * 100, "%")

# Random Forest Model

In [None]:
digit_rfc = RandomForestClassifier(n_estimators = 150)
X_train, X_val, y_train, y_val = train_test_split(digit_features, digit_labels, test_size=0.2, random_state=10, shuffle = True, stratify=digit_labels)

digit_rfc.fit(X_train, y_train)
#checking the accuracy of the model
print(digit_rfc.score(X_val, y_val))

In [None]:
op_rfc = RandomForestClassifier(n_estimators = 150)
X_train, X_val, y_train, y_val = train_test_split(op_features, op_labels, test_size=0.2, random_state=10, shuffle = True, stratify=op_labels)
op_rfc.fit(X_train, y_train)

#checking the accuracy of the model
print(op_rfc.score(X_val, y_val))

In [None]:
def read_and_test_file(directory, t = "digit"):
    os.chdir(cwd+"/recording data/"+directory)
    files = glob.glob(os.path.join(os.getcwd(), '*.wav') )
    for file in files:
        (rate,sig) = wav.read(file)
        sig, sample_rate = librosa.core.load(file)
        plot_sig(sig, "Amplitude before truncation")
        sig = preprocess(sig)
        plot_sig(sig, "Amplitude after truncation")
        mfcc_feat = mfcc(sig, rate, nfft=1536)
        fbank_feat = logfbank(sig, rate, nfft=1536)
        
        acoustic_features = mfcc_feat
        acoustic_features = join_features(mfcc_feat, fbank_feat)
        acoustic_features = acoustic_features.flatten()
        print(acoustic_features.shape)
        if t == "digit":
            print(acoustic_features.shape)
            clf_result = clf.predict(acoustic_features.reshape(1,-1))
            rf_result = digit_rfc.predict(acoustic_features.reshape(1,-1))
            #acoustic_features = digit_scaler.transform(acoustic_features.reshape(1,-1))
            #acoustic_features = digit_pca.transform(acoustic_features)
            print(acoustic_features.shape)
            logistic_result = digit_logisticRegr.predict(acoustic_features.reshape(1,-1))
        else:
            logistic_result = op_logisticRegr.predict(acoustic_features.reshape(1,-1))
            rf_result = op_rfc.predict(acoustic_features.reshape(1,-1))
            
        print(file)
        print("logistic: ")
        print(logistic_result)
        print("random forest: ")
        print(rf_result)
        print("clf result: ")
        print(clf_result)

In [None]:
read_and_test_file("test", "digit")

In [None]:
read_and_test_file("op_test", "op")

# svm


In [None]:
X_train, X_val, y_train, y_val = train_test_split(digit_features, digit_labels, test_size=0.2, random_state=10, shuffle = True, stratify=digit_labels)

In [None]:
print('digit svm fitting...')
clf = SVC(C=20.0, gamma=0.00001)
clf.fit(X_train, y_train)
dig_test_acc = clf.score(X_val, y_val)
dig_train_acc = clf.score(X_train, y_train)
print("svm digit test accuracy =%0.3f" % dig_test_acc)
print("svm digit train accuracy =%0.3f" % dig_train_acc)

In [None]:
X_train, X_val, y_train, y_val = train_test_split(op_features, op_labels, test_size=0.2, random_state=10, shuffle = True, stratify=op_labels)

print('operator svm fitting...')
clf = SVC(C=20.0, gamma=0.00001)
clf.fit(X_train, y_train)
op_test_acc = clf.score(X_val, y_val)
op_train_acc = clf.score(X_train, y_train)
print("svm operator test accuracy=%0.3f" % op_test_acc)
print("svm operator train accuracy=%0.3f" % op_train_acc)

# function to tune svm hyperparameters, no need to run every time

In [None]:

#Grid search for best parameters
# Set the parameters by cross-validation
tuned_parameters = [{'kernel': ['rbf'], 'gamma': [1e-3, 1e-4, 1e-5],
                     'C': [1, 10 ,20,30,40,50]}]
#, {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]

scores = ['precision', 'recall']

for score in scores:
    print("# Tuning hyper-parameters for %s" % score)
    print('')

    clf = GridSearchCV(SVC(), tuned_parameters, cv=5,
                        scoring='%s_macro' % score)
    clf.fit(X_train, y_train)

    print("Best parameters set found on development set:")
    print('')
    print(clf.best_params_)
    print('')
    print("Grid scores on development set:")
    print('')
    means = clf.cv_results_['mean_test_score']
    stds = clf.cv_results_['std_test_score']
    for mean, std, params in zip(means, stds, clf.cv_results_['params']):
        print("%0.3f (+/-%0.03f) for %r"
               % (mean, std * 2, params))
    print('')

    print("Detailed classification report:")
    print('')
    print("The model is trained on the full development set.")
    print("The scores are computed on the full evaluation set.")
    print('')
    y_true, y_pred = y_val, clf.predict(X_val)
    print(classification_report(y_true, y_pred))
    print('')

# CNN

In [41]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D
from keras.models import model_from_json

In [52]:
imwidth= 78
imheight= 62
num_classes = 10
X_train, X_val, y_train, y_val = train_test_split(digit_features, digit_labels, test_size=0.2, random_state=10, shuffle = True, stratify=digit_labels)
X_train = X_train.reshape(X_train.shape[0], imheight, imwidth, 1)
X_val = X_val.reshape(X_val.shape[0], imheight, imwidth, 1)

input_shape = (imheight, imwidth, 1)



ValueError: invalid literal for int() with base 10: 'eight'

In [49]:
cnn = Sequential()
cnn.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape))
cnn.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
cnn.add(MaxPooling2D(pool_size=(2, 2)))
cnn.add(Dropout(0.25))
cnn.add(Flatten())
cnn.add(Dense(128, activation='relu'))
cnn.add(Dropout(0.5))
cnn.add(Dense(num_classes, activation='softmax'))

In [50]:
cnn.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.adam(), metrics=['accuracy'])
print(cnn.summary())

Model: "sequential_16"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_22 (Conv2D)           (None, 60, 76, 32)        320       
_________________________________________________________________
conv2d_23 (Conv2D)           (None, 58, 74, 64)        18496     
_________________________________________________________________
max_pooling2d_8 (MaxPooling2 (None, 29, 37, 64)        0         
_________________________________________________________________
dropout_9 (Dropout)          (None, 29, 37, 64)        0         
_________________________________________________________________
flatten_5 (Flatten)          (None, 68672)             0         
_________________________________________________________________
dense_8 (Dense)              (None, 128)               8790144   
_________________________________________________________________
dropout_10 (Dropout)         (None, 128)             

In [55]:
#X_train, X_val, y_train, y_val = train_test_split(digit_features, digit_labels, test_size=0.2, random_state=10, shuffle = True, stratify=digit_labels)
cnn.fit(X_train, y_train, batch_size=32, epochs=10, verbose=1, validation_data=(X_val, y_val))

ValueError: Error when checking target: expected dense_9 to have shape (10,) but got array with shape (1,)

In [53]:
cnn.fit(X_train, y_train, batch_size=64, epochs=50, verbose=1, validation_split=0.1, callbacks=[keras_callback])

NameError: name 'keras_callback' is not defined

In [66]:

print('train X shape:', y_train.shape)
print('test X shape:', y_val.shape)

train X shape: (1600,)
test X shape: (400,)


In [67]:
print(X_train.shape)
print(X_val.shape)

(1600, 62, 78, 1)
(400, 62, 78, 1)


In [60]:
from keras.layers import Dense
from keras import Input
from keras.engine import Model
from keras.utils import to_categorical
from keras.layers import Dense, TimeDistributed, Dropout, Bidirectional, GRU, BatchNormalization, Activation, LeakyReLU, \
    LSTM, Flatten, RepeatVector, Permute, Multiply, Conv2D, MaxPooling2D

In [64]:
ip = Input(shape=X_train[0].shape)
m = Conv2D(64, kernel_size=(4, 4), activation='relu')(ip)
m = MaxPooling2D(pool_size=(4, 4))(m)
# m = Conv2D(128, kernel_size=(2, 2), activation='relu')(ip)
# m = MaxPooling2D(pool_size=(2, 2))(m)
m = Flatten()(m)
m = Dense(32, activation='relu')(m)
op = Dense(10, activation='softmax')(m)

model = Model(input=ip, output=op)

model.summary()

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_2 (InputLayer)         (None, 62, 78, 1)         0         
_________________________________________________________________
conv2d_25 (Conv2D)           (None, 59, 75, 64)        1088      
_________________________________________________________________
max_pooling2d_9 (MaxPooling2 (None, 14, 18, 64)        0         
_________________________________________________________________
flatten_6 (Flatten)          (None, 16128)             0         
_________________________________________________________________
dense_10 (Dense)             (None, 32)                516128    
_________________________________________________________________
dense_11 (Dense)             (None, 10)                330       
Total params: 517,546
Trainable params: 517,546
Non-trainable params: 0
_____________________________________________________

  # Remove the CWD from sys.path while we load stuff.


In [65]:

model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

history = model.fit(X_train,
          y_train,
          epochs=100,
          batch_size=32,
          verbose=0,
          validation_data=(X_val, y_val))

ValueError: Error when checking target: expected dense_11 to have shape (10,) but got array with shape (1,)