In [None]:
pip install tqdm

In [1]:
import numpy as np
from matplotlib import pyplot as plt 
from scipy.io import wavfile
import os
import re
from hashlib import sha1
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tqdm import tqdm
from collections import Counter
import time

ModuleNotFoundError: No module named 'scipy'

In [5]:
MAX_NUM_WAVS_PER_CLASS = 2**27 - 1  # ~134M

def which_set(filename, validation_percentage, testing_percentage):
    """Determine which data partition a file belongs to, using only its name.

    We want to keep files in the same training, validation, or testing sets
    even if new ones are added over time. This makes it less likely that
    testing samples will accidentally be reused in training when long runs are
    restarted, for example. To keep this stability, a hash of the file name is
    taken and used to determine which set it should belong to. This
    determination only depends on the name and the set proportions, so it
    won't change as other files are added.

    It's also useful to associate particular files as related (for example
    words spoken by the same person), so anything after '_nohash_' in a file
    name is ignored for set determination. This ensures that
    'bobby_nohash_0.wav' and 'bobby_nohash_1.wav' are always in the same set.

    Args:
        filename: File path of the data sample. Only the base name is hashed.
        validation_percentage: How much of the data set to use for validation.
        testing_percentage: How much of the data set to use for testing.

    Returns:
        String, one of 'training', 'validation', or 'testing'.
    """
    base_name = os.path.basename(filename)

    # Ignore anything after '_nohash_' in the file name when deciding which set
    # to put a wav in, so the data set creator has a way of grouping wavs that
    # are close variations of each other.
    hash_name = re.sub(r'_nohash_.*$', '', base_name).encode('utf-8')

    # We need a stable way of deciding based on just the file name itself, so
    # we hash it and turn the hash into a value in [0, 100] that is compared
    # against the requested percentages.
    # Note: the notebook imports `sha1` directly (`from hashlib import sha1`),
    # so the bare name is used here; `hashlib.sha1` would raise a NameError.
    hash_name_hashed = sha1(hash_name).hexdigest()
    hash_bucket = int(hash_name_hashed, 16) % (MAX_NUM_WAVS_PER_CLASS + 1)
    percentage_hash = hash_bucket * (100.0 / MAX_NUM_WAVS_PER_CLASS)

    if percentage_hash < validation_percentage:
        result = 'validation'
    elif percentage_hash < testing_percentage + validation_percentage:
        result = 'testing'
    else:
        result = 'training'
    return result

In [6]:
def get_labels(dataset_list):
    """Return the label of every wav file, i.e. the directory part of its path.

    Samples are the files themselves; the label of a sample is the parent
    directory given in its path (``'bed/abc_nohash_0.wav'`` -> ``'bed'``).

    Args:
        dataset_list: Either a list/tuple of wav file paths, or the path of a
            text file that contains one wav file path per line.

    Returns:
        numpy array with one label per path, in input order.

    Raises:
        ValueError: If ``dataset_list`` is neither a list/tuple nor the path of
            an existing file. (Previously an error string was returned, which
            callers would silently treat as label data.)
    """
    if isinstance(dataset_list, (list, tuple)):
        file_paths = dataset_list
    elif isinstance(dataset_list, (str, bytes, os.PathLike)) and os.path.isfile(dataset_list):
        with open(dataset_list) as f:
            # Trailing newlines are harmless: dirname() only keeps what
            # precedes the last path separator.
            file_paths = f.readlines()
    else:
        raise ValueError(
            f"Sorry, wrong input format provided: expected a list of wav paths "
            f"or the path of an existing text file, got {dataset_list!r}"
        )

    return np.asarray([os.path.dirname(file) for file in file_paths])
        

In [7]:
def create_training_txt_file(dataset_path):
    """Write ``training_list.txt`` for the dataset rooted at ``dataset_path``.

    Every sub-directory of ``dataset_path`` is scanned. A wav file is written
    to the training list (as ``<folder>/<file>``, one per line) when
    ``which_set(path, 10, 10)`` assigns it to 'training' and it is not already
    listed in ``testing_list.txt`` or ``validation_list.txt``.

    Args:
        dataset_path: Directory that contains the class folders together with
            ``testing_list.txt`` and ``validation_list.txt``.

    Raises:
        FileNotFoundError: If one of the two held-out list files is missing.
    """
    # Entries of the held-out lists; used as a guard so that a file can never
    # end up in the training list and in a held-out list at the same time.
    held_out_files = set()
    for list_name in ('testing_list.txt', 'validation_list.txt'):
        with open(os.path.join(dataset_path, list_name)) as f:
            held_out_files.update(f.read().splitlines())

    # Sorted so the generated file does not depend on os.listdir() ordering.
    list_of_local_dirs = sorted(
        folder for folder in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, folder))
    )

    with open(os.path.join(dataset_path, 'training_list.txt'), "w") as train_f:
        for folder in list_of_local_dirs:
            # Was an undefined name before, which raised NameError after the
            # (empty) training list had already been created.
            folder_path = os.path.join(dataset_path, folder)
            for file in sorted(os.listdir(folder_path)):
                # Only audio files can be loaded later on; skip READMEs etc.
                if not file.lower().endswith('.wav'):
                    continue
                # Held-out entries are compared in '<folder>/<file>' form —
                # presumably the format used by the list files; verify.
                if f"{folder}/{file}" in held_out_files:
                    continue
                if which_set(os.path.join(folder, file), 10, 10) == "training":
                    train_f.write(f"{os.path.join(folder, file)}\n")
        
                    
        
        
        
        
        
        
        
        
        
        
        
        
        
        

In [8]:
def _load_wav_segments(root, relative_paths, sample_length, segment_length, skip_unreadable=False):
    """Read wav files and cut each one into fixed-size float32 segments.

    Each file is truncated or zero-padded to ``sample_length`` samples and
    reshaped to ``(-1, segment_length)``.

    Args:
        root: Directory the relative paths are joined onto.
        relative_paths: Sequence of wav paths relative to ``root``.
        sample_length: Number of samples kept per file.
        segment_length: Number of samples per segment (row).
        skip_unreadable: When True, a file for which ``wavfile.read`` raises
            ValueError is printed and skipped; when False the error propagates.

    Returns:
        Tuple ``(segments, kept_indices)`` where ``kept_indices`` are the
        positions in ``relative_paths`` of the files that were loaded.
    """
    segments = []
    kept_indices = []
    for index in tqdm(range(len(relative_paths))):
        wav_path = os.path.join(root, relative_paths[index])
        try:
            _, samples = wavfile.read(wav_path)
        except ValueError:
            if not skip_unreadable:
                raise
            print(wav_path)
            continue

        fixed_length = samples.copy()  # private copy so it can be resized in place
        fixed_length.resize(sample_length)  # truncates or zero-pads
        segments.append(fixed_length.reshape(-1, segment_length).astype(np.float32))
        kept_indices.append(index)
    return segments, kept_indices


def load_audio_data(training_list_path, training_labels_encoded, testing_list_path, testing_labels_encoded, validation_list_path, validation_labels_encoded, training_size=80000, dataset_root=None, random_state=None):
    """Load the wav files of the three splits as fixed-size segment arrays.

    The training split is reduced to ``training_size`` files with a stratified
    sample; the testing and validation splits are loaded completely.

    Args:
        training_list_path: Text file with one training wav path per line.
        training_labels_encoded: Encoded label for every line of that file.
        testing_list_path: Text file with one testing wav path per line.
        testing_labels_encoded: Encoded label for every line of that file.
        validation_list_path: Text file with one validation wav path per line.
        validation_labels_encoded: Encoded label for every line of that file.
        training_size: Number of training files to keep. When it is not
            smaller than the number of available files, all files are used.
        dataset_root: Directory the listed paths are relative to. Defaults to
            the notebook-level ``dataset_path`` variable.
        random_state: Forwarded to ``train_test_split`` to make the training
            subsample reproducible.

    Returns:
        ``(X_train_wav, X_test_wav, X_val_wav, y_train, y_test, y_val)``; the
        ``X_*`` values are lists of float32 arrays of shape
        ``(adjusted_sample_length // segmentLength, segmentLength)``.

    Raises:
        ValueError: If a path list and its label sequence differ in length, or
            if a testing/validation file cannot be read as wav.
    """
    size_of_sample_considered = 1  # length of the audio file in seconds
    fs = 16000  # the stipulated sampling rate according to the dataset description
    segmentLength = 1024  # no. of samples to use per segment window
    # Largest whole number of segments that fits into the considered duration
    # (a multiple of segmentLength, not a power of two).
    adjusted_sample_length = int(size_of_sample_considered * fs / segmentLength) * segmentLength

    root = dataset_path if dataset_root is None else dataset_root

    with open(training_list_path) as f1:
        X_training_audio_dataset_paths = f1.read().splitlines()

    with open(testing_list_path) as f2:
        X_testing_audio_dataset_paths = f2.read().splitlines()

    with open(validation_list_path) as f3:
        X_validation_audio_dataset_paths = f3.read().splitlines()

    # Misaligned paths/labels would silently train on wrong targets.
    for split_name, split_paths, split_labels in (
        ("training", X_training_audio_dataset_paths, training_labels_encoded),
        ("testing", X_testing_audio_dataset_paths, testing_labels_encoded),
        ("validation", X_validation_audio_dataset_paths, validation_labels_encoded),
    ):
        if len(split_paths) != len(split_labels):
            raise ValueError(
                f"{split_name} split: {len(split_paths)} paths but {len(split_labels)} labels"
            )

    if training_size < len(X_training_audio_dataset_paths):
        # An integer train_size yields exactly that many samples; a float
        # fraction can round down by one.
        X_training_audio_dataset_paths, _, y_train, _ = train_test_split(
            X_training_audio_dataset_paths,
            training_labels_encoded,
            train_size=training_size,
            stratify=training_labels_encoded,
            random_state=random_state,
        )
    else:
        y_train = training_labels_encoded

    y_test = testing_labels_encoded
    y_val = validation_labels_encoded

    # Unreadable training files are reported and dropped together with their
    # label, instead of silently reusing the previously loaded sample.
    X_train_wav, kept_indices = _load_wav_segments(
        root, X_training_audio_dataset_paths, adjusted_sample_length, segmentLength, skip_unreadable=True
    )
    if len(kept_indices) != len(X_training_audio_dataset_paths):
        y_train = np.asarray(y_train)[np.asarray(kept_indices, dtype=int)]

    X_test_wav, _ = _load_wav_segments(
        root, X_testing_audio_dataset_paths, adjusted_sample_length, segmentLength
    )

    # Validation audio must come from the validation list (the testing list
    # was used here before).
    X_val_wav, _ = _load_wav_segments(
        root, X_validation_audio_dataset_paths, adjusted_sample_length, segmentLength
    )

    return X_train_wav, X_test_wav, X_val_wav, y_train, y_test, y_val
    
    

    
    
    
        

In [9]:
def compute_mfccs(samples, fs, upper_edge_hz, lower_edge_hz, num_mel_bins, num_mfcc):
    """Turn segmented audio into MFCC features with a trailing channel axis.

    Pipeline: non-overlapping STFT (window = step = FFT size = 1024) ->
    magnitude -> mel filter bank -> log -> MFCC, keeping the first
    ``num_mfcc`` coefficients.

    Args:
        samples: Audio to transform; assumes a 3-D batch whose last axis holds
            the samples of one segment — TODO confirm against caller.
        fs: Sampling rate passed to the mel filter bank.
        upper_edge_hz: Upper frequency edge of the mel filter bank.
        lower_edge_hz: Lower frequency edge of the mel filter bank.
        num_mel_bins: Number of mel bands.
        num_mfcc: Number of MFCC coefficients kept per segment.

    Returns:
        Tensor with the first three MFCC axes preserved and a final axis that
        absorbs the rest (size 1 for 3-D MFCC output).
    """
    window = 1024
    # frame_step == frame_length, i.e. no overlap between frames
    stft_out = tf.signal.stft(samples, frame_length=window, frame_step=window, fft_length=window)

    # Merge everything after the first two axes into one spectrum axis.
    magnitudes = tf.abs(stft_out)
    magnitudes = tf.reshape(magnitudes, (magnitudes.shape[0], magnitudes.shape[1], -1))

    mel_filterbank = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins, stft_out.shape[-1], fs, lower_edge_hz, upper_edge_hz
    )
    mel_energy = tf.tensordot(magnitudes, mel_filterbank, 1)
    log_mel = tf.math.log(mel_energy + 1e-6)  # offset avoids log(0)

    coefficients = tf.signal.mfccs_from_log_mel_spectrograms(log_mel)[..., :num_mfcc]
    output_shape = (coefficients.shape[0], coefficients.shape[1], coefficients.shape[2], -1)
    return tf.reshape(coefficients, output_shape)



## Handling Creation of Text Files containing Samples for Training, Testing and Validation

In [10]:
# Root folder of the dataset; read as a notebook-level variable by load_audio_data.
dataset_path = "speech_commands_v0.02"

# Locations of the three split list files inside the dataset folder.
training_dataset_path, testing_dataset_path, validation_dataset_path = (
    os.path.join(dataset_path, list_name)
    for list_name in ("training_list.txt", "testing_list.txt", "validation_list.txt")
)

# The training list is generated only when it does not exist yet.
if not os.path.exists(training_dataset_path):
    create_training_txt_file(dataset_path)

NameError: name 'os' is not defined

In [11]:
# Derive the label of every sample from its path, one array per split,
# then report the size of each split (testing, training, validation).
label_sources = (testing_dataset_path, training_dataset_path, validation_dataset_path)
testing_labels, training_labels, validation_labels = map(get_labels, label_sources)

for split_labels in (testing_labels, training_labels, validation_labels):
    print(len(split_labels))

NameError: name 'testing_dataset_path' is not defined

## Ordinal Encoding of Labels

In [12]:
# The encoder is fitted on the training labels only; the other splits are
# transformed with that same mapping so class ids agree across splits.
training_encoder = LabelEncoder()
training_labels_encoded = training_encoder.fit_transform(training_labels)

testing_labels_encoded, validation_labels_encoded = (
    training_encoder.transform(split_labels)
    for split_labels in (testing_labels, validation_labels)
)

print(training_encoder.classes_)

# sanity check


NameError: name 'LabelEncoder' is not defined

## Getting an idea of class counts in the Training, Testing and Validation Sets

In [13]:
# Class frequencies per split, first by label name ...
for heading, split_labels in (
    ("Training:", training_labels),
    ("\nTesting:", testing_labels),
    ("\nValidation:", validation_labels),
):
    print(heading, Counter(split_labels))

# ... then by encoded class id (training and testing only).
for encoded_labels in (training_labels_encoded, testing_labels_encoded):
    print("\n", Counter(encoded_labels))

NameError: name 'Counter' is not defined

## Audio Processing Pipeline on WAV files

A little bit about the audio files at hand:
- We have a total of 105 829 audio files split in the following way for Training, Testing and Validation:
    - Training: 84850
    - Testing: 11005
    - Validation: 9981
- Each audio file was sampled at a 16000 Hz rate and each file is *trimmed down to one second length*. Thus per audio file, you can expect 16000 samples of information describing that file.
- The breakdown of classes found in each set is as follows:
    - Training Set: '_background_noise_','backward','bed','bird','cat','dog','down','eight','five','follow','forward','four','go','happy','house','learn','left','marvin','nine','no','off','on','one','right','seven','sheila','six','stop','three','tree','two','up','visual','wow','yes','zero'.
    - Testing Set: 'backward','bed','bird','cat','dog','down','eight','five','follow','forward','four','go','happy','house','learn','left','marvin','nine','no','off','on','one','right','seven','sheila','six','stop','three','tree','two','up','visual','wow','yes','zero'

In [14]:
# Load the raw audio of all three splits; arguments are (list file, encoded labels) per split.
X_train, X_test, X_val, y_train, y_test, y_val = load_audio_data(
    training_dataset_path,
    training_labels_encoded,
    testing_dataset_path,
    testing_labels_encoded,
    validation_dataset_path,
    validation_labels_encoded,
)

NameError: name 'training_dataset_path' is not defined

In [15]:
# MFCC features of the training split, with wall-clock timing.
start = time.time()
X_train_mfccs = compute_mfccs(
    X_train,
    fs=16000,
    upper_edge_hz=8000.0,
    lower_edge_hz=800.0,
    num_mel_bins=80,
    num_mfcc=13,
)
end = time.time()
print("MFCC extraction time:", end - start, "seconds")

NameError: name 'time' is not defined

In [16]:
# MFCC features of the testing split, with wall-clock timing.
start = time.time()
X_test_mfccs = compute_mfccs(
    X_test,
    fs=16000,
    upper_edge_hz=8000.0,
    lower_edge_hz=800.0,
    num_mel_bins=80,
    num_mfcc=13,
)
end = time.time()
print("MFCC extraction time:", end - start, "seconds")

NameError: name 'time' is not defined

In [119]:
# MFCC features of the validation split, with wall-clock timing.
start = time.time()
X_val_mfccs = compute_mfccs(
    X_val,
    fs=16000,
    upper_edge_hz=8000.0,
    lower_edge_hz=800.0,
    num_mel_bins=80,
    num_mfcc=13,
)
end = time.time()
print("MFCC extraction time:", end - start, "seconds")

MFCC extraction time: 28.3651282787323 seconds


In [120]:
# Shape of the MFCC feature tensor of each split.
for caption, features in (
    ("X_train shape:", X_train_mfccs),
    ("X_test shape:", X_test_mfccs),
    ("X_val shape:", X_val_mfccs),
):
    print(caption, features.shape)

X_train shape: (80000, 15, 13, 1)
X_test shape: (11005, 15, 13, 1)
X_val shape: (9981, 15, 13, 1)


In [121]:
## Definition of the Convolutional Neural Network Architecture

In [122]:
# Training hyper-parameters.
batch_size = 32
epochs = 50

# The MFCC tensors are used as they are; no normalisation is applied here.
train_set, train_labels = X_train_mfccs, y_train
print(len(train_set), len(train_labels))

test_set, test_labels = X_test_mfccs, y_test
val_set, val_labels = X_val_mfccs, y_val



80000 80000


In [1]:
# One output unit per class known to the label encoder, instead of a
# hard-coded count that has to be kept in sync with the dataset by hand.
num_classes = len(training_encoder.classes_)

model = tf.keras.models.Sequential()

# The batch dimension is left unspecified: the model is trained with
# `batch_size`, evaluated with a different batch size, and the last batch of a
# split may be partial, so baking one size into the graph is not wanted.
model.add(layers.InputLayer(input_shape=(train_set.shape[1], train_set.shape[2], train_set.shape[3])))

# Conv block 1 (the input shape comes from the InputLayer above).
model.add(layers.Conv2D(filters=5, kernel_size=(5, 5), padding="same"))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

# Conv block 2
model.add(layers.Conv2D(filters=16, kernel_size=(5, 5), padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((2, 2)))

# Conv block 3
model.add(layers.Conv2D(filters=32, kernel_size=(5, 5), padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

model.add(layers.MaxPool2D((5, 5)))

# Conv block 4
model.add(layers.Conv2D(filters=48, kernel_size=(5, 5), padding='same'))
model.add(layers.BatchNormalization())
model.add(layers.Activation('relu'))

# Global pooling already yields one flat vector per sample, so no Flatten
# layer is needed before the dense head.
model.add(layers.GlobalAveragePooling2D())

model.add(layers.Dense(8))
model.add(layers.Activation('relu'))

model.add(layers.Dense(num_classes))
model.add(layers.Activation('softmax'))


# Labels are integer class ids, hence the sparse categorical loss.
model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=['accuracy'])

# Keyword arguments make the meaning of each value explicit; the History
# object is kept so the training curves can be inspected afterwards.
history = model.fit(
    train_set,
    train_labels,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(val_set, val_labels),
)

NameError: name 'tf' is not defined

In [None]:
# Layer overview, then loss/metrics on the held-out testing split.
model.summary()
score = model.evaluate(x=test_set, y=test_labels, batch_size=64)

In [None]:
# Persist the trained model (HDF5 file in the working directory).
model.save(filepath="First_it.h5")