In [None]:
# Root directory of the dataset; it is scanned recursively for sound files and
# the name of each file's parent directory is later used as its class label.
sound_files_dir='sounds-with-classes'

In [None]:
def get_sound_files_paths(sound_files_dir):
    """Recursively list every non-hidden file below ``sound_files_dir``.

    Parameters
    ----------
    sound_files_dir : str
        Directory to walk.

    Returns
    -------
    list of str
        One entry per file, built as ``<directory>/<file name>`` with a
        literal '/' separator, in the order ``os.walk`` reports them.
        Files whose name starts with '.' are left out; directories are
        always descended into, whatever their name.
    """
    import os
    collected = []
    for folder, _subfolders, names in os.walk(sound_files_dir):
        collected.extend(
            '/'.join((folder, name))
            for name in names
            if name and not name.startswith('.')
        )
    return collected

In [None]:
# Flat list of every sound file found under the dataset root.
sound_files_paths = get_sound_files_paths(sound_files_dir)

In [None]:
def read_full_data_from_drive(data_path):
    """Group the sound files found under ``data_path`` by class.

    The class of a file is the name of the directory that directly
    contains it.

    Parameters
    ----------
    data_path : str
        Root directory of the dataset; it is scanned with
        ``get_sound_files_paths``.

    Returns
    -------
    dict
        Maps class name (str) to the list of file paths of that class, in
        the order the files were discovered.
    """
    import os
    class_to_file_paths = {}
    # Scan the directory that was actually requested rather than reading a
    # notebook-level global, so the result depends only on the argument.
    for sound_file_path in get_sound_files_paths(data_path):
        # os.path copes with OS-native separators inside the directory part,
        # which a plain split on '/' does not.
        file_class = os.path.basename(os.path.dirname(sound_file_path))
        class_to_file_paths.setdefault(file_class, []).append(sound_file_path)
    return class_to_file_paths

def read_data_from_drive(sound_file_path):
    """Load the samples of one WAV file.

    Parameters
    ----------
    sound_file_path : str
        Path of the file to read with ``scipy.io.wavfile.read``.

    Returns
    -------
    The sample array from the file. The sample rate that is read alongside
    it is discarded.
    """
    from scipy.io import wavfile
    # wavfile.read returns (sample_rate, samples); only the samples are kept.
    return wavfile.read(sound_file_path)[1]

In [None]:
# Mapping from class name (parent directory name) to the files of that class.
class_to_file_paths = read_full_data_from_drive(sound_files_dir)

In [None]:
# Sanity check: dump the whole class -> files mapping.
print(class_to_file_paths)

In [None]:
# Listen to the first file of class '0' inside the notebook.
# NOTE(review): the playback rate is hard-coded to 44100 Hz because
# read_data_from_drive discards the file's own sample rate - confirm that the
# recordings really are 44.1 kHz, and that a class named '0' exists.
import IPython.display as ipd
ipd.Audio(read_data_from_drive(class_to_file_paths['0'][0]), rate=44100)

In [None]:
def split_data_to_train_and_valid(data, valid_percent=20.0, test_percent=10.0, seed=None):
    """Split the files of every class into a training and a validation part.

    The split is stratified: the validation share is drawn separately from
    each class.

    Parameters
    ----------
    data : dict
        Maps a class id to the list of file paths belonging to that class.
    valid_percent : float
        Percentage (0-100) of each class's files that goes to validation;
        the per-class count is rounded down.
    test_percent : float
        Currently ignored - no test split is produced. The parameter is kept
        so that existing callers passing it keep working.
    seed : int, optional
        When given, the files are drawn with a private ``random.Random(seed)``
        so the split is reproducible. With ``None`` (the default) the
        module-level ``random`` generator is used.

    Returns
    -------
    (train_data, valid_data) : tuple of dict
        Each dict maps a file path to its class id.

    Raises
    ------
    ValueError
        Raised by ``random.sample`` when ``valid_percent`` asks for a negative
        number of files or for more files than a class contains.
    """
    import math
    import random
    rng = random if seed is None else random.Random(seed)
    train_data = {}
    valid_data = {}
    for class_id, sound_files_paths in data.items():
        # Multiply before dividing: 100 * (29 / 100) evaluates to 28.99...,
        # which would be floored to 28 instead of 29.
        valid_samples_in_class = math.floor(len(sound_files_paths) * valid_percent / 100)
        # A set keeps the membership test below O(1) per file.
        valid_samples_ids = set(rng.sample(range(len(sound_files_paths)), valid_samples_in_class))
        for i, sound_file_path in enumerate(sound_files_paths):
            if i in valid_samples_ids:
                valid_data[sound_file_path] = class_id
            else:
                train_data[sound_file_path] = class_id
    return (train_data, valid_data)

In [None]:
# Both results map file path -> class id. The split is random, so it differs
# between runs unless the random module has been seeded beforehand.
train_data, valid_data = split_data_to_train_and_valid(class_to_file_paths)

In [None]:
# Sanity check: dump the training part of the split (file path -> class id).
print(train_data)

In [None]:
def get_spectrogram_of_file(samples, sample_rate=44100):
    """Compute the spectrogram of a signal with time as the leading axis.

    Parameters
    ----------
    samples : array-like
        Signal handed to ``scipy.signal.spectrogram``.
        NOTE(review): presumably a mono (1-D) recording - confirm how
        multi-channel files should be handled.
    sample_rate : int
        Sampling frequency passed to SciPy; defaults to 44100.

    Returns
    -------
    The transposed spectrogram array; for 1-D input each row is one time
    segment and each column one frequency bin. The frequency and time axes
    that SciPy also returns are dropped.
    """
    from scipy import signal
    # signal.spectrogram returns (frequencies, times, spectrogram).
    result = signal.spectrogram(samples, sample_rate)
    return result[2].T

In [None]:
def get_items_count(data):
    """Return the total number of samples over all files in ``data``.

    Parameters
    ----------
    data : iterable of str
        File paths; iterating a dict keyed by path works as well.

    Returns
    -------
    int
        Sum of ``len(read_data_from_drive(path))`` over every path.
    """
    return sum(len(read_data_from_drive(path)) for path in data)

def get_spectrogram_count(data):
    """Return the total number of spectrogram rows over all files in ``data``.

    Parameters
    ----------
    data : iterable of str
        File paths; iterating a dict keyed by path works as well.

    Returns
    -------
    int
        Sum of the first dimension of ``get_spectrogram_of_file`` applied to
        the samples of every file.
    """
    return sum(
        get_spectrogram_of_file(read_data_from_drive(path)).shape[0]
        for path in data
    )

In [None]:
# Total number of spectrogram rows in each split. Every file is read and
# transformed once here, so this cell can take a while on a large dataset.
train_data_items_count = get_spectrogram_count(train_data)
valid_data_items_count = get_spectrogram_count(valid_data)

In [None]:
def build_vocabulary(class_to_file_paths):
    """Number every distinct sample value that occurs in the dataset.

    Parameters
    ----------
    class_to_file_paths : dict
        Maps class name to a list of file paths; only the paths are used.

    Returns
    -------
    (vocabulary, reversed_vocabulary) : tuple of dict
        ``vocabulary`` maps a sample value to an index assigned in order of
        first appearance, starting at 0; ``reversed_vocabulary`` is the
        inverse mapping from index to value.
    """
    value_to_index = {}
    for paths in class_to_file_paths.values():
        for path in paths:
            for sample in read_data_from_drive(path):
                # setdefault leaves known values alone, so the next free
                # index is always the current size of the mapping.
                value_to_index.setdefault(sample, len(value_to_index))
    index_to_value = {index: value for value, index in value_to_index.items()}
    return (value_to_index, index_to_value)

In [None]:
# Index every distinct sample value of the dataset. This loops over every
# sample of every file in Python, so it can be slow.
# NOTE(review): the vocabulary is handed to the batch generators below, but
# their generate() method does not appear to use it - confirm it is needed.
vocabulary, reversed_vocabulary = build_vocabulary(class_to_file_paths)

In [None]:
from keras.utils import to_categorical
import numpy as np

class StepanBatchGenerator(object):
    """Endless source of (spectrogram windows, one-hot class) batches.

    Files are visited in the iteration order of ``data``, starting with the
    first one and wrapping around after the last. The spectrogram of the
    current file is cut into windows of ``num_steps`` consecutive rows whose
    starts lie ``skip_steps`` rows apart; when no full window is left, the
    next file is loaded. The position is stored on the instance, so every
    generator obtained from ``generate()`` continues where the last stopped.
    """

    def __init__(self, data, num_steps, batch_size, num_classes, vocabulary, skip_steps=1, freq=129):
        """Remember the configuration and load the first usable file.

        Parameters
        ----------
        data : dict
            Maps file path to class id.
            NOTE(review): the class id is passed to ``to_categorical``, so it
            presumably has to be convertible to an integer below
            ``num_classes`` - confirm against the directory names in use.
        num_steps : int
            Number of spectrogram rows per window.
        batch_size : int
            Number of windows per batch.
        num_classes : int
            Width of the one-hot target vectors.
        vocabulary : dict
            Only used by ``__map_data_to_vocabulary``, which nothing in this
            class calls.
        skip_steps : int
            Distance in rows between the starts of consecutive windows.
        freq : int
            Number of columns per spectrogram row; must match what
            ``get_spectrogram_of_file`` produces.

        Raises
        ------
        ValueError
            If ``data`` is empty.
        RuntimeError
            If none of the files can be loaded.
        """
        self.num_steps = num_steps
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.skip_steps = skip_steps
        self.vocabulary = vocabulary
        self.freq = freq

        self.current_data = None
        self.current_file_idx = 0
        self.current_pos_in_file = 0

        self.current_spectrogram = None
        self.current_pos_in_spectrogram = 0

        self.file_paths = []
        self.file_classes = []
        for file_path, file_class in data.items():
            self.file_paths.append(file_path)
            self.file_classes.append(file_class)
        if not self.file_paths:
            raise ValueError('StepanBatchGenerator needs at least one file')
        # __change_file() advances before it loads, so start one position
        # before the first file; otherwise file 0 would be skipped until the
        # first wrap-around.
        self.current_file_idx = -1
        self.__change_file()

    def __map_data_to_vocabulary(self, data):
        """Translate values to their vocabulary index, dropping unknown ones."""
        return [self.vocabulary[d] for d in data if d in self.vocabulary]

    def __change_file(self):
        """Advance to the next usable file (wrapping around) and load it.

        A file is skipped with a warning when reading it or computing its
        spectrogram raises, or when its spectrogram has fewer than
        ``num_steps`` rows and so cannot provide a single window.

        Raises
        ------
        RuntimeError
            When a full pass over all files finds no usable one. Without this
            bound the search would never terminate.
        """
        import warnings
        last_error = None
        # At most one full pass, so a dataset without usable files fails fast.
        for _ in range(len(self.file_paths)):
            self.current_file_idx = (self.current_file_idx + 1) % len(self.file_paths)
            file_path = self.file_paths[self.current_file_idx]
            try:
                data = read_data_from_drive(file_path)
                spectrogram = get_spectrogram_of_file(data)
                if len(spectrogram) < self.num_steps:
                    raise ValueError('spectrogram has only {} rows, {} are needed'.format(
                        len(spectrogram), self.num_steps))
            except Exception as error:
                # Exception rather than a bare except, so KeyboardInterrupt
                # still stops the notebook cell.
                last_error = error
                warnings.warn('Skipping {!r}: {}'.format(file_path, error))
                continue
            self.current_data = data
            self.current_spectrogram = spectrogram
            self.current_pos_in_file = 0
            self.current_pos_in_spectrogram = 0
            return
        raise RuntimeError('None of the {} files could be loaded (last error: {})'.format(
            len(self.file_paths), last_error))

    def generate(self):
        """Yield ``(x, y)`` batches forever.

        ``x`` has shape ``(batch_size, num_steps, freq)`` and holds one
        spectrogram window per row; ``y`` has shape
        ``(batch_size, num_classes)`` and holds the one-hot class of the file
        each window was taken from.
        """
        while True:
            # Fresh arrays for every batch: a consumer that keeps earlier
            # batches around (for example in a prefetch queue) must not see
            # them overwritten while the next batch is being filled.
            x = np.zeros((self.batch_size, self.num_steps, self.freq))
            y = np.zeros((self.batch_size, self.num_classes))
            for i in range(self.batch_size):
                # '>' rather than '>=': a window that ends exactly on the last
                # row is still a complete window.
                if self.current_pos_in_spectrogram + self.num_steps > len(self.current_spectrogram):
                    self.__change_file()
                start = self.current_pos_in_spectrogram
                x[i, :, :] = self.current_spectrogram[start:start + self.num_steps, :]
                y[i, :] = to_categorical(self.file_classes[self.current_file_idx], num_classes=self.num_classes)
                self.current_pos_in_spectrogram += self.skip_steps
            yield x, y

In [None]:
# Windowing and batching configuration shared by the generators and the model.
# num_steps: spectrogram rows per window (also the LSTM sequence length).
# skip_steps: distance in rows between the starts of consecutive windows.
# batch_size: windows per batch.
# num_classes: one class per key of class_to_file_paths.
num_steps = 40
skip_steps = 20
batch_size = 8
num_classes = len(class_to_file_paths)

In [None]:
# One batch generator per split; constructing them already loads a file.
train_data_generator = StepanBatchGenerator(train_data, num_steps, batch_size, num_classes, vocabulary, skip_steps=skip_steps)
valid_data_generator = StepanBatchGenerator(valid_data, num_steps, batch_size, num_classes, vocabulary, skip_steps=skip_steps)

In [None]:
def pieces_in_whole_sound_with_step(data_count, num_steps, skip_steps):
    """Count the full windows that fit into a sequence.

    Windows are ``num_steps`` items long, the first one starts at item 0 and
    every following one starts ``skip_steps`` items later. A window counts
    only if it ends at or before ``data_count``.

    Parameters
    ----------
    data_count : int
        Length of the sequence.
    num_steps : int
        Length of one window.
    skip_steps : int
        Distance between the starts of consecutive windows; must be positive
        whenever at least one window fits.

    Returns
    -------
    int
        Number of complete windows; 0 if the sequence is shorter than one
        window.

    Raises
    ------
    ValueError
        If a window fits but ``skip_steps`` is not positive. Counting would
        never finish in that case.
    """
    if data_count < num_steps:
        return 0
    if skip_steps <= 0:
        raise ValueError('skip_steps must be positive, got {}'.format(skip_steps))
    # The first window always fits here; each further window needs another
    # skip_steps items of room.
    return int((data_count - num_steps) // skip_steps) + 1

In [None]:
# Batches per epoch = number of windows // batch_size.
# NOTE(review): the window count is computed from the total row count of a
# split, as if all its files formed one continuous recording, whereas the
# generator windows each file separately - treat these as approximations.
train_steps = pieces_in_whole_sound_with_step(train_data_items_count, num_steps, skip_steps)//batch_size
validation_steps = pieces_in_whole_sound_with_step(valid_data_items_count, num_steps, skip_steps)//batch_size
print('Train steps: ', train_steps)
print('Valid steps: ', validation_steps)

In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Embedding, Dropout, TimeDistributed, LSTM
from keras.initializers import Constant, RandomNormal

# Model and training configuration: number of LSTM units and training epochs.
hidden_size = 500
num_epochs = 50

# A single LSTM layer that reads one window of num_steps spectrogram rows and
# feeds its final output into a softmax over the classes. The hard-coded 129
# is the number of values per row and must equal the generators' `freq`.
stepan_model = Sequential()
stepan_model.add(LSTM(hidden_size, dropout=0.05, return_sequences=False, input_shape=(num_steps, 129)))
stepan_model.add(Dense(units=num_classes, activation='softmax'))
                 
stepan_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])
# NOTE(review): summary() presumably prints the table itself and returns
# None, in which case the outer print only adds a "None" line - confirm.
print(stepan_model.summary())
                 
# The positional arguments after the generator are presumably
# steps_per_epoch and epochs - confirm against the installed Keras version.
# NOTE(review): newer Keras releases deprecate fit_generator in favour of
# fit - confirm before upgrading.
hist = stepan_model.fit_generator(train_data_generator.generate(), train_steps, num_epochs,
                        validation_data=valid_data_generator.generate(),
                        validation_steps=validation_steps)

In [None]:
# Persist the trained model next to the notebook.
stepan_model.save("stepan_model.hdf5")

In [None]:
# Reload the saved model; this replaces the in-memory `stepan_model`, so the
# cells below can be run without repeating the training cell.
from keras.models import load_model
stepan_model = load_model("stepan_model.hdf5")

In [None]:
# Spot-check the model: predict the class of single windows (batch size 1)
# drawn from the validation split and print it next to the true class.
dummy_iters = 0  # number of leading windows to skip before predicting
example_training_generator = StepanBatchGenerator(valid_data, num_steps, 1, num_classes, vocabulary, skip_steps=skip_steps)
# The read position lives on the generator object, so one generator is
# enough; there is no need to create a new one for every batch.
example_batches = example_training_generator.generate()
# The windows come from valid_data, so label the output accordingly.
print("Validation data:")
for i in range(dummy_iters):
    dummy = next(example_batches)
num_predict = 100
for i in range(num_predict):
    data = next(example_batches)
    prediction = stepan_model.predict(data[0])
    # data is (x, y): argmax over the softmax output / the one-hot target.
    predicted = np.argmax(prediction)
    true = np.argmax(data[1])
    print('Label {}: predicted: {} true: {}'.format(i, predicted, true))