In [1]:
!pip install librosa
!pip install sounddevice soundfile numpy

Collecting sounddevice
  Downloading sounddevice-0.5.3-py3-none-win_amd64.whl.metadata (1.6 kB)
Downloading sounddevice-0.5.3-py3-none-win_amd64.whl (364 kB)
Installing collected packages: sounddevice
Successfully installed sounddevice-0.5.3


In [21]:
import sounddevice as sd
import numpy as np
import queue
import time

# --- Audio configuration -------------------------------------------------
SAMPLE_RATE = 16000  # Hz; used for capture and mel extraction. Please don't change
WINDOW_SIZE = 3.0    # seconds of audio in one detection window
STRIDE = 1.0         # seconds the window advances between detections

MIC_RATE = 44100     # Hz; microphone's native rate (not used by the capture stream)

# Sample counts are compared against buffer lengths and used for slicing,
# so keep them as integers instead of floats such as 48000.0.
WINDOW_SAMPLES = int(round(SAMPLE_RATE * WINDOW_SIZE))
STRIDE_SAMPLES = int(round(SAMPLE_RATE * STRIDE))


<h3>MEL</h3>

In [4]:
import librosa
def generate_mel(y):
    """Turn a waveform into a dB-scaled mel spectrogram tensor.

    Parameters
    ----------
    y : array-like
        Audio samples handed straight to librosa
        (assumed to be a 1-D mono signal at SAMPLE_RATE -- TODO confirm).

    Returns
    -------
    torch.Tensor
        float32 tensor with the log-mel spectrogram (40 mel bands).
    """
    mel_settings = dict(sr=SAMPLE_RATE, n_fft=512, hop_length=160, n_mels=40)
    power_spec = librosa.feature.melspectrogram(y=y, **mel_settings)

    # Decibels are measured relative to the largest value in this spectrogram.
    log_spec = librosa.power_to_db(power_spec, ref=np.max)
    log_spec = log_spec.astype(np.float32)

    return torch.from_numpy(log_spec).float()

<h3>Model</h3>

In [5]:
from torch import nn
import torch    

class MBConvBlock(nn.Module):
    """Expand -> spatial conv -> project block with an optional skip connection.

    The block widens the input to ``in_channels * expansion`` channels with a
    1x1 convolution, applies a ``kernel_size`` convolution (optionally strided)
    and projects back to ``out_channels`` with another 1x1 convolution. Every
    convolution is followed by batch normalisation; the first two also by ReLU.

    Parameters
    ----------
    in_channels : int
        Channels of the incoming feature map.
    out_channels : int
        Channels of the produced feature map.
    expansion : int, default 4
        Multiplier giving the hidden width.
    kernel_size : int, default 3
        Kernel size of the middle convolution (padding is ``kernel_size // 2``).
    stride : int, default 1
        Stride of the middle convolution.
    """

    def __init__(self, in_channels, out_channels, expansion=4, kernel_size=3, stride=1):
        super().__init__()
        expanded = in_channels * expansion
        same_pad = kernel_size // 2

        # The input can only be added back when the block keeps its shape.
        self.use_residual = (stride == 1 and in_channels == out_channels)

        # Layer order is significant: it fixes the parameter names in state_dict.
        layers = [
            # 1x1 expansion
            nn.Conv2d(in_channels, expanded, kernel_size=1, bias=False),
            nn.BatchNorm2d(expanded),
            nn.ReLU(inplace=True),
            # k x k spatial convolution (keeps its bias term)
            nn.Conv2d(expanded, expanded, kernel_size=kernel_size, padding=same_pad, stride=stride),
            nn.BatchNorm2d(expanded),
            nn.ReLU(inplace=True),
            # 1x1 projection, no activation afterwards
            nn.Conv2d(expanded, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
        ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Run the block on ``x``; add ``x`` back in when the skip is enabled."""
        result = self.block(x)
        if not self.use_residual:
            return result
        result += x
        return result
    
class CRNN(nn.Module):
    """Convolutional front-end followed by a bidirectional GRU and an MLP head.

    Parameters
    ----------
    num_classes : int, default 1
        Size of the final linear layer's output.
    """

    def __init__(self, num_classes = 1):
        super().__init__()

        # Three MBConv stages: 1 -> 16 -> 32 -> 64 channels, the last two strided.
        conv_stages = [
            MBConvBlock(1, 16, expansion=4),
            MBConvBlock(16, 32, expansion=4, stride=2),
            MBConvBlock(32, 64, expansion=4, stride=2),
        ]
        self.cnn = nn.Sequential(*conv_stages)

        self.rnn = nn.GRU(64, 256, batch_first=True, bidirectional=True)

        # 512 = 2 directions x 256 hidden units of the GRU.
        head_layers = [nn.Linear(512, 64), nn.ReLU(), nn.Linear(64, num_classes)]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Map a 4-D input batch to ``(batch, num_classes)`` outputs.

        ``x`` is assumed to be ``(batch, 1, n_mels, frames)`` -- TODO confirm
        against the caller.
        """
        features = self.cnn(x)

        # Average out dim 2, then put the last axis in front of the channels
        # so the GRU (batch_first) iterates over it.
        sequence = features.mean(dim=2).permute(0, 2, 1)

        _, hidden = self.rnn(sequence)

        # hidden is (directions, batch, 256); flatten both directions per sample.
        batch_size = sequence.size(0)
        flat_hidden = hidden.permute(1, 0, 2).contiguous().view(batch_size, -1)

        return self.fc(flat_hidden)
        
        
        
model = CRNN()
# map_location="cpu" lets a checkpoint that was saved from a GPU run load on a
# CPU-only machine; the model is never moved off the CPU in this notebook.
model.load_state_dict(
    torch.load("trained_models/worst_crnn_with_mbconv_model.pth", map_location="cpu")
)
# Inference mode: BatchNorm layers use their stored running statistics.
model.eval()

def wake_word_detect(data, threshold=0.5):
    """Run the model on one audio window and report whether the wake word fired.

    Parameters
    ----------
    data : array-like
        Audio samples for the window; passed to ``generate_mel``.
    threshold : float, default 0.5
        The window counts as a detection when the model output is strictly
        greater than this value.

    Returns
    -------
    bool
        True when the model output exceeds ``threshold``, otherwise False.

    Notes
    -----
    The model output is printed on every call.
    NOTE(review): the comparison is made on the raw model output (no sigmoid
    is applied here). If the network emits logits, 0.5 is not the 50 %
    probability point -- confirm the intended threshold.
    """
    # Add batch and channel axes in front of the spectrogram.
    mel = generate_mel(data).unsqueeze(0).unsqueeze(0)
    with torch.no_grad():
        out = model(mel).squeeze()
    print(out)
    return bool(out > threshold)

In [20]:
import sounddevice as sd

# Describe the default input device (first entry of sd.default.device)
# and show the sample rate it reports.
dev = sd.query_devices(device=sd.default.device[0], kind='input')
print(dev)
print(f"Sample rate: {dev['default_samplerate']}")


{'name': 'Mikrofonarray (Realtek(R) Audio', 'index': 1, 'hostapi': 0, 'max_input_channels': 2, 'max_output_channels': 0, 'default_low_input_latency': 0.09, 'default_low_output_latency': 0.09, 'default_high_input_latency': 0.18, 'default_high_output_latency': 0.18, 'default_samplerate': 44100.0}
Sample rate: 44100.0


<h2>Audio Recording</h2>
Starts recording audio from the microphone and listens for the wake word.
<h3>Say: "Hey, Snips!"</h3>

In [23]:
# Blocks of captured audio travel from the audio callback to the main loop here.
audio_queue = queue.Queue()
# Sliding buffer of the most recent samples (mono, 1-D).
audio_buffer = np.zeros(0, dtype=np.float32)

# True once the current buffer content has been scored; reset after each stride.
tried_to_detect = False


def callback(indata, frames, time_info, status):
    """Stream callback: enqueue a copy of each captured block as a 1-D array.

    The third argument is named ``time_info`` so it does not shadow the
    imported ``time`` module.
    """
    if status:
        print(status)
    # The stream is opened with a single channel, so column 0 is the whole signal.
    # Copy because the input array is only valid during the callback.
    audio_queue.put(indata[:, 0].copy())


# channels=1 is required: without it the device's maximum channel count is used
# (2 for the microphone array above) and flattening such blocks would interleave
# left/right samples into the mono buffer.
with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, callback=callback):
    try:
        while True:
            new_audio = audio_queue.get()  # blocks until the next chunk arrives
            audio_buffer = np.concatenate((audio_buffer, new_audio))

            # Score the buffer once as soon as it holds more than one window.
            if len(audio_buffer) > WINDOW_SAMPLES and not tried_to_detect:
                is_detected = wake_word_detect(audio_buffer)
                if is_detected:
                    print("Wake Word Detected!")
                    sd.play(audio_buffer, SAMPLE_RATE)
                    break
                tried_to_detect = True

            # After one more stride of audio, drop the oldest stride and re-arm.
            if len(audio_buffer) >= WINDOW_SAMPLES + STRIDE_SAMPLES:
                audio_buffer = audio_buffer[int(STRIDE_SAMPLES):]
                tried_to_detect = False

    except KeyboardInterrupt:
        print("Exiting from keyboard interrupt")
        

tensor(-6.5023)
tensor(-3.5307)
tensor(-4.6720)
tensor(-4.8157)
tensor(-6.2468)
tensor(-2.7570)
tensor(-2.2642)
tensor(-4.3235)
tensor(-4.5635)
tensor(-4.2110)
tensor(-5.1627)
tensor(-6.6498)
tensor(-5.7129)
tensor(-2.2449)
tensor(3.7066)
Wake Word Detected!
