# Implement Conv2D Layers

In [1]:
import pathlib
import numpy as np
import math
import h5py
from datetime import datetime

import tensorflow as tf 
from tensorflow.keras import layers

2024-04-28 10:49:04.946593: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Directory (relative to the working directory) that holds the input audio
# file read later in this notebook.
data_dir = pathlib.Path('data')

In [3]:
def get_waveform(wavfile, desired_samples=16000):
    """Read a WAV file and return it as a mono, fixed-length 1-D waveform.

    Args:
        wavfile: Path (str or path-like) of the WAV file to read.
        desired_samples: Number of samples requested from the decoder.
            Defaults to 16000, the value that used to be hard-coded here.

    Returns:
        The decoded audio tensor with the trailing channel axis removed.
    """
    audio_binary = tf.io.read_file(str(wavfile))
    # decode_wav also returns the sample rate of the file; it is not needed
    # here, so it is discarded explicitly.
    audio, _ = tf.audio.decode_wav(
        audio_binary, desired_channels=1, desired_samples=desired_samples
    )
    # Only one channel was requested, so the last axis has size 1: drop it.
    return tf.squeeze(audio, axis=-1)


def get_spectrogram(waveform):
    """Return the STFT magnitude of `waveform` with a trailing channel axis."""
    # Short-time Fourier transform of the waveform.
    stft = tf.signal.stft(waveform, frame_length=255, frame_step=128)

    # Keep only the magnitude of the STFT.
    magnitude = tf.abs(stft)

    # Append a 'channels' axis so the spectrogram looks like an image to the
    # convolution layers, which expect (batch_size, height, width, channels).
    return magnitude[..., tf.newaxis]

def relu(x):
    """Rectified linear unit: element-wise max(x, 0).

    Accepts NumPy arrays and NumPy scalars as before, and additionally plain
    Python numbers and sequences (which have no ``.clip`` method).

    Args:
        x: Scalar or array-like of numbers.

    Returns:
        NumPy scalar or array with every negative value replaced by 0.
    """
    return np.maximum(x, 0.0)

## Get the input data

In [4]:
# Load the sample recording and convert it to a magnitude spectrogram.
waveform = get_waveform(data_dir/'yes.wav')
spec = get_spectrogram(waveform)
# Prepend a batch axis so the spectrogram can be fed to the model layers.
input_data = spec[tf.newaxis,...]
# Show the batched shape and the first 10 values of the first row.
input_data.shape, input_data[0,0,:10,:]

(TensorShape([1, 124, 129, 1]),
 <tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[0.00087561],
        [0.00134371],
        [0.00557508],
        [0.01203688],
        [0.01582851],
        [0.01979508],
        [0.03313684],
        [0.05369601],
        [0.05009932],
        [0.03737277]], dtype=float32)>)

## Load the model

In [5]:
# Load the saved Keras model from its .h5 file and list its layers; the
# individual layers are pulled out by index in the cells below.
h5_model = tf.keras.models.load_model('simple-sr.h5')
h5_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resizing (Resizing)         (None, 32, 32, 1)         0         
                                                                 
 normalization (Normalizatio  (None, 32, 32, 1)        3         
 n)                                                              
                                                                 
 conv2d (Conv2D)             (None, 30, 30, 32)        320       
                                                                 
 conv2d_1 (Conv2D)           (None, 28, 28, 64)        18496     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 14, 14, 64)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 14, 14, 64)        0

## Downsample and Normalize

In [6]:
# Downsample the input
# layers[0] is the first layer listed in the model summary (Resizing).
l1 = h5_model.layers[0]
l1_out = l1(input_data)
# Peek at the top-left 3x3 patch of the resized input.
l1_out[0,:3,:3,:]

<tf.Tensor: shape=(3, 3, 1), dtype=float32, numpy=
array([[[0.00606233],
        [0.02537882],
        [0.09502278]],

       [[0.00929757],
        [0.02589507],
        [0.49793145]],

       [[0.00940787],
        [0.02270582],
        [0.08615017]]], dtype=float32)>

In [34]:
# Normalize
# layers[1] is the second layer listed in the model summary (Normalization).
l2 = h5_model.layers[1]
l2_out = l2(l1_out.numpy())
# Keep the top-left 3x3 patch as a NumPy array; it is the window that the
# first convolution output value is computed from further below.
l2_out_1 = l2_out[0,0:3,0:3,:].numpy()
l2_out.shape, l2_out_1

(TensorShape([1, 32, 32, 1]),
 array([[[-0.15616861],
         [-0.13089252],
         [-0.0397617 ]],
 
        [[-0.15193523],
         [-0.13021699],
         [ 0.48745418]],
 
        [[-0.1517909 ],
         [-0.1343902 ],
         [-0.05137172]]], dtype=float32))

## Conv2D

### Get the filter and bias

In [35]:
# layers[2] is the first Conv2D layer in the model summary.
l3 = h5_model.layers[2]
# weights[0] holds the kernels; the last axis indexes the output channel,
# so this selects the kernel of output channel 0.
filter_1 = l3.weights[0].numpy()[:,:,:,0]
# weights[1] holds the biases; take the one for output channel 0.
bias_1 = l3.weights[1].numpy()[0]
filter_1, bias_1

(array([[[-0.23518433],
         [ 0.09234667],
         [ 0.02250122]],
 
        [[-0.01212053],
         [-0.02103543],
         [ 0.06111678]],
 
        [[ 0.2551161 ],
         [ 0.13403681],
         [-0.07359912]]], dtype=float32),
 0.04490134)

### Run the tensorflow layer and view results

In [36]:
# run the tf conv2d layer and print the channel values for the first entry
# The layer call is timed so it can be compared with the pure-NumPy
# implementation later in the notebook.
t = datetime.now()
l3_out = l3(l2_out.numpy())
dt = datetime.now() - t
print('Time elapsed: %s' % dt)
print(l3_out.numpy().shape)
# All output channels at spatial position (0, 0) of the first batch item.
l3_out[0,0,0,:]

Time elapsed: 0:00:00.001706
(1, 30, 30, 32)


<tf.Tensor: shape=(32,), dtype=float32, numpy=
array([0.05006328, 0.        , 0.05499763, 0.06115919, 0.        ,
       0.20250143, 0.        , 0.12057613, 0.        , 0.        ,
       0.13808972, 0.        , 0.16267581, 0.        , 0.04060512,
       0.09993156, 0.        , 0.02537329, 0.0487398 , 0.05872676,
       0.0784109 , 0.12273581, 0.15963419, 0.01881824, 0.18096109,
       0.07243733, 0.        , 0.        , 0.05815995, 0.0821218 ,
       0.        , 0.07613287], dtype=float32)>

### Calculate the first entry's value

In [37]:
# calculate the first value
# Element-wise product of the top-left 3x3 input patch with the channel-0
# kernel, summed, plus the channel-0 bias. No activation is applied here;
# the result is positive, so it already equals the layer's first output.
np.sum(l2_out_1 * filter_1) + bias_1

0.050063286

### Calculate the channel values for the first entry

In [38]:
# calculate the channel values for the first entry
# Fetch the weights once instead of converting them on every iteration, and
# avoid binding the name `filter`, which would shadow the Python builtin for
# the rest of the notebook.
l3_kernels = l3.weights[0].numpy()  # last axis indexes the output channel
l3_biases = l3.weights[1].numpy()
channels = l3_kernels.shape[-1]
test_output = np.zeros(channels)
for i in range(channels):
    # Same computation as for the first value, repeated per output channel
    # and passed through ReLU.
    test_output[i] = relu(np.sum(l2_out_1 * l3_kernels[:, :, :, i]) + l3_biases[i])
test_output

array([0.05006329, 0.        , 0.05499763, 0.06115918, 0.        ,
       0.20250145, 0.        , 0.12057613, 0.        , 0.        ,
       0.13808972, 0.        , 0.16267581, 0.        , 0.04060512,
       0.09993156, 0.        , 0.02537329, 0.0487398 , 0.05872676,
       0.07841089, 0.12273582, 0.1596342 , 0.01881824, 0.18096109,
       0.07243733, 0.        , 0.        , 0.05815995, 0.0821218 ,
       0.        , 0.07613287])

### Implement conv2d

In [39]:
def conv2d(input, filters, biases, debug=True):
    """Naive unpadded, stride-1 2-D convolution followed by ReLU.

    Every output value is the sum of the element-wise product of one input
    window with one filter, plus that filter's bias, clipped at zero.

    Args:
        input: 4-D array shaped (groups, rows, cols, channels).
        filters: 4-D array shaped (rows, cols, in_channels, out_channels).
        biases: 1-D array with one entry per output channel, or None to use
            no bias.
        debug: When True (the default, matching the previous behaviour),
            print the shapes of the first window and filter once.

    Returns:
        float64 array shaped
        (groups, rows - filter_rows + 1, cols - filter_cols + 1, out_channels).

    Raises:
        ValueError: If the input and filter channel counts differ, if the
            biases do not have one entry per output channel, or if the filter
            is too large for the input.
    """
    window_row_size, window_col_size = filters.shape[0], filters.shape[1]
    # number of output channels
    channels = filters.shape[-1]

    # Without this check a mismatch could silently broadcast to a wrong result.
    if input.shape[-1] != filters.shape[2]:
        raise ValueError(
            'input has %s channels but filters expect %s'
            % (input.shape[-1], filters.shape[2])
        )

    # Resolve the biases once, explicitly, instead of swallowing every
    # exception inside the innermost loop.
    if biases is None:
        bias_values = np.zeros(channels, dtype=filters.dtype)
    else:
        bias_values = np.asarray(biases)
        if bias_values.shape != (channels,):
            raise ValueError(
                'biases must have shape (%s,), got %s'
                % (channels, bias_values.shape)
            )

    # The edges are not padded, so only positions the filter fully covers are
    # produced: size - window + 1 (correct for even and odd window sizes).
    groups = input.shape[0]
    out_rows = input.shape[1] - window_row_size + 1
    out_cols = input.shape[2] - window_col_size + 1
    if out_rows < 0 or out_cols < 0:
        raise ValueError(
            'filter window (%s, %s) is too large for input (%s, %s)'
            % (window_row_size, window_col_size, input.shape[1], input.shape[2])
        )
    output = np.zeros((groups, out_rows, out_cols, channels))

    for g in range(groups):
        for r in range(out_rows):
            for c in range(out_cols):
                input_window = input[g, r:r + window_row_size, c:c + window_col_size, :]

                for ch in range(channels):
                    kernel = filters[:, :, :, ch]

                    if debug:
                        print('in shape: %s, filter: %s' % (input_window.shape, kernel.shape))
                        debug = False

                    # ReLU activation: negative pre-activations become 0.
                    output[g, r, c, ch] = np.maximum(
                        np.sum(input_window * kernel) + bias_values[ch], 0.0
                    )

    return output

### Test the implementation

In [40]:
# Run the NumPy conv2d on the same input and weights as the first Conv2D
# layer of the model, timing it for comparison with the TensorFlow call.
print('input shape: %s' % str(l2_out.numpy().shape))
print('weights shape: %s\n' % str(l3.weights[0].numpy().shape))

t = datetime.now()
myl3out = conv2d(l2_out.numpy(), l3.weights[0].numpy(), l3.weights[1].numpy())
dt = datetime.now() - t
print('Time elapsed: %s' % dt)
print(myl3out.shape)
# All output channels at spatial position (0, 0) of the first batch item.
myl3out[0,0,0,:]

input shape: (1, 32, 32, 1)
weights shape: (3, 3, 1, 32)

in shape: (3, 3, 1), filter: (3, 3, 1)
Time elapsed: 0:00:00.594681
(1, 30, 30, 32)


array([0.05006329, 0.        , 0.05499763, 0.06115918, 0.        ,
       0.20250145, 0.        , 0.12057613, 0.        , 0.        ,
       0.13808972, 0.        , 0.16267581, 0.        , 0.04060512,
       0.09993156, 0.        , 0.02537329, 0.0487398 , 0.05872676,
       0.07841089, 0.12273582, 0.1596342 , 0.01881824, 0.18096109,
       0.07243733, 0.        , 0.        , 0.05815995, 0.0821218 ,
       0.        , 0.07613287])

In [41]:
# verify values
def verify_arrays(a1, a2, tolerance=1e-5):
    """Print whether two arrays agree in shape and, element-wise, in value.

    Args:
        a1: First array.
        a2: Second array.
        tolerance: Maximum allowed absolute difference per element
            (exclusive).

    Returns:
        None. The outcome is reported on stdout only; the value comparison is
        skipped when the shapes differ.
    """
    shapes_equal = a1.shape == a2.shape
    print('Shapes match: %s' % str(shapes_equal))
    if not shapes_equal:
        return

    # Compare the magnitude of the difference; without abs() any large
    # negative difference would wrongly count as a match.
    values_equal = np.all(np.abs(a1 - a2) < tolerance)
    print('Values match: %s' % str(values_equal))

In [42]:
# Compare the TensorFlow layer output with the NumPy implementation's output.
verify_arrays(l3_out.numpy(), myl3out)

Shapes match: True
Values match: True


## The next Conv2D layer 

### Run the tensorflow layer and view results

In [43]:
# layers[3] is the second Conv2D layer in the model summary (conv2d_1).
l4 = h5_model.layers[3]
# Time the TensorFlow layer call on the first conv layer's output.
t = datetime.now()
l4_out = l4(l3_out.numpy())
dt = datetime.now() - t
print('Time elapsed: %s' % dt)
print(l4_out.shape)
# All output channels at spatial position (0, 0) of the first batch item.
l4_out[0,0,0,:]

Time elapsed: 0:00:00.002627
(1, 28, 28, 64)


<tf.Tensor: shape=(64,), dtype=float32, numpy=
array([0.        , 0.13606277, 0.1669277 , 0.        , 0.02371086,
       0.        , 0.        , 0.02051658, 0.        , 0.        ,
       0.        , 0.01705355, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.01502133, 0.        , 0.2531605 , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.185608  , 0.        , 0.        , 0.        , 0.14853501,
       0.19258422, 0.19711518, 0.2783924 , 0.        , 0.        ,
       0.        , 0.06133013, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.1239159 , 0.00247141, 0.31450033, 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.1799986 ], dtype=float32)>

### Test the implementation

In [44]:
# Feed the NumPy implementation's own first-layer output into the second
# layer, and report the shape of the array that is actually passed in.
print('input shape: %s' % str(myl3out.shape))
print('weights shape: %s\n' % str(l4.weights[0].numpy().shape))

t = datetime.now()
myl4out = conv2d(myl3out, l4.weights[0].numpy(), l4.weights[1].numpy())
dt = datetime.now() - t
print('Time elapsed: %s' % dt)
print(myl4out.shape)
# All output channels at spatial position (0, 0) of the first batch item.
myl4out[0,0,0,:]

input shape: (1, 30, 30, 32)
weights shape: (3, 3, 32, 64)

in shape: (3, 3, 32), filter: (3, 3, 32)
Time elapsed: 0:00:01.084800
(1, 28, 28, 64)


array([0.        , 0.13606275, 0.1669277 , 0.        , 0.02371087,
       0.        , 0.        , 0.02051658, 0.        , 0.        ,
       0.        , 0.01705355, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.01502134, 0.        , 0.25316052, 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.185608  , 0.        , 0.        , 0.        , 0.14853505,
       0.19258418, 0.1971152 , 0.27839242, 0.        , 0.        ,
       0.        , 0.06133016, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.12391588, 0.0024714 , 0.31450042, 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.17999861])

In [45]:
# Compare the second TensorFlow conv layer's output with the NumPy result.
verify_arrays(l4_out.numpy(), myl4out)

Shapes match: True
Values match: True
