In [1]:
import os
import warnings
from os import listdir
from os.path import isfile, join

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import scipy.spatial as sp
import tensorflow as tf
from graphviz import Digraph
from imageio import imwrite
from tensorflow.contrib.framework.python.ops import audio_ops as contrib_audio
from tensorflow.python.ops import io_ops

warnings.simplefilter(action='ignore', category=FutureWarning)

## Functions

### Restore model

In [2]:
def load_model(checkpoint,first_layer_filters=64, mfcc=True):
    """Restore the six network variables from a checkpoint into a fresh graph.

    Every call resets the default graph and opens a new session.

    Parameters
    ----------
    checkpoint : str
        Checkpoint path handed to `tf.train.Saver.restore`.
    first_layer_filters : int
        Number of filters in the first convolutional layer.
    mfcc : bool
        Selects the number of rows of the final fully connected weight
        matrix: 62720 when true, 404544 otherwise.

    Returns
    -------
    dict
        `{'session': <the new tf.Session>, 'weights': <dict>}` where the
        weights dict maps 'first_weights', 'first_bias', 'second_weights',
        'second_bias', 'final_fc_weights' and 'final_fc_bias' to numpy arrays.
    """
    tf.reset_default_graph()
    session = tf.Session()

    fc_rows = 62720 if mfcc else 404544

    # (result key, graph variable name, shape), listed in creation order.
    layout = [
        ('first_weights', "Variable", [20, 8, 1, first_layer_filters]),
        ('first_bias', "Variable_1", [first_layer_filters]),
        ('second_weights', "Variable_2", [10, 4, first_layer_filters, 64]),
        ('second_bias', "Variable_3", [64]),
        ('final_fc_weights', "Variable_4", [fc_rows, 12]),
        ('final_fc_bias', "Variable_5", [12]),
    ]
    variables = {key: tf.get_variable(name, shape=shape)
                 for key, name, shape in layout}

    saver = tf.train.Saver()

    # Initialize first, then let the checkpoint overwrite the initial values.
    session.run(tf.global_variables_initializer())
    saver.restore(session, checkpoint)

    weights = {key: np.asarray(variable.eval(session=session))
               for key, variable in variables.items()}

    return {'session': session, 'weights': weights}

### Generate spectrogram and MFCC from wav file
Wav: input wave file

Mode:
- Set to "original" to display image as used in the networks
- Set to "enhanced" to display a brightened enhanced version

Plot: if true, show the spectrogram plot and print the spectrogram and MFCC shapes

In [3]:
def wav_to_spectogram(wav, mode="original", plot=False, session=None):
    """Compute the spectrogram and the MFCC of a single WAV file.

    Parameters
    ----------
    wav : str
        Path of the WAV file to analyse.
    mode : str
        "original" uses a 480-sample window with a 160-sample stride (per the
        notebook text, the settings used for the networks). Any other value
        uses a 256-sample window with a 128-sample stride; with "enhanced"
        the *plotted* spectrogram is additionally normalized to 0-255,
        brightened and clipped. The returned arrays are never altered by it.
    plot : bool
        If true, print the array shapes and draw the spectrogram.
    session : tf.Session, optional
        Session used to run the ops. Defaults to the notebook-global `sess`.

    Returns
    -------
    tuple
        `(mfcc_data, spectrogram_data)` as returned by `session.run`; the
        plotting code below indexes them as [batch, time, frequency].
    """
    if session is None:
        # Fall back to the notebook-global session.
        session = sess

    model_settings = {'dct_coefficient_count': 40,
                      'window_size_samples': 480,
                      'label_count': 12,
                      'desired_samples': 16000,
                      'window_stride_samples': 160,
                      'spectrogram_length': 98,
                      'sample_rate': 16000,
                      'fingerprint_size': 3920}
    if mode != "original":
        # settings from wav_to_spectrogram script from tensorflow tutorial
        model_settings['window_size_samples'] = 256
        model_settings['window_stride_samples'] = 128

    # load file
    desired_samples = model_settings['desired_samples']
    wav_filename_placeholder_ = tf.placeholder(tf.string, [])
    wav_loader = io_ops.read_file(wav_filename_placeholder_)
    wav_decoder = contrib_audio.decode_wav(wav_loader, desired_channels=1, desired_samples=desired_samples)

    # Placeholders for the volume / time-shift / background-noise stages of the
    # pipeline. They are fed neutral values below, so the audio passes through
    # unchanged.
    foreground_volume_placeholder_ = tf.placeholder(tf.float32, [])
    time_shift_padding_placeholder_ = tf.placeholder(tf.int32, [2, 2])
    time_shift_offset_placeholder_ = tf.placeholder(tf.int32, [2])
    background_data_placeholder_ = tf.placeholder(tf.float32, [desired_samples, 1])
    background_volume_placeholder_ = tf.placeholder(tf.float32, [])

    scaled_foreground = tf.multiply(wav_decoder.audio, foreground_volume_placeholder_)
    padded_foreground = tf.pad(scaled_foreground, time_shift_padding_placeholder_, mode='CONSTANT')
    sliced_foreground = tf.slice(padded_foreground, time_shift_offset_placeholder_, [desired_samples, -1])
    background_mul = tf.multiply(background_data_placeholder_, background_volume_placeholder_)
    background_add = tf.add(background_mul, sliced_foreground)
    background_clamp = tf.clip_by_value(background_add, -1.0, 1.0)

    # Run the spectrogram and MFCC ops to get a 2D 'fingerprint' of the audio.
    spectrogram = contrib_audio.audio_spectrogram(
        background_clamp,
        window_size=model_settings['window_size_samples'],
        stride=model_settings['window_stride_samples'],
        magnitude_squared=True)

    mfcc = contrib_audio.mfcc(
        spectrogram,
        wav_decoder.sample_rate,
        dct_coefficient_count=model_settings['dct_coefficient_count'])

    # set some parameters / settings for the spectrogram / mfcc
    input_dict = {
        wav_filename_placeholder_: wav, # path to file we want to analyze
        time_shift_padding_placeholder_: [[0, 0], [0, 0]],
        time_shift_offset_placeholder_: [0, 0],
        background_data_placeholder_ : np.zeros([desired_samples, 1]), # no background noise
        background_volume_placeholder_ : 0.0, # no background noise
        foreground_volume_placeholder_ : 1.0 # don't silence the wav file
    }

    # Evaluate both ops in a single run so the file is decoded only once;
    # the outputs are numpy arrays.
    spectrogram_data, mfcc_data = session.run([spectrogram, mfcc], feed_dict=input_dict)

    spectrogram_data_plot = spectrogram_data[0]

    # Do some extra preprocessing to make the spectrogram easier to read
    # if the enhanced mode was chosen. All operations create new arrays, so
    # the returned spectrogram_data is left untouched.
    if mode == "enhanced":
        # normalize the array to the 0-255 range (skipped for an all-zero
        # spectrogram, which would otherwise divide by zero)
        peak = spectrogram_data_plot.max()
        if peak > 0:
            spectrogram_data_plot = spectrogram_data_plot * (255.0 / peak)

        # brighten it a bit: multiply every intensity by 3
        brightness = 3
        spectrogram_data_plot = spectrogram_data_plot * brightness

        # clip back to [0, 255] range
        spectrogram_data_plot = np.clip(spectrogram_data_plot, 0.0, 255.0)

    if plot:
        # init plots
        print ("\nSpectrogram data spectrogram: %s" % str(np.shape(spectrogram_data[0])))
        print ("MFCC data shape: %s" % str(np.shape(mfcc_data[0])))
        print ("MFCC has 40 coefficients")

        input_time_size = spectrogram_data.shape[1]
        input_frequency_size = spectrogram_data.shape[2]
        fig2=plt.figure(figsize=(8, 20))
        fig2.suptitle("Spectrogram of wav file")
        plt.xlabel('Time')
        plt.ylabel('Frequency')

        # normalize each time step (row) by its total energy
        row_sums = spectrogram_data_plot.sum(axis=1)
        norm = spectrogram_data_plot / row_sums[:, np.newaxis]

        plt.imshow(np.rot90(norm), cmap='binary')
        plt.xticks([i*input_time_size/10 for i in range(10)], range(0,1000,100))
        plt.yticks([i*input_frequency_size/39.8 for i in range(40)], range(8000,0,-200))

    return mfcc_data, spectrogram_data

### Create 4D fingerprint from MFCC

In [4]:
def create_fingerprint(mfcc):
    """Reshape a [batch, time, frequency] array into a 4D tensor.

    The sizes of axes 1 and 2 are kept, a trailing axis of size 1 is added,
    and the leading (batch) size is inferred by `tf.reshape`.
    """
    time_steps, frequency_bins = mfcc.shape[1], mfcc.shape[2]
    return tf.reshape(mfcc, [-1, time_steps, frequency_bins, 1])

### Model predict
Input:
- 4D fingerprint of an MFCC
- Restored weights (dictionary) of a model

Returns:
- The predicted label
- Probability of the predicted label (accuracy)
- The activations of the first convolutional layer, ReLU and max pool

In [5]:
def predict(fingerprint_4d, weights, session=None):
    """Run the two-layer convolutional network on one fingerprint.

    Parameters
    ----------
    fingerprint_4d : tensor
        Input of shape [batch, time, frequency, 1]. The label lookup below
        takes the argmax over the whole prediction array, so it is only
        meaningful for a batch of one.
    weights : dict
        Restored weights with the keys 'first_weights', 'first_bias',
        'second_weights', 'second_bias', 'final_fc_weights', 'final_fc_bias'.
    session : tf.Session, optional
        Session used to evaluate the prediction. Defaults to the
        notebook-global `sess`.

    Returns
    -------
    list
        `[pred_label, accuracy, first_conv, first_relu, max_pool]`.
        `accuracy` is the softmax probability of the predicted label (not a
        classification accuracy). The last three entries are unevaluated
        tensors of the first convolution, its ReLU and the max pool.
    """
    if session is None:
        # Fall back to the notebook-global session.
        session = sess

    # initialize weights
    first_weights = weights["first_weights"]
    first_bias = weights["first_bias"]
    second_weights = weights["second_weights"]
    second_bias = weights["second_bias"]
    final_fc_weights = weights["final_fc_weights"]
    final_fc_bias = weights["final_fc_bias"]

    # first pass
    first_conv = tf.nn.conv2d(fingerprint_4d, first_weights, [1, 1, 1, 1], 'SAME') + first_bias
    first_relu = tf.nn.relu(first_conv)
    max_pool = tf.nn.max_pool(first_relu, [1, 2, 2, 1], [1, 2, 2, 1], 'SAME')

    # second pass
    second_conv = tf.nn.conv2d(max_pool, second_weights, [1, 1, 1, 1], 'SAME') + second_bias
    second_relu = tf.nn.relu(second_conv)

    # Flatten every non-batch axis. The element count is derived from the
    # tensor's static shape instead of assuming 64 output channels.
    second_conv_element_count = int(np.prod(second_relu.get_shape().as_list()[1:]))
    flattened_second_conv = tf.reshape(second_relu, [-1, second_conv_element_count])
    final_fc = tf.matmul(flattened_second_conv, final_fc_weights) + final_fc_bias

    # prediction
    pred = tf.nn.softmax(final_fc).eval(session=session)
    labels = ["_silence_", "_unknown_", "yes", "no", "up", "down", "left", "right", "on", "off", "stop", "go"]
    pred_label = labels[np.argmax(pred)]
    accuracy = np.max(pred)

    return [pred_label, accuracy, first_conv, first_relu, max_pool]

<hr>
## Experiments

### Restore model

In [8]:
# Restore the spectrogram-input model with 64 filters in the first layer.

# Alternative: the MFCC-input model (load_model's default mfcc=True):
# C:/python/asr/model_data/mfcc_64f_18000s/speech_commands_train/conv.ckpt-18000
#restore = load_model("C:/python/asr/model_data/mfcc_64f_18000s/speech_commands_train/conv.ckpt-18000", first_layer_filters=64)

# NOTE(review): "2, 4, 8" presumably lists other first-layer filter counts that
# were tried with this checkpoint pattern - confirm.
restore = load_model("C:/python/asr/model_data/spectrogram_64f_36000s/speech_commands_train/conv.ckpt-36000", first_layer_filters=64, mfcc=False)

# `sess` is read as a global by wav_to_spectogram() and predict();
# `weights` is passed to predict() and used by the visualisation cells.
sess = restore["session"]
weights = restore["weights"]

INFO:tensorflow:Restoring parameters from C:/python/asr/model_data/spectrogram_64f_36000s/speech_commands_train/conv.ckpt-36000


### Predict labels

In [None]:
input_dir = 'D:/tmp/speech_dataset/on/'
wavs = [f for f in listdir(input_dir) if isfile(join(input_dir, f))]
predictions = []

# Classify every regular file found in the directory
for wav in wavs:
    # The name `input_wav` is kept on purpose: it is a notebook global.
    input_wav = join(input_dir, wav)

    mfcc, spec = wav_to_spectogram(input_wav)
    fingerprint = create_fingerprint(spec)
    # Only the label and its probability are needed here.
    label, accuracy = predict(fingerprint, weights)[:2]

    result = (wav, label, accuracy)
    predictions.append(result)
    print(*result)

### Visualize first convolutional layer

In [None]:
first_weights = weights["first_weights"]

fig = plt.figure(figsize=(10, 10))

# One panel per first-layer filter (single input channel), on an 8x8 grid.
for i in range(first_weights.shape[-1]):
    ax = fig.add_subplot(8, 8, i + 1)
    ax.imshow(np.rot90(first_weights[..., 0, i]), cmap='gray')
plt.show()

### Visualize second convolutional layer

In [None]:
second_weights = weights["second_weights"]

fig=plt.figure(figsize=(10, 10))

# Input channel (i.e. first-layer filter) whose second-layer kernels are shown.
channel = 0

# Iterate over the second layer's own output filters. The grid is sized from
# that count, so the cell does not depend on the first layer's filter count
# or on variables left behind by another cell.
n_second_filters = second_weights.shape[3]
n_cols = 8
n_rows = int(np.ceil(n_second_filters / n_cols))

for i in range(n_second_filters):
    fig.add_subplot(n_rows, n_cols, i+1)
    plt.imshow(second_weights[:,:,channel,i], cmap='gray')
plt.show()

### Visualize Activations

In [None]:
# Example recording whose activations are visualised in the cells below.
input_wav = 'D:/tmp/speech_dataset/on/39dce8cc_nohash_0.wav'
mfcc, spec = wav_to_spectogram(input_wav, mode="original", plot=True)

# The restored model takes the spectrogram as input, so from here on the name
# `mfcc` actually holds the spectrogram (the cells below read `mfcc.shape`).
mfcc = spec
# Number of first-layer filters; used as the loop bound by the plotting cells.
filters = 64

fingerprint = create_fingerprint(mfcc)
# Keep the (unevaluated) tensors of the first layer; label and probability are discarded.
_, _, first_conv, first_relu, max_pool = predict(fingerprint, weights)

#### Convolutional layer activations

In [None]:
act_conv = np.asarray(first_conv.eval(session=sess))

input_time_size = mfcc.shape[1]
input_frequency_size = mfcc.shape[2]

# Overview: the activation map of every filter on an 8x8 grid
fig = plt.figure(figsize=(10, 10))
for i in range(filters):
    im = act_conv[0, :, :, i].reshape((input_time_size, input_frequency_size))
    fig.add_subplot(8, 8, i + 1)
    plt.imshow(im, cmap='gray')
plt.show()

# Close-up of the first filter's activation map, with a colour scale
fig = plt.figure(figsize=(8, 20))
im = act_conv[0, :, :, 0].reshape((input_time_size, input_frequency_size))
plt.imshow(np.rot90(im), cmap='jet')
plt.colorbar(fraction=0.02)
plt.show()

#### ReLU activations

In [None]:
act_relu = np.asarray(first_relu.eval(session=sess))

input_time_size = mfcc.shape[1]
input_frequency_size = mfcc.shape[2]

# Overview: the activation map of every filter on an 8x8 grid
fig = plt.figure(figsize=(10, 10))
for i in range(filters):
    im = act_relu[0, :, :, i].reshape((input_time_size, input_frequency_size))
    fig.add_subplot(8, 8, i + 1)
    plt.imshow(im, cmap='jet')
plt.show()

# One large, annotated figure per filter
for i in range(filters):
    fig = plt.figure(figsize=(8, 20))
    im = act_relu[0, :, :, i].reshape((input_time_size, input_frequency_size))
    plt.imshow(np.rot90(im), cmap='jet', interpolation='none')

    # Show a nice axis: 10 ticks labelled 0..900 along x and 80 ticks
    # labelled 8000..100 along y.
    x_positions = [step * input_time_size / 10 for step in range(10)]
    y_positions = [step * input_frequency_size / 80 for step in range(80)]
    plt.xticks(x_positions, range(0, 1000, 100))
    plt.yticks(y_positions, range(8000, 0, -100))

    plt.colorbar(fraction=0.02)
    plt.show()

#### Maxpool activations

In [None]:
act_maxpool = np.asarray(max_pool.eval(session=sess))

input_time_size = mfcc.shape[1]
input_frequency_size = mfcc.shape[2]

# The pooled maps have half the input size along each axis, rounded up.
pooled_shape = (int(np.ceil(input_time_size/2)),
                int(np.ceil(input_frequency_size/2)))

# All activations
fig=plt.figure(figsize=(10, 10))
for i in range(filters):
    fig.add_subplot(8, 8, i+1)
    im = act_maxpool[0,:,:,i].reshape(pooled_shape)
    plt.imshow(im, cmap='gray')
plt.show()

# All activations (big): two columns and as many rows as needed.
# A fixed 1x2 grid only has room for two panels and makes add_subplot raise
# a ValueError from the third filter on.
n_cols = 2
n_rows = int(np.ceil(filters / n_cols))
fig=plt.figure(figsize=(10, 64))
for i in range(filters):
    fig.add_subplot(n_rows, n_cols, i+1)
    im = act_maxpool[0,:,:,i].reshape(pooled_shape)
    plt.imshow(np.rot90(im), cmap='jet')
    #plt.colorbar(fraction=0.02)
plt.show()
    

### Hierarchy

#### First save filters of all models to images

In [None]:
# Maps '<model>_<filter number>' (filter numbers start at 1) to the 2D weight
# matrix of that first-layer filter. Read later by the hierarchy cells.
filters = {}

for model in [2,4,8,16,32,64]:
    # Load model and restore first layer weights
    restore = load_model("C:/python/asr/model_data/spectrogram_{}f_36000s/speech_commands_train/conv.ckpt-36000".format(model),
                        first_layer_filters=model, mfcc=False)
    first_weights = restore["weights"]["first_weights"]

    # imwrite does not create missing folders, so make sure the per-model
    # output directory exists before writing into it.
    output_dir = 'C:/python/asr/filters/spectrogram/{}'.format(model)
    os.makedirs(output_dir, exist_ok=True)

    for i in range(first_weights.shape[3]):
        f = first_weights[:,:,0,i]

        # Save the filter as matrix to a dictionary
        filters['{}_{}'.format(model,i+1)] = f

        # Save weight as image for visualization (min-max scaled to 0-255)
        im = (255.*(f-f.min())/(f.max()-f.min())).astype(np.uint8)
        imwrite('{}/{}.png'.format(output_dir,i+1), np.rot90(im))

#### Save ReLU activations of all models for input wav

In [6]:
# Functions for recoloring activations

def _jet_channel(x, channel):
    """Look up intensity `x` (0-255 scale) in the jet colormap and return the
    requested colour channel as an int on the 0-255 scale."""
    return int(255. * plt.cm.jet(x / 255)[channel])

def jetR(x):
    """Red component of the jet colour for intensity `x`."""
    return _jet_channel(x, 0)

def jetG(x):
    """Green component of the jet colour for intensity `x`."""
    return _jet_channel(x, 1)

def jetB(x):
    """Blue component of the jet colour for intensity `x`."""
    return _jet_channel(x, 2)

# Element-wise versions that turn a whole image into one uint8 colour plane each.
vr, vg, vb = (np.vectorize(channel_fn, otypes=[np.uint8])
              for channel_fn in (jetR, jetG, jetB))

In [36]:
import os

# Spoken keyword (dataset sub-folder) and the recordings of it to analyse,
# given as file names without the ".wav" extension.
keyword = "on"
files = [
    "c842b5e4_nohash_1",
    "9a356ab9_nohash_0",
    "ab5b211a_nohash_0",
    "b665723d_nohash_0",
    "b21f0fa4_nohash_0",
    "30a09789_nohash_0",
    "8e05039f_nohash_1",
]

In [None]:
for fname in files:
    # NOTE(review): this dictionary is never filled or read in this cell.
    activations = {}
    input_wav = 'D:/tmp/speech_dataset/{}/{}.wav'.format(keyword,fname)
    output_path = "C:/python/asr/activations/{}/{}".format(keyword,fname) + "/{}/{}.png"

    for model in [2,4,8,16,32,64]:
        # Load model and get activations
        restore = load_model("C:/python/asr/model_data/spectrogram_{}f_36000s/speech_commands_train/conv.ckpt-36000".format(model),
                            first_layer_filters=model, mfcc=False)
        # `sess` must be rebound: predict() evaluates in the global session, and
        # load_model() has just replaced the default graph.
        sess = restore["session"]
        mfcc, spec = wav_to_spectogram(input_wav, mode="original")
        fingerprint = create_fingerprint(spec)
        _, _, first_conv, first_relu, max_pool = predict(fingerprint, restore["weights"])

        act_relu = np.asarray(first_relu.eval(session=sess))
        input_time_size = spec.shape[1]
        input_frequency_size = spec.shape[2]

        # Create the output folder once per model instead of once per filter.
        path = "C:/python/asr/activations/{}/{}".format(keyword,fname) + "/{}/".format(model)
        if not os.path.exists(path):
            os.makedirs(path)

        for i in range(act_relu.shape[3]):
            f = act_relu[0,:,:,i].reshape((input_time_size,input_frequency_size))

            # Min-max scale the activation map to 0-255 for visualization.
            # A filter that never fires yields a constant (all-zero) map; scaling
            # it would divide by zero, so it is written as a uniform image at
            # the lowest intensity instead.
            value_range = f.max() - f.min()
            if value_range > 0:
                im = (255.*(f-f.min())/value_range).astype(np.uint8)
            else:
                im = np.zeros(f.shape, dtype=np.uint8)

            # colorize to jet
            jet = np.stack([vr(im),vg(im),vb(im)], axis=2)
            im = jet

            imwrite(output_path.format(model,i+1), np.rot90(im))

#### Image-similarity measures

In [8]:
# Sum of squared differences
def SSE(A, B):
    """Return the sum over all elements of (A - B) squared; smaller means more alike."""
    delta = np.subtract(A, B)
    return np.sum(delta * delta)

In [9]:
# Sum of absolute differences
def SAE(A, B):
    """Return the sum over all elements of |A - B|; smaller means more alike."""
    delta = np.subtract(A, B)
    return np.sum(np.abs(delta))

In [10]:
# Cosine similarity
def cosine(A, B):
    """Cosine similarity of two equally sized arrays, each taken as one flat vector.

    The arrays are flattened before comparison, so the result describes the
    whole of A against the whole of B. (Feeding 2D arrays to `cdist` and
    reading element [0, 1] would instead compare row 0 of A with row 1 of B.)

    Returns a value in [-1, 1]. Unlike SSE and SAE this is a similarity:
    larger means more alike, so the best match is the argmax, not the argmin.
    """
    a = np.ravel(A)
    b = np.ravel(B)
    return 1 - sp.distance.cosine(a, b)

#### Plot a hierarchy graph

In [None]:
# Render to PNG. The output name is passed to render() without an extension
# because graphviz appends the format's extension to the rendered file itself.
dot = Digraph(format='png')

# Similarity measure to use
measure = SSE #SSE # requires argmin
# SSE / SAE are distances (best parent = smallest value); cosine is a
# similarity (best parent = largest value).
pick_best = np.argmax if measure is cosine else np.argmin

# Load all images as graph nodes. Filters are numbered from 1, both in the
# saved image files and in the keys of `filters`.
for size in [64,32,16,8,4,2]:
    for i in range(1, size + 1):
        dot.node('{}_{}'.format(size,i), image='C:/python/asr/filters/spectrogram/{}/{}.png'.format(size,i),
                 label='\n\n{}'.format(i), style="setlinewidth(0)")

# For each parent-child layer
for pair in [(2,4), (4,8), (8,16), (16,32), (32,64)]:
    parent_n, child_n = pair
    # For each child in the child layer (including the last one)
    for child in range(1, child_n + 1):
        # Calculate similarity with all parents (including the last one)
        similarities = [measure(filters['{}_{}'.format(child_n, child)],
                                filters['{}_{}'.format(parent_n, i)]) for i in range(1, parent_n + 1)]
        # Choose the best parent; +1 converts the list index to a filter number
        parent = 1 + pick_best(similarities)
        # Add graph edge from parent to child
        dot.edge('{}_{}'.format(parent_n, parent),'{}_{}'.format(child_n, child))

dot.render('{}_hierarchy_spectrogram'.format(measure.__name__), view=True)

#### Activation hierarchy

In [38]:
for fname in files:
    for measure in [SSE, SAE, cosine]:
        # Render to PNG. The output name is passed to render() without an
        # extension because graphviz appends the format's extension itself.
        dot = Digraph(format='png')

        # SSE / SAE are distances (best parent = smallest value); cosine is a
        # similarity (best parent = largest value).
        pick_best = np.argmax if measure is cosine else np.argmin

        # Load all images as graph nodes. Activation images are numbered from 1,
        # like the keys of `filters`.
        for size in [64,32,16,8,4,2]:
            for i in range(1, size + 1):
                dot.node('{}_{}'.format(size,i), image='C:/python/asr/activations/{}/{}/{}/{}.png'.format(keyword,fname,size,i),
                         label='\n\n\t\t\t\t{}'.format(i), style="setlinewidth(0)")

        # NOTE(review): the edges are computed from `filters` (the first-layer
        # weights), not from the activations of `fname`, so every file gets the
        # same tree and only the node images differ - confirm this is intended.
        # For each parent-child layer
        for pair in [(2,4), (4,8), (8,16), (16,32), (32,64)]:
            parent_n, child_n = pair
            # For each child in the child layer (including the last one)
            for child in range(1, child_n + 1):
                # Calculate similarity with all parents (including the last one)
                similarities = [measure(filters['{}_{}'.format(child_n, child)],
                                        filters['{}_{}'.format(parent_n, i)]) for i in range(1, parent_n + 1)]
                # Choose the best parent; +1 converts the list index to a filter number
                parent = 1 + pick_best(similarities)
                # Add graph edge from parent to child
                dot.edge('{}_{}'.format(parent_n, parent),'{}_{}'.format(child_n, child))

        dot.render('C:/python/asr/activations/{}/{}/{}_activation_hierarchy'.format(keyword,fname,measure.__name__), view=False)