In [141]:
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import os
import librosa
import librosa.display
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm
from timeit import default_timer as timer

In [142]:
AUDIO_DIR = "audios"   
SAMPLE_RATE = 44100    
DURATION = 3           
N_MELS = 128           
TEST_SIZE = 0.2        
USE_MFCC = False       

In [143]:
def extract_features(path, sample_rate, duration, n_mels, use_mfcc=False):
    y, sr = librosa.load(path, sr=sample_rate)
    target_len = int(sample_rate * duration)
    if len(y) < target_len:
        y = np.pad(y, (0, target_len - len(y)))
    else:
        y = y[:target_len]
    
    if use_mfcc:
        features = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mels)
    else:
        features = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)
        features = librosa.power_to_db(features, ref=np.max)
    
    return features

In [144]:
def load_dataset(audio_dir):
    X, y = [], []
    labels = ["non_wake", "wake"]
    for label_idx, label in enumerate(labels):
        folder = os.path.join(audio_dir, label)
        for file in os.listdir(folder):
            if file.endswith(".wav"):
                path = os.path.join(folder, file)
                feats = extract_features(path, SAMPLE_RATE, DURATION, N_MELS, USE_MFCC)
                X.append(feats)
                y.append(label_idx)
    X = np.array(X)
    y = np.array(y)

    X = X[..., np.newaxis]            # (N, H, W, 1)
    X = np.transpose(X, (0, 3, 1, 2)) # (N, 1, H, W)
    return X, y, labels

In [145]:
X, y, labels = load_dataset(AUDIO_DIR)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, stratify=y)

X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [146]:
class WakeWordCNN(nn.Module):
    def __init__(self, input_channels: int, hidden_units: int, output_shape: int):
        super().__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(input_channels, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.flatten = nn.Flatten()
        self._to_linear = None
        self.classifier = None
        self.output_shape = output_shape

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        if self._to_linear is None:
            self._to_linear = x.view(x.size(0), -1).shape[1]
            self.classifier = nn.Linear(self._to_linear, self.output_shape).to(x.device)
        x = self.flatten(x)
        x = self.classifier(x)
        return x

In [147]:
torch.manual_seed(42)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

Using device: mps


In [148]:
model_0 = WakeWordCNN(input_channels=1, hidden_units=32, output_shape=2).to(device)

In [149]:
def accuracy_fn(y_true, y_pred):
    correct = (y_true == y_pred).sum().item()
    acc = correct / len(y_true) * 100
    return acc

In [150]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.1)

In [154]:
epochs = 10
train_time_start = timer()

for epoch in tqdm(range(epochs)):
    #print(f"\nEpoch {epoch+1}/{epochs}")
    train_loss = 0
    model_0.train()

    for X_batch, y_batch in train_dataloader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_pred = model_0(X_batch)
        loss = loss_fn(y_pred, y_batch)
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(train_dataloader)

   
    test_loss, test_acc = 0, 0
    model_0.eval()
    with torch.inference_mode():
        for X_batch, y_batch in test_dataloader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            test_pred = model_0(X_batch)
            test_loss += loss_fn(test_pred, y_batch).item()
            test_acc += accuracy_fn(y_batch, test_pred.argmax(dim=1))

    test_loss /= len(test_dataloader)
    test_acc /= len(test_dataloader)

    #print(f"Train loss: {train_loss:.4f} | Test loss: {test_loss:.4f} | Test acc: {test_acc:.2f}%")

train_time_end = timer()
print(f"\n Training time on {device}: {train_time_end - train_time_start:.2f} sec")

  0%|          | 0/10 [00:00<?, ?it/s]


 Training time on mps: 1.42 sec


In [155]:
torch.manual_seed(42)
def eval_model(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, loss_fn: torch.nn.Module, accuracy_fn, device = device):
    loss, acc = 0, 0
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            X,y = X.to(device), y.to(device)
            y_pred = model(X)
            loss += loss_fn(y_pred, y)
            acc += accuracy_fn(y, y_pred.argmax(dim=1))
        loss /= len(data_loader)
        acc /= len(data_loader)
        
    return {"model_name": model.__class__.__name__,
            "model_loss": loss.item(),
            "model_acc": acc}

In [156]:
model_0_results = eval_model(model_0, test_dataloader, loss_fn, accuracy_fn, device)
model_0_results

{'model_name': 'WakeWordCNN',
 'model_loss': 0.6929569840431213,
 'model_acc': 57.14285714285714}

In [162]:
import torch.nn.functional as F
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav

def record_audio(filename, duration, samplerate=44100):
    print("Recording...")
    recording = sd.rec(int(duration * samplerate), samplerate=samplerate, channels=1, dtype='int16')
    sd.wait()
    print("Record completed.")
    wav.write(filename, samplerate, recording)

duration = 3
filename = "test_audio.wav"
record_audio(filename, duration)
model_0.eval()

# Charger l'audio à prédire
audio_path = "test_audio.wav"
features = extract_features(audio_path, SAMPLE_RATE, DURATION, N_MELS, USE_MFCC)

# Formater comme batch pour le CNN
X = np.expand_dims(features, axis=(0,1))  # (1, 1, H, W)
X = torch.tensor(X, dtype=torch.float32)

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model_0.to(device)


X = X.to(device)   # <-- Très important !


with torch.inference_mode():
    preds = model_0(X)
    probs = F.softmax(preds, dim=1)
    pred_class = torch.argmax(probs, dim=1).item()
    confidence = probs[0][pred_class].item()


labels = ["non_wake", "wake"]

print(f"Prediction : {labels[pred_class]} (probability {confidence*100:.2f}%)")

Recording...
Record completed.
Prediction : wake (probability 64.14%)
