In [None]:
# Install the third-party audio packages referenced later in this notebook:
#   sounddevice - microphone capture used by record_audio()
#   pydub       - decodes the browser recording in colab_record()
!pip install sounddevice
!pip install pydub

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
#import sounddevice as sd
import scipy.signal
import time

import IPython
import random
from collections import deque, namedtuple
import matplotlib.pyplot as plt


In [None]:
def ds_cnn(input_shape, num_classes):
    """Build a small depthwise-separable CNN classifier.

    Args:
        input_shape: Shape of one input example, without the batch axis.
        num_classes: Number of units in the final softmax layer.

    Returns:
        An uncompiled ``tf.keras.Model`` mapping inputs to class probabilities.
    """
    layers = tf.keras.layers
    model_input = tf.keras.Input(shape=input_shape)

    # Regular convolution front end.
    features = layers.Conv2D(
        64, (10, 4), padding='same', activation='relu')(model_input)
    features = layers.BatchNormalization()(features)

    # Depthwise 3x3 convolution followed by a 1x1 pointwise convolution.
    features = layers.DepthwiseConv2D(
        (3, 3), padding='same', activation='relu')(features)
    features = layers.BatchNormalization()(features)
    features = layers.Conv2D(64, (1, 1), activation='relu')(features)
    features = layers.BatchNormalization()(features)

    # Collapse the two spatial axes, then classify.
    pooled = layers.GlobalAveragePooling2D()(features)
    probabilities = layers.Dense(num_classes, activation='softmax')(pooled)
    return tf.keras.Model(model_input, probabilities)

In [None]:
# Preprocessing for examples of the training dataset (not for microphone recordings).
def preprocess(audio, label):
    """Turn one (waveform, label) example into (MFCC features, int64 label).

    The waveform is divided by 32768, flattened, truncated / zero-padded to
    exactly 16000 samples, and converted to the first 10 MFCCs of a 40-bin
    log-mel spectrogram (640-sample frames, 320-sample hop, 1024-point FFT).

    Args:
        audio: Waveform tensor (assumed to hold int16-range PCM values --
            TODO confirm against the dataset's audio dtype).
        label: Scalar class id.

    Returns:
        Tuple ``(mfccs, label)`` where ``mfccs`` has static shape (49, 10, 1)
        and ``label`` is a scalar int64 tensor.
    """
    clip_samples = 16000
    mel_bins = 40

    # Scale, flatten and force the clip to exactly `clip_samples` samples.
    waveform = tf.reshape(tf.cast(audio, tf.float32) / 32768.0, [-1])
    waveform = waveform[:clip_samples]
    padding = tf.zeros([clip_samples] - tf.shape(waveform), dtype=tf.float32)
    waveform = tf.concat([waveform, padding], 0)

    # Magnitude spectrogram.
    magnitudes = tf.abs(
        tf.signal.stft(waveform, frame_length=640, frame_step=320, fft_length=1024))

    # Project the linear-frequency bins onto the mel scale (20 Hz - 4 kHz).
    mel_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins=mel_bins,
        num_spectrogram_bins=magnitudes.shape[-1],
        sample_rate=16000,
        lower_edge_hertz=20.0,
        upper_edge_hertz=4000.0)
    mel = tf.tensordot(magnitudes, mel_matrix, 1)
    # tensordot loses the static shape; restore it.
    mel.set_shape(magnitudes.shape[:-1].concatenate([mel_bins]))

    # The small offset keeps the log finite for silent frames.
    log_mel = tf.math.log(mel + 1e-6)
    coefficients = tf.signal.mfccs_from_log_mel_spectrograms(log_mel)[..., :10]
    features = tf.expand_dims(tf.ensure_shape(coefficients, [49, 10]), -1)

    target = tf.ensure_shape(tf.cast(label, tf.int64), [])
    return features, target

In [None]:
def load_dataset(train_split='train[:5%]', val_split='train[5%:6%]',
                 batch_size=32, shuffle_buffer=4096):
    """Load and preprocess the `speech_commands` training/validation pipelines.

    Both pipelines map every example through ``preprocess`` and emit batches
    of ``batch_size`` (incomplete final batches are dropped).

    The training pipeline caches the preprocessed examples *before* shuffling
    and batching, so each epoch sees differently composed batches. Caching
    after batching with no shuffle would replay the exact same batches in the
    same order every epoch.

    Args:
        train_split: TFDS split expression for the training data. The default
            is a small slice intended for a quick demo.
        val_split: TFDS split expression for the validation data.
        batch_size: Number of examples per batch.
        shuffle_buffer: Size of the shuffle buffer for the training data.

    Returns:
        Tuple ``(ds_train, ds_test)`` of batched ``tf.data.Dataset`` objects
        yielding ``(mfccs, label)`` pairs.
    """
    ds_train, ds_test = tfds.load(
        'speech_commands',
        split=[train_split, val_split],
        as_supervised=True,
        with_info=False
    )
    AUTOTUNE = tf.data.AUTOTUNE

    ds_train = (
        ds_train
        .map(preprocess, num_parallel_calls=AUTOTUNE)
        .cache()                    # keep the expensive MFCC computation
        .shuffle(shuffle_buffer)    # reshuffled on every iteration by default
        .batch(batch_size, drop_remainder=True)
        .prefetch(AUTOTUNE)
    )
    # Validation order does not matter, so fixed cached batches are fine.
    ds_test = (
        ds_test
        .map(preprocess, num_parallel_calls=AUTOTUNE)
        .batch(batch_size, drop_remainder=True)
        .cache()
        .prefetch(AUTOTUNE)
    )
    return ds_train, ds_test

In [None]:
def train_model(epochs=5):
    """Train the DS-CNN keyword spotter and export it as Keras and TFLite files.

    Writes ``kws_ds_cnn.h5`` and ``kws_ds_cnn.tflite`` to the working directory.

    Args:
        epochs: Number of training epochs.
    """
    ds_train, ds_test = load_dataset()

    # Size the output layer from the dataset metadata. Taking the largest
    # label found in a single batch under-counts whenever the highest class id
    # is missing from that batch, which later breaks the sparse cross-entropy
    # loss on out-of-range labels.
    label_feature = tfds.builder('speech_commands').info.features['label']
    num_classes = label_feature.num_classes

    # (49, 10, 1) is the feature shape produced by `preprocess`.
    model = ds_cnn((49, 10, 1), num_classes)
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    model.summary()

    model.fit(ds_train, validation_data=ds_test, epochs=epochs)
    model.save("kws_ds_cnn.h5")

    # Convert to TFLite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()
    with open("kws_ds_cnn.tflite", "wb") as f:
        f.write(tflite_model)
    print("Saved kws_ds_cnn.tflite")

In [None]:
# Train the model and write kws_ds_cnn.h5 / kws_ds_cnn.tflite; the inference
# cell below loads the .tflite file, so this must run first.
train_model()

In [None]:
# Settings shared by the recording and inference helpers below.
# SAMPLE_RATE: samples per second used by record_audio and preprocess_audio.
SAMPLE_RATE = 16000
# RECORD_SECONDS: clip length captured by record_audio.
RECORD_SECONDS = 1
# NUM_MFCC: number of MFCC coefficients kept by preprocess_audio.
NUM_MFCC = 10

# Load the TFLite model written by train_model() and allocate its tensors.
interpreter = tf.lite.Interpreter(model_path="kws_ds_cnn.tflite")
interpreter.allocate_tensors()

# Tensor metadata; run_inference reads the 'index' entry of the first
# input/output to feed features and fetch scores.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

In [None]:
def record_audio():
    """Record RECORD_SECONDS of mono audio from the local microphone.

    Returns:
        1-D float32 NumPy array with ``SAMPLE_RATE * RECORD_SECONDS`` samples.

    Raises:
        ImportError / OSError: if `sounddevice` (or the audio backend it needs)
            is unavailable in the current runtime.
    """
    # Imported here because the notebook-level `import sounddevice as sd` is
    # commented out, which left `sd` undefined. A local import also keeps the
    # rest of the notebook usable on runtimes without local audio hardware.
    import sounddevice as sd

    print("Recording...")
    audio = sd.rec(int(SAMPLE_RATE * RECORD_SECONDS), samplerate=SAMPLE_RATE, channels=1, dtype='float32')
    sd.wait()  # block until the recording has finished
    print("Recording done.")
    return audio.flatten()

In [None]:
import pydub
from IPython.display import Javascript, Audio
from google.colab import output
from base64 import b64decode
from io import BytesIO

# JavaScript injected into the notebook output by colab_record(). It defines a
# global `record(ms)` function that captures microphone audio for `ms`
# milliseconds and resolves to the recording encoded as a base64 data URL.
# `record` is deliberately declared with `var` so that a later eval_js call can
# see it.
RECORD = """
const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
const b2text = blob => new Promise(resolve => {
  const reader = new FileReader()
  reader.onloadend = e => resolve(e.srcElement.result)
  reader.readAsDataURL(blob)
})
var record = time => new Promise(async resolve => {
  const stream = await navigator.mediaDevices.getUserMedia({ audio: true })
  const recorder = new MediaRecorder(stream)
  const chunks = []
  recorder.ondataavailable = e => chunks.push(e.data)
  recorder.onstop = async ()=>{
    // Release the microphone; otherwise every call leaves a capture stream open.
    stream.getTracks().forEach(track => track.stop())
    const blob = new Blob(chunks)
    const text = await b2text(blob)
    resolve(text)
  }
  recorder.start()
  await sleep(time)
  recorder.stop()
})
"""
def colab_record(sec = 5, target_rate = 16000):
  """Record audio through the browser (Colab) and return it as a NumPy array.

  The recording is down-mixed to mono, resampled to `target_rate` and scaled
  to the [-1, 1) range. Previously the raw decoder output was returned: samples
  at the browser's native rate (with any channels interleaved) on the integer
  PCM amplitude scale, which does not match the 16 kHz, 1/32768-scaled audio
  the model was trained on.

  Args:
    sec: Requested recording length in seconds. Two extra seconds are always
      recorded (presumably head-room for the recorder start-up -- TODO confirm).
    target_rate: Sample rate to resample to; pass None/0 to keep the rate of
      the decoded recording.

  Returns:
    Tuple `(audio, rate)`: a 1-D float32 array with leading/trailing zero
    samples removed, and its sample rate in Hz.
  """
  sec += 2
  print('Begining Recording')
  display(Javascript(RECORD))
  s = output.eval_js('record(%d)' % (sec*1000))
  print('Done!')
  # The JS side returns a data URL; the payload follows the first comma.
  b = b64decode(s.split(',')[1])
  segment = pydub.AudioSegment.from_file(BytesIO(b))
  # Mono first, so the sample array is not an interleaving of channels.
  segment = segment.set_channels(1)
  if target_rate:
    segment = segment.set_frame_rate(int(target_rate))
  # Scale integer PCM to [-1, 1) using the decoder's sample width.
  full_scale = float(1 << (8 * segment.sample_width - 1))
  audio = np.asarray(segment.get_array_of_samples(), dtype='float32') / full_scale
  # Exact rate from the decoder instead of estimating it from the length.
  rate = segment.frame_rate
  # remove the zeros at the beginning (and end) of the recording
  audio = np.trim_zeros(audio)
  return audio.flatten(), rate

In [None]:
# Preprocess microphone audio for inference (not used for training).
def preprocess_audio(audio):
    """Convert a recorded waveform into the MFCC tensor fed to the TFLite model.

    The feature extraction mirrors the training-time ``preprocess``: one second
    of audio, 640/320 STFT framing, 40 mel bins between 20 Hz and 4 kHz, and
    the first NUM_MFCC coefficients.

    Args:
        audio: Array-like of mono samples, assumed to be sampled at
            SAMPLE_RATE (no resampling happens here). Integer PCM, or float
            data whose peak exceeds 1.0, is rescaled to [-1, 1) so that it
            matches the 1/32768 scaling applied to the training data.

    Returns:
        float32 NumPy array of shape (1, 49, NUM_MFCC, 1).
    """
    audio = np.asarray(audio)
    if np.issubdtype(audio.dtype, np.integer):
        full_scale = float(np.iinfo(audio.dtype).max) + 1.0
    else:
        full_scale = None
    # Flatten and force float32: a float64 waveform makes the STFT output
    # mismatch the float32 mel matrix, and a 2-D input breaks the 1-D padding.
    audio = audio.astype(np.float32).reshape(-1)
    if full_scale is None and audio.size and np.max(np.abs(audio)) > 1.0:
        # Float samples outside [-1, 1] are taken to be on the 16-bit PCM
        # scale (assumption; this is what decoded browser recordings look like).
        full_scale = 32768.0
    if full_scale is not None:
        audio = audio / np.float32(full_scale)

    # Pad or truncate to exactly one second of samples
    if len(audio) < SAMPLE_RATE:
        audio = np.pad(audio, (0, SAMPLE_RATE - len(audio)), mode='constant')
    else:
        audio = audio[:SAMPLE_RATE]

    stft = tf.signal.stft(audio, frame_length=640, frame_step=320, fft_length=1024)
    spectrogram = tf.abs(stft)

    mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins=40,
        num_spectrogram_bins=stft.shape[-1],
        sample_rate=SAMPLE_RATE,
        lower_edge_hertz=20.0,
        upper_edge_hertz=4000.0
    )

    mel_spectrogram = tf.tensordot(spectrogram, mel_weight_matrix, 1)
    # The small offset keeps the log finite for silent frames.
    log_mel_spectrogram = tf.math.log(mel_spectrogram + 1e-6)

    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrogram)[..., :NUM_MFCC]
    mfccs = mfccs.numpy()

    mfccs = mfccs[:49]  # Trim or pad time axis
    if mfccs.shape[0] < 49:
        mfccs = np.pad(mfccs, ((0, 49 - mfccs.shape[0]), (0, 0)), mode='constant')

    # Add the batch and channel axes expected by the model input.
    mfccs = mfccs.reshape(1, 49, NUM_MFCC, 1).astype(np.float32)
    return mfccs

In [None]:
def run_inference(mfccs):
    """Run the module-level TFLite interpreter on one feature tensor.

    Args:
        mfccs: Input array written to the interpreter's first input tensor.

    Returns:
        Tuple ``(prediction, confidence)``: the index of the largest value in
        the first output tensor and that largest value.
    """
    input_index = input_details[0]['index']
    output_index = output_details[0]['index']

    interpreter.set_tensor(input_index, mfccs)
    interpreter.invoke()
    scores = interpreter.get_tensor(output_index)

    return np.argmax(scores), np.max(scores)

In [None]:
# Maze logic: type aliases and the canonical edge constructor.
Edge = tuple      # a pair of squares
Tree = set        # a set of edges
Square = tuple    # an (x, y) coordinate pair
Maze = namedtuple('Maze', ['width', 'height', 'edges'])

def edge(A, B) -> Edge:
    """Return the edge joining A and B with its endpoints in sorted order.

    Sorting makes ``edge(A, B) == edge(B, A)``, so edges can be stored in and
    looked up from sets regardless of direction.
    """
    first, second = sorted((A, B))
    return Edge((first, second))

def random_tree(nodes, neighbors, pop=deque.pop) -> Tree:
    """Grow a random tree over `nodes`, one edge at a time.

    Starting from an arbitrary root, repeatedly take a node off the frontier
    with `pop`; if it still has unvisited neighbors, connect it to a randomly
    chosen one and put both nodes back on the frontier. Stops once every node
    has been added.

    Args:
        nodes: Iterable of hashable nodes.
        neighbors: Function mapping a node to the set of its adjacent nodes.
        pop: Function removing and returning an element of the frontier deque.

    Returns:
        The set of edges of the tree.
    """
    tree = Tree()
    unvisited = set(nodes)
    frontier = deque([unvisited.pop()])  # arbitrary root
    while unvisited:
        current = pop(frontier)
        candidates = neighbors(current) & unvisited
        if not candidates:
            # Dead end: this node is simply dropped from the frontier.
            continue
        chosen = random.choice(list(candidates))
        tree.add(edge(current, chosen))
        unvisited.remove(chosen)
        frontier.extend([current, chosen])
    return tree

def neighbors4(square) -> {Square}:
    """Return the set of the four squares orthogonally adjacent to (x, y)."""
    x, y = square
    offsets = ((1, 0), (-1, 0), (0, 1), (0, -1))
    return {(x + dx, y + dy) for dx, dy in offsets}

def grid(width, height) -> {Square}:
    """Return the set of every (x, y) square in a width-by-height grid."""
    squares = set()
    for x in range(width):
        for y in range(height):
            squares.add((x, y))
    return squares

def random_maze(width, height, pop=deque.pop) -> Maze:
    """Build a random maze whose passages are the edges of a random tree.

    Args:
        width: Number of columns.
        height: Number of rows.
        pop: Frontier-pop strategy forwarded to `random_tree`.

    Returns:
        A `Maze` holding the dimensions and the set of open edges.
    """
    squares = grid(width, height)
    passages = random_tree(squares, neighbors4, pop)
    return Maze(width=width, height=height, edges=passages)

def transpose(matrix):
    """Swap rows and columns of an iterable of rows; returns a list of tuples."""
    return [column for column in zip(*matrix)]

def plot_wall(s1, s2):
    """Draw the wall separating squares s1 and s2 as a black line segment."""
    (x1, y1), (x2, y2) = s1, s2
    if x1 != x2:
        # Squares sit side by side: vertical wall on their shared border.
        wall_x = max(x1, x2)
        xs, ys = [wall_x, wall_x], [y1, y1 + 1]
    else:
        # Squares are stacked: horizontal wall on their shared border.
        wall_y = max(y1, y2)
        xs, ys = [x1, x1 + 1], [wall_y, wall_y]
    plt.plot(xs, ys, 'k-', linewidth=2)

def plot_maze(maze, figsize=None, path=None):
    """Plot a maze by drawing a wall between every pair of adjacent squares
    that is not joined by an edge in `maze.edges`.

    Two openings are left in the outer boundary: above square (0, 0) and below
    square (width-1, height-1).

    Args:
        maze: `Maze` with `width`, `height` and a set of open `edges`.
        figsize: Figure size passed to matplotlib; defaults to (width/5, height/5).
        path: Optional sequence of (x, y) squares drawn as a red line through
            the square centres.
    """
    w, h  = maze.width, maze.height
    plt.figure(figsize=figsize or (w/5, h/5))
    plt.axis('off')
    plt.gca().invert_yaxis()  # put y = 0 at the top
    exits = {edge((0, 0), (0, -1)), edge((w-1, h-1), (w-1, h))}
    edges = maze.edges | exits
    # Collect walls as canonical edges first: an interior wall is reachable
    # from both of its squares and would otherwise be plotted twice.
    walls = {edge(sq, nbr) for sq in grid(w, h) for nbr in neighbors4(sq)} - edges
    for s1, s2 in walls:
        plot_wall(s1, s2)
    if path: # Plot the solution (or any path) as a red line through the maze
        X, Y = transpose((x + 0.5, y + 0.5) for (x, y) in path)
        plt.plot(X, Y, 'r-', linewidth=2)

def show_game(M, player):
  """Draw maze `M`, overlay the player's marker, and display the figure."""
  board_size = (6, 6)
  plot_maze(M, figsize=board_size)
  player.plot()
  plt.show()

def check_move(move, position=None):
  """Translate a spoken direction into the square the player would step onto.

  Args:
    move: Command word; 'right', 'left', 'up' and 'down' (any letter case) are
      recognised. The y axis points down, so 'up' decreases y.
    position: (x, y) square to move from. Defaults to the position of the
      module-level `player`, which keeps existing one-argument calls working.

  Returns:
    The neighbouring (x, y) square for a recognised direction; otherwise the
    starting position unchanged. No wall or boundary checking is done here.
  """
  if position is None:
    position = player.position

  steps = {
      'right': (1, 0),
      'left': (-1, 0),
      'up': (0, -1),
      'down': (0, 1),
  }
  step = steps.get(move.lower())
  if step is None:
    return position

  current_x, current_y = position
  return (current_x + step[0], current_y + step[1])

In [None]:
# Player class
class Player:
    """A marker that walks through a maze one square at a time.

    Attributes are set in __init__:
        maze: The `Maze` being played (its `width`, `height`, `edges` are read).
        position: Current (x, y) square. The maze drawn by `plot_maze` has its
            lower exit at (width - 1, height - 1).
    """

    def __init__(self, maze, start_position=(0, 0)):
        self.maze = maze
        self.position = start_position

    def move(self, new_position):
        """Step to `new_position` if it is a legal move.

        A move is legal when the target is orthogonally adjacent, lies inside
        the maze, and the maze has an open edge (no wall) between the two
        squares. Illegal moves leave the position unchanged.

        Returns:
            True if the player moved, False otherwise.
        """
        if new_position not in neighbors4(self.position):
            return False
        x, y = new_position
        # Check if the new position is within the maze boundaries
        if not (0 <= x < self.maze.width and 0 <= y < self.maze.height):
            return False
        # A missing edge means there is a wall between the two squares
        if edge(self.position, new_position) not in self.maze.edges:
            return False
        self.position = new_position
        return True

    def plot(self):
        """Draw the player as a blue circle in the centre of its square."""
        x, y = self.position
        plt.plot(x + 0.5, y + 0.5, 'bo', markersize=10)  # Blue circle

In [None]:
# Create the maze and the player.
# The maze is a fresh random 5x5 layout on every run (no random seed is set);
# the player starts at the default position (0, 0).
M = random_maze(5, 5)
player = Player(M)

In [None]:
# Class names indexed by the model's output id. The model is trained on the
# TFDS `speech_commands` labels, so the names must come from that dataset's
# metadata; a hand-written list in a different order maps predictions to the
# wrong words (e.g. a spoken "up" moving the player somewhere else).
labels = tfds.builder('speech_commands').info.features['label'].names

# Square next to the lower exit drawn by plot_maze.
goal = (M.width - 1, M.height - 1)

while True:
    show_game(M, player) #game
    if player.position == goal:
        # Without this check the loop keeps recording forever.
        print("You reached the exit!")
        break
    audio, rate = colab_record(1)
    mfccs = preprocess_audio(audio)
    pred, conf = run_inference(mfccs)
    new_position = check_move(labels[pred]) #game
    player.move(new_position) #game
    print(f"Prediction: {labels[pred]} (Confidence: {conf:.2f})")
    display(Audio(data=audio, rate=rate))
    # wait=True defers clearing until the next output, so the prediction
    # stays visible during the pause below.
    IPython.display.clear_output(wait=True) #game
    time.sleep(1)