In [1]:
import pandas as pd
import numpy as np
import os
import math
from scipy.io import wavfile
import matplotlib.pyplot as plt
import python_speech_features as psf
import h5py
from keras.models import load_model

Using TensorFlow backend.


In [2]:
# The test recordings live two directory levels above the working directory.
# os.path handles the separator of the current platform; the trailing ''
# keeps a separator at the end because make_fv() builds file paths with
# plain string concatenation (path + filename).
project_root = os.path.dirname(os.path.dirname(os.getcwd()))
path = os.path.join(project_root, 'data_v_7_stc', 'test', '')

In [3]:
# Only the first os.walk() entry (the test directory itself) is used.
# Looping over the whole walk would overwrite the listing with the files of
# the last visited subdirectory, which make_fv() could not open via path + name.
_, _, filenames = next(os.walk(path), (path, [], []))

# Keep recordings only, so stray files (e.g. hidden OS files) never reach wavfile.read().
wav_names = [name for name in filenames if name.lower().endswith('.wav')]
if not wav_names:
    # os.walk() yields nothing for a missing directory; fail with a clear message.
    raise FileNotFoundError('No .wav files found in ' + path)

test = pd.DataFrame(wav_names, columns=['filename'])
print(test.shape)
test.head(2)

(610, 1)


Unnamed: 0,filename
0,speech_0100.wav
1,speech_t_0013.wav


### Load files

In [4]:
def make_fv(record):
    """Read one recording and return its MFCC features and its duration.

    `record` is a file name; it is appended to the module-level `path`.
    Returns a tuple (mfcc, filelength), where `filelength` is the number of
    samples divided by the sample rate, i.e. the duration in seconds.
    """
    sample_rate, samples = wavfile.read(path + record)
    duration = len(samples) / sample_rate
    # 13 cepstral coefficients from 26 mel filters, 1024-point FFT
    cepstra = psf.mfcc(samples, samplerate=sample_rate, nfft=1024, nfilt=26, numcep=13)
    return (cepstra, duration)

# Extract features once per file, then unpack the (mfcc, length) pairs in a
# single DataFrame construction. This avoids `Series.apply(pd.Series)`, which
# builds one Series per row and is slow, and it never stores the raw tuples
# in the 'features' column.
extracted = [make_fv(name) for name in test.filename]
unpacked = pd.DataFrame(extracted, columns=['features', 'length'], index=test.index)
test['features'] = unpacked['features']
test['length'] = unpacked['length']

### Split audio files into chunks of at most 6 seconds

In [6]:
maxlen = 6  # in seconds

def cut_large(record, max_seconds=None):
    """Split one recording's feature matrix into chunks of at most `max_seconds`.

    Parameters
    ----------
    record : object with `filename`, `features` and `length` attributes
        `features` is split along axis 0; `length` is the duration in seconds.
    max_seconds : number, optional
        Longest allowed chunk. Defaults to the module-level `maxlen`.

    Returns
    -------
    pandas.DataFrame
        Columns 'filename' and 'features', one row per chunk. Recordings
        shorter than `max_seconds` yield a single row with the features unchanged.

    Raises
    ------
    ValueError
        If `max_seconds` is not positive.
    """
    if max_seconds is None:
        max_seconds = maxlen
    if max_seconds <= 0:
        raise ValueError('max_seconds must be positive')

    if record.length < max_seconds:
        chunks = [record.features]
    else:
        # Near-equal parts; ceil() guarantees none is longer than max_seconds.
        n_chunks = math.ceil(record.length / max_seconds)
        chunks = np.array_split(record.features, n_chunks, axis=0)

    # Build the frame in one go: DataFrame.append in a loop is quadratic and
    # no longer exists in pandas 2.x.
    rows = [(record.filename, chunk) for chunk in chunks]
    return pd.DataFrame(rows, columns=['filename', 'features'])

# Chunk every recording, then stack the per-file frames into one table
# with a fresh 0..n-1 index.
list_of_frames = test.apply(cut_large, axis=1)
cutted = pd.concat(list(list_of_frames), ignore_index=True)

### Prepare testing data

In [7]:
# Per-chunk feature matrices, one entry per row of `cutted`
X_test = cutted['features']

def equalize(array, maxlen, n_features):
    """Append zero rows to `array` until it has `maxlen` rows.

    An array that already has `maxlen` rows or more is returned as is
    (the same object, nothing is truncated). The padding has `n_features`
    columns.
    """
    missing = maxlen - array.shape[0]
    if missing <= 0:
        return array
    # Integer zeros: concatenation applies NumPy's usual type promotion.
    zero_rows = np.zeros((missing, n_features), dtype=int)
    return np.concatenate([array, zero_rows])

def pad_row(series):
    """Zero-pad every 2-D array in `series` to a common number of rows.

    The target is one row more than the longest array, so every array,
    including the longest, ends with at least one zero row. The column
    count is taken from the first element.

    Parameters
    ----------
    series : pandas.Series of 2-D numpy arrays

    Returns
    -------
    pandas.Series
        Same index as `series`, with every array padded by equalize().

    Raises
    ------
    ValueError
        If `series` is empty.
    """
    if len(series) == 0:
        raise ValueError('pad_row() needs at least one sequence to pad')
    target_len = max(len(item) for item in series) + 1
    # Positional access: `series[0]` is a label lookup and breaks when the
    # index does not contain the label 0.
    n_features = series.iloc[0].shape[1]
    return series.apply(lambda item: equalize(item, target_len, n_features))

# Pad all chunks to one length and stack them into a single 3-D array.
padded_sequences = pad_row(X_test)
X_test = np.stack(list(padded_sequences))
print('Vectors are padded')

n_sequences = X_test.shape[0]
print(n_sequences, 'test sequences')
X_test.shape

Vectors are padded
922 test sequences


(922, 600, 13)

### Save dataset

In [8]:
# The context manager closes the file even if writing the dataset fails.
with h5py.File('test.h5', 'w') as h5f:
    h5f.create_dataset('test', data=X_test)

### Load LSTM model

In [9]:
# Restore the saved Keras model; the relative path is resolved against the working directory.
model = load_model(filepath='bidirLSTM.h5')

### Predict values on the testing set

In [10]:
# Names for the score columns of the prediction table.
# NOTE(review): the order is assumed to match the model's output units - confirm against training.
labels = [
    'background',
    'bags',
    'door',
    'keyboard',
    'knocking_door',
    'ring',
    'speech',
    'tool',
]
X_test.shape

(922, 600, 13)

In [11]:
# Score every padded chunk, then place the class scores next to the chunk table.
Y_pred = model.predict(X_test)
final = pd.concat(
    objs=[cutted, pd.DataFrame(data=Y_pred, columns=labels)],
    axis='columns',
)

### Max-pool the prediction for a file

In [12]:
# Keep only the file name and the per-class scores, then bucket the chunks by file.
for_grouping = final.drop(columns='features')
grouped = for_grouping.groupby(by='filename')

def pop_rows(groups=None, class_labels=None):
    """Yield the single highest score, and its label, for every file.

    Parameters
    ----------
    groups : iterable of (filename, DataFrame), optional
        Per-file tables with a 'filename' column plus one score column per
        class. Defaults to the module-level `grouped`.
    class_labels : sequence of str, optional
        Class names in score-column order. Defaults to the module-level `labels`.

    Yields
    ------
    dict
        Keys 'filename', 'score' (the maximum over all chunks and classes of
        the file) and 'label' (the class that maximum belongs to).
    """
    if groups is None:
        groups = grouped
    if class_labels is None:
        class_labels = labels

    for filename, table in groups:
        scores = table.drop('filename', axis=1).values

        # Flat argmax over (chunk, class). The column is recovered from the
        # actual shape instead of a hard-coded class count.
        best = np.argmax(scores, axis=None)
        _, column = np.unravel_index(best, scores.shape)

        yield {'filename': filename, 'score': scores.flat[best], 'label': class_labels[column]}

# Collect the per-file winners and fix the column order used for the output file.
result = pd.DataFrame(pop_rows()).reindex(columns=['filename', 'score', 'label'])

### Save file

In [13]:
# Predictions are written one level above the working directory.
# A separate name is used so that `path` keeps pointing at the test
# recordings and make_fv() still works if earlier cells are re-run.
output_dir = os.path.dirname(os.getcwd())

# pandas save to txt: filename score label (tab-separated, no header, no index)
result.to_csv(os.path.join(output_dir, 'result.txt'), header=False, index=False, sep='\t')