In [1]:
%matplotlib inline


# Keyword spotting



We first import all the necessary modules

In [2]:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys
import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm

We check whether CUDA is available; if it is, all our computations will run much faster.


In [3]:
# Smoke-test tensor allocation on the GPU when PyTorch can see one; otherwise
# allocate on the CPU so this cell does not raise on a CPU-only machine.
torch.zeros(1, device="cuda" if torch.cuda.is_available() else "cpu")

tensor([0.], device='cuda:0')

In [4]:
# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


A data point in the SPEECHCOMMANDS dataset is a tuple made of a waveform
(the audio signal), the sample rate, the utterance (label), the ID of
the speaker, the number of the utterance.

We can visualize the first one of the dataset


Here is the list of labels available in the dataset.




In [5]:
# Class names in alphabetical order; a word's position in this list is the
# integer used to encode it (see label_to_index / index_to_label).
# With a loaded dataset the list could instead be derived with:
#   labels = sorted(list(set(datapoint[2] for datapoint in train_set)))
labels = [
    "down", "left", "off", "on", "right",
    "silence", "stop", "unknown", "up", "yes",
]
labels




['down',
 'left',
 'off',
 'on',
 'right',
 'silence',
 'stop',
 'unknown',
 'up',
 'yes']

We are encoding each word using its index in the list of labels.




In [6]:
def label_to_index(word):
    """Encode ``word`` as a 0-dim tensor holding its position in ``labels``.

    ``list.index`` raises ``ValueError`` when ``word`` is not in ``labels``.
    """
    position = labels.index(word)
    return torch.tensor(position)


def index_to_label(index):
    """Decode ``index`` back into the word stored at that position of ``labels``.

    Inverse of ``label_to_index``.
    """
    word = labels[index]
    return word


# Round-trip check: encode a word to its index, then decode the index back.
word_start = "unknown"
index = label_to_index(word_start)
word_recovered = index_to_label(index)

# Show the word, its encoded index and the decoded word side by side.
print(word_start, "-->", index, "-->", word_recovered)

unknown --> tensor(7) --> unknown


To turn a list of data points made of audio recordings and utterances
into two batched tensors for the model, we implement a collate function
which is used by the PyTorch DataLoader that allows us to iterate over a
dataset by batches. Please see [the
documentation](https://pytorch.org/docs/stable/data.html#working-with-collate-fn)
for more information about working with a collate function.

In the collate function, we also apply the resampling, and the text
encoding.




Now that we have a training function, we need to make one for testing
the network's accuracy. We will set the model to ``eval()`` mode and then
run inference on the test dataset. Calling ``eval()`` sets the training
variable in all modules in the network to false. Certain layers, like
batch normalization and dropout layers, behave differently during
training, so this step is crucial for getting correct results.




## Here we load the model (trained with the same technique in another notebook):
Here, it is a CNN model trained for 21 epochs.

In [7]:
import torchaudio.transforms as T
import torch.nn.functional as F


class M5(nn.Module):
    """Four-stage 2-D CNN classifier that outputs log-probabilities.

    Every stage is Conv2d (3x3, padding 1) -> BatchNorm2d -> ReLU ->
    MaxPool2d (2x2). The first two stages produce ``n_channel`` feature maps
    and the last two produce ``2 * n_channel``. The result is averaged over
    both spatial axes and fed to a linear layer with ``n_output`` units.

    Args:
        n_input: number of channels of the input.
        n_output: number of classes.
        n_channel: number of feature maps of the first two stages.
    """

    def __init__(self, n_input=1, n_output=35, n_channel=32):
        super().__init__()
        wide = 2 * n_channel
        conv_opts = {"kernel_size": (3, 3), "padding": (1, 1)}

        # Attribute names and registration order are part of the state_dict
        # layout, so they are kept exactly as they were.
        self.conv1 = nn.Conv2d(n_input, n_channel, **conv_opts)
        self.bn1 = nn.BatchNorm2d(n_channel)
        self.pool1 = nn.MaxPool2d((2, 2))

        self.conv2 = nn.Conv2d(n_channel, n_channel, **conv_opts)
        self.bn2 = nn.BatchNorm2d(n_channel)
        self.pool2 = nn.MaxPool2d((2, 2))

        self.conv3 = nn.Conv2d(n_channel, wide, **conv_opts)
        self.bn3 = nn.BatchNorm2d(wide)
        self.pool3 = nn.MaxPool2d((2, 2))

        self.conv4 = nn.Conv2d(wide, wide, **conv_opts)
        self.bn4 = nn.BatchNorm2d(wide)
        self.pool4 = nn.MaxPool2d((2, 2))

        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return per-class log-probabilities, one row per input sample."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over the spatial axes and drop the size-1 dimensions.
        x = F.adaptive_avg_pool2d(x, 1).squeeze()
        # squeeze() also removes the batch axis when the batch holds a single
        # sample; put it back so the linear layer always sees a 2-D tensor.
        if x.dim() == 1:
            x = x.unsqueeze(0)

        return F.log_softmax(self.fc1(x), dim=1)


# Initialize the model
n_input = 1  # Adjust according to your input data shape
n_output = len(labels)  # One output unit per entry of `labels`
model = M5(n_input=n_input, n_output=n_output)

# Resolve the target device BEFORE loading, so the checkpoint can be mapped
# onto it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the saved model weights.
# map_location makes a checkpoint that was saved from GPU tensors loadable on
# a CPU-only machine; without it torch.load tries to restore every tensor on
# the device it was saved from.
# NOTE(review): torch.load unpickles the file - only load checkpoints that
# come from a trusted source.
model.load_state_dict(
    torch.load('modelMFCCNoisySilenceSet90.pth', map_location=device)
)

# Move the model to the selected device and switch to evaluation mode
model.to(device)
model.eval()

print("Model loaded and ready for inference.")

print(model)


def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Parameters whose ``requires_grad`` flag is false are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


# Report how many trainable parameters the loaded model has.
n = count_parameters(model)
print("Number of parameters: %s" % n)

def get_likely_index(tensor):
    """Return the index of the largest value along the last dimension."""
    return torch.argmax(tensor, dim=-1)


def pad_or_trim(tensor, target_length):
    """Force the last dimension of ``tensor`` to ``target_length``.

    Args:
        tensor: tensor of any rank >= 1; only its last axis is resized.
        target_length: desired size of the last axis.

    Returns:
        The input truncated to its first ``target_length`` entries along the
        last axis, or right-padded with zeros up to ``target_length``. When
        the length already matches, the input tensor itself is returned.
    """
    current_length = tensor.size(-1)
    if current_length > target_length:
        # Slice the LAST axis (the one whose size was tested) whatever the
        # rank; `tensor[:, :n]` only did that for 2-D inputs and raised on
        # 1-D ones.
        return tensor[..., :target_length]
    if current_length < target_length:
        # F.pad's (left, right) pair applies to the last axis.
        return F.pad(tensor, (0, target_length - current_length))
    return tensor



class CustomMFCCTransform(nn.Module):
    """Wrap ``torchaudio.transforms.MFCC`` so it runs on the input's device.

    Args:
        sample_rate: forwarded to ``MFCC`` as ``sample_rate``.
        n_mfcc: forwarded to ``MFCC`` as ``n_mfcc``.
        n_fft, n_mels, hop_length: forwarded through ``melkwargs`` together
            with ``mel_scale="htk"``.
    """

    def __init__(self, sample_rate, n_mfcc, n_fft, n_mels, hop_length):
        super().__init__()
        mel_options = {
            "n_fft": n_fft,
            "n_mels": n_mels,
            "hop_length": hop_length,
            "mel_scale": "htk",
        }
        self.mfcc_transform = torchaudio.transforms.MFCC(
            sample_rate=sample_rate,
            n_mfcc=n_mfcc,
            melkwargs=mel_options,
        )

    def forward(self, x):
        """Return the MFCC of ``x``, computed on the device ``x`` lives on."""
        # Move the wrapped transform to the input's device before applying it.
        self.mfcc_transform.to(x.device)
        return self.mfcc_transform(x)

# MFCC front-end settings handed to CustomMFCCTransform below.
# NOTE(review): presumably these have to match the features the checkpoint
# was trained on - confirm against the training notebook.
n_fft = 2048
win_length = None  # not passed to CustomMFCCTransform in this cell
hop_length = 512
n_mels = 256
n_mfcc = 256
sample_rate = 16000

# Define the custom MFCC transformation
transform = CustomMFCCTransform(
    sample_rate=sample_rate,
    n_mfcc=n_mfcc,
    n_fft=n_fft,
    n_mels=n_mels,
    hop_length=hop_length
)


def predict(tensor):
    """Classify a waveform and return the predicted label string.

    The waveform is padded or trimmed to 16000 samples, moved to ``device``,
    converted with ``transform``, given an extra leading axis and passed to
    ``model``; the index of the highest score is mapped back to its word.

    Args:
        tensor: waveform tensor; the callers in this notebook pass a
            ``(1, num_samples)`` float tensor.

    Returns:
        The entry of ``labels`` with the highest model score.
    """
    target_length = 16000  # Make sure every input has a consistent length
    waveform = pad_or_trim(tensor, target_length).to(device)

    # Inference only: disable autograd so that no graph is built and kept
    # alive on every call of the real-time loop.
    with torch.no_grad():
        features = transform(waveform)
        scores = model(features.unsqueeze(0))

    best_index = get_likely_index(scores)
    return index_to_label(best_index.squeeze())


# waveform, sample_rate, utterance, *_ = train_set[-1]
# ipd.Audio(waveform.numpy(), rate=sample_rate)

# print(f"Expected: {utterance}. Predicted: {predict(waveform)}.")

# waveform, sample_rate, utterance, *_ = test_set[931]
# ipd.Audio(waveform.numpy(), rate=sample_rate)
# print(f"data {40} : Expected: {utterance}. Predicted: {predict(waveform)}.")





Model loaded and ready for inference.
M5(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool

In [8]:
from pvrecorder import PvRecorder
import numpy as np 

def preprocess_audio(audio_data, sample_rate=16000):
    """Turn raw PCM samples into a float32 tensor scaled by 1/32768.

    Args:
        audio_data: sequence of sample values.
        sample_rate: accepted but not used by this function.

    Returns:
        A ``torch.float32`` tensor holding ``audio_data / 32768.0``.
    """
    # Convert to a tensor, then normalize.
    return torch.tensor(audio_data, dtype=torch.float32) / 32768.0


# Continuously read audio from the microphone and print the model's
# prediction for each buffer, until the kernel is interrupted.
recorder = None
i = 0
frame_length = 16000 // 1  # Samples per read: 16000 // 1 = 16000, i.e. 1 second at a 16000 Hz sample rate
window_size = 16000  # Window size (1 second at a 16000 Hz sample rate)

# Buffer holding the most recent `window_size` samples.
audio_buffer = np.zeros(window_size, dtype=np.int16)

try:
    # NOTE(review): gameLoop() opens device_index=0 while this cell uses 1 -
    # confirm which input device is intended.
    recorder = PvRecorder(frame_length=frame_length, device_index=1)
    recorder.start()
    print('[Listening ...]')

    while True:
        i += 1
        pcm = recorder.read()
        # Shift the buffer left by one frame and write the new frame at the
        # end. Since frame_length == window_size here, every read replaces
        # the whole buffer.
        audio_buffer = np.roll(audio_buffer, -frame_length)
        audio_buffer[-frame_length:] = pcm
        pcm2 = preprocess_audio(audio_buffer.tolist())

        # predict() is given a 2-D tensor: add a leading axis to the 1-D buffer.
        if pcm2.dim() == 1:
            pcm2 = pcm2.unsqueeze(0)

        output = predict(pcm2)
        print(f"Data point #{i} predicted: {output}.")

except KeyboardInterrupt:
    # Emit two backspaces (presumably to erase the "^C" echoed by the
    # terminal) before the stop message.
    sys.stdout.write('\b' * 2)
    print('Stopping ...')

finally:
    # Release the recorder whether the loop ended normally or not.
    if recorder is not None:
        recorder.delete()
        print('Recorder deleted.')


[Listening ...]
Data point #1 predicted: up.
Data point #2 predicted: silence.
Data point #3 predicted: silence.
Data point #4 predicted: silence.
Data point #5 predicted: silence.
Stopping ...
Recorder deleted.


In [10]:
import pygame
import time
import random

# Initialise pygame before creating the window, the clock and the fonts.
pygame.init()

# RGB colour constants.
white = (255, 255, 255)
yellow = (255, 255, 102)
black = (0, 0, 0)
red = (213, 50, 80)
green = (0, 255, 0)
blue = (50, 153, 213)

# Window size in pixels.
dis_width = 800
dis_height = 600

# Display surface every drawing function below renders onto.
dis = pygame.display.set_mode((dis_width, dis_height))
pygame.display.set_caption('Snake Game with Voice Commands')

clock = pygame.time.Clock()
snake_block = 10  # side of one snake segment / one movement step, in pixels
snake_speed = 15  # frame-rate cap passed to clock.tick()

# font_style is used by message(); score_font is not used in the visible cells.
font_style = pygame.font.SysFont("bahnschrift", 25)
score_font = pygame.font.SysFont("comicsansms", 35)

def our_snake(snake_block, snake_list):
    """Draw each segment of ``snake_list`` as a black square on ``dis``.

    Args:
        snake_block: side length of one square.
        snake_list: sequence of segments; items 0 and 1 of each segment are
            used as the square's left and top coordinates.
    """
    for segment in snake_list:
        square = [segment[0], segment[1], snake_block, snake_block]
        pygame.draw.rect(dis, black, square)

def message(msg, color):
    """Render ``msg`` with ``font_style`` in ``color`` and blit it onto ``dis``.

    The text is placed at one sixth of the window width and one third of the
    window height.
    """
    rendered = font_style.render(msg, True, color)
    position = [dis_width / 6, dis_height / 3]
    dis.blit(rendered, position)

def _random_food_position():
    """Return a random (x, y) food position rounded to a multiple of 10."""
    foodx = round(random.randrange(0, dis_width - snake_block) / 10.0) * 10.0
    foody = round(random.randrange(0, dis_height - snake_block) / 10.0) * 10.0
    return foodx, foody


def _steer(direction, x_change, y_change):
    """Return the (x, y) step after applying ``direction`` to the current one.

    'left'/'right' are honoured only while the snake has no horizontal
    motion and 'up'/'down' only while it has no vertical motion; any other
    value leaves the step unchanged.
    """
    if direction in ('left', 'right') and x_change == 0:
        return (-snake_block if direction == 'left' else snake_block), 0
    if direction in ('up', 'down') and y_change == 0:
        return 0, (-snake_block if direction == 'up' else snake_block)
    return x_change, y_change


def _play_round():
    """Play one round with its own recorder.

    Returns:
        True when the round ended because the snake died (the lost screen
        should be shown), False when the window was closed or the kernel was
        interrupted.
    """
    global audio_buffer, recorder, frame_length, window_size

    key_directions = {
        pygame.K_LEFT: 'left',
        pygame.K_RIGHT: 'right',
        pygame.K_UP: 'up',
        pygame.K_DOWN: 'down',
    }

    # Snake starts in the middle of the window, not moving, one block long.
    x1 = dis_width / 2
    y1 = dis_height / 2
    x1_change = 0
    y1_change = 0
    snake_List = []
    Length_of_snake = 1
    foodx, foody = _random_food_position()

    frame_length = 16000 // 1
    window_size = 16000
    audio_buffer = np.zeros(window_size, dtype=np.int16)

    recorder = PvRecorder(frame_length=frame_length, device_index=0)
    try:
        # start() sits inside the try so the recorder is released even when
        # starting it fails.
        recorder.start()
        print('[Listening ...]')

        lost = False
        while not lost:
            # Keyboard control and window-close handling.
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    return False
                if event.type == pygame.KEYDOWN:
                    x1_change, y1_change = _steer(
                        key_directions.get(event.key), x1_change, y1_change
                    )

            # Voice control: read one frame, classify it, steer accordingly.
            pcm = recorder.read()
            audio_buffer = np.roll(audio_buffer, -frame_length)
            audio_buffer[-frame_length:] = pcm
            pcm2 = preprocess_audio(audio_buffer.tolist())

            if pcm2.dim() == 1:
                pcm2 = pcm2.unsqueeze(0)

            output = predict(pcm2)
            print(output)
            x1_change, y1_change = _steer(output, x1_change, y1_change)

            # The wall test uses the position reached on the previous frame;
            # the frame is still drawn before the round ends.
            if x1 >= dis_width or x1 < 0 or y1 >= dis_height or y1 < 0:
                lost = True
            x1 += x1_change
            y1 += y1_change
            dis.fill(blue)
            pygame.draw.rect(dis, green, [foodx, foody, snake_block, snake_block])
            snake_Head = [x1, y1]
            snake_List.append(snake_Head)
            if len(snake_List) > Length_of_snake:
                del snake_List[0]

            # The head landing on any other segment ends the round.
            if snake_Head in snake_List[:-1]:
                lost = True

            our_snake(snake_block, snake_List)

            pygame.display.update()

            if x1 == foodx and y1 == foody:
                foodx, foody = _random_food_position()
                Length_of_snake += 1

            clock.tick(snake_speed)

        return True

    except KeyboardInterrupt:
        sys.stdout.write('\b' * 2)
        print('Stopping ...')
        return False

    finally:
        # Runs on every exit path, so each recorder is deleted exactly once.
        recorder.delete()
        print('Recorder deleted.')


def _ask_play_again():
    """Show the lost screen; return True for C (replay), False for Q or window close."""
    while True:
        dis.fill(blue)
        message("You Lost! Press Q-Quit or C-Play Again", red)
        pygame.display.update()

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return False
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_q:
                    return False
                if event.key == pygame.K_c:
                    return True

        # Cap the refresh rate instead of spinning while waiting for a key.
        clock.tick(snake_speed)


def gameLoop():
    """Run the voice/keyboard controlled snake game until the player quits.

    Rounds are replayed in a loop rather than by calling ``gameLoop()``
    recursively: the recursive restart opened a second recorder while the
    first one was still live and then deleted the same recorder twice.
    ``pygame.quit()`` now runs on every exit path.
    """
    try:
        while _play_round() and _ask_play_again():
            pass

    except KeyboardInterrupt:
        # Interrupted on the lost screen; that round's recorder is already
        # released by _play_round().
        sys.stdout.write('\b' * 2)
        print('Stopping ...')

    finally:
        pygame.quit()

# Start the game; the call returns once the game loop has ended.
gameLoop()

[Listening ...]
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
left
silence
silence
silence
silence
silence
silence
silence
silence
up
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
silence
Stopping ...
Recorder deleted.
