In [12]:
import os
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import numpy as np

In [13]:

DATA_DIR = "ML_TACTIGON/customTSkin/data/audiodati"
SAMPLE_RATE = 16000
DURATION = 1.0
NUM_CLASSES = 4
BATCH_SIZE = 16
EPOCHS = 4
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [15]:
class AudioDataset(Dataset):
    def __init__(self, data_dir, sample_rate, duration):
        self.data = []
        self.labels = []
        self.sample_rate = sample_rate
        self.duration = duration
        self.num_samples = int(sample_rate * duration)
        self.classes = sorted(os.listdir(data_dir))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        for cls in self.classes:
            class_dir = os.path.join(data_dir, cls)
            for file in os.listdir(class_dir):
                if file.endswith(".wav"):
                    self.data.append(os.path.join(class_dir, file))
                    self.labels.append(self.class_to_idx[cls])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        file_path = self.data[idx]
        label = self.labels[idx]
        waveform, sr = torchaudio.load(file_path)
        waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate)(waveform)

        if waveform.shape[1] < self.num_samples:
            waveform = torch.nn.functional.pad(waveform, (0, self.num_samples - waveform.shape[1]))
        else:
            waveform = waveform[:, :self.num_samples]

        return waveform, label


In [16]:
class AudioClassifier(nn.Module):
    def __init__(self, num_classes):
        super(AudioClassifier, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.3)
        self.batch_norm1 = nn.BatchNorm1d(16)
        self.batch_norm2 = nn.BatchNorm1d(32)
        self.batch_norm3 = nn.BatchNorm1d(64)

        self.fc1 = nn.Linear(64 * (SAMPLE_RATE // 8), 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.batch_norm1(self.conv1(x)))
        x = self.pool(x)
        x = self.relu(self.batch_norm2(self.conv2(x)))
        x = self.pool(x)
        x = self.relu(self.batch_norm3(self.conv3(x)))
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.fc2(x)
        return x


In [17]:
def train_model(model, train_loader, val_loader, criterion, optimizer, device, epochs, patience=3):
    best_val_loss = float('inf')
    counter = 0 
    best_model_state = None 

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

        print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {running_loss / len(train_loader):.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2%}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            counter = 0
            best_model_state = model.state_dict()
        else:
            counter += 1
            print(f"Early stopping counter: {counter}/{patience}")

        if counter >= patience:
            print("Early stopping triggered")
            break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    return model

def evaluate_model(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return total_loss / len(dataloader), accuracy


In [18]:
dataset = AudioDataset(DATA_DIR, SAMPLE_RATE, DURATION)

In [19]:
train_size = int(0.7 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

In [20]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [21]:
model = AudioClassifier(NUM_CLASSES).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [22]:
train_model(model, train_loader, val_loader, criterion, optimizer, DEVICE, EPOCHS)

Epoch 1/4, Train Loss: 0.9746, Val Loss: 0.6982, Val Acc: 70.26%
Epoch 2/4, Train Loss: 0.6690, Val Loss: 0.6291, Val Acc: 75.00%
Epoch 3/4, Train Loss: 0.5516, Val Loss: 0.5644, Val Acc: 76.28%
Epoch 4/4, Train Loss: 0.4665, Val Loss: 0.5625, Val Acc: 78.36%


AudioClassifier(
  (conv1): Conv1d(1, 16, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.3, inplace=False)
  (batch_norm1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=128000, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=4, bias=True)
  (relu): ReLU()
)

In [23]:
def test_model(model, test_loader, criterion, device):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return total_loss / len(test_loader), accuracy

In [24]:
test_loss, test_accuracy = test_model(model, test_loader, criterion, DEVICE)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2%}")

Test Loss: 0.5445, Test Accuracy: 79.03%


In [25]:
torch.save(model, "model_audio.pth")

In [None]:
model = torch.load("model_audio.pth")
model.eval()

  model = torch.load("model_audio.pth")


AudioClassifier(
  (conv1): Conv1d(1, 16, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.3, inplace=False)
  (batch_norm1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=128000, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=4, bias=True)
  (relu): ReLU()
)

In [27]:
torch.save(model.state_dict(), "model_state_audio.pth")

### Registrazione file audio

In [56]:
import torch
import torchaudio
import pyaudio
import wave
import torch.nn.functional as F

In [67]:
SAMPLE_RATE = 16000
DURATION = 1.0       
NUM_SAMPLES = int(SAMPLE_RATE * DURATION) 
AUDIO_FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK = 1024
OUTPUT_FILE = "test.wav"
#PROVA = "ML_TACTIGON/customTSkin/data/audiodati/no/0a2b400e_nohash_0.wav"


In [58]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [60]:
def record_audio(output_file, duration, sample_rate):
    audio = pyaudio.PyAudio()
    print("Recording audio...")
    
    stream = audio.open(format=AUDIO_FORMAT,
                        channels=CHANNELS,
                        rate=sample_rate,
                        input=True,
                        frames_per_buffer=CHUNK)
    frames = []
    for _ in range(0, int(sample_rate / CHUNK * duration)):
        data = stream.read(CHUNK)
        frames.append(data)

    print("Recording complete.")
    stream.stop_stream()
    stream.close()
    audio.terminate()

    with wave.open(output_file, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(audio.get_sample_size(AUDIO_FORMAT))
        wf.setframerate(sample_rate)
        wf.writeframes(b''.join(frames))


In [61]:
def predict_command(model, audio_file, sample_rate, num_samples, class_labels):
    waveform, sr = torchaudio.load(audio_file)

    if sr != sample_rate:
        waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate)(waveform)
        
    if waveform.shape[1] < num_samples:
        waveform = F.pad(waveform, (0, num_samples - waveform.shape[1]))
    else:
        waveform = waveform[:, :num_samples]
        
    waveform = waveform.unsqueeze(0).to(DEVICE)
    model.eval()
    with torch.no_grad():
        output = model(waveform)
        probabilities = F.softmax(output, dim=1)
        _, predicted_label = torch.max(probabilities, 1)
        predicted_class = class_labels[predicted_label.item()]
        confidence = probabilities[0][predicted_label.item()].item()

    return predicted_class, confidence

In [68]:
record_audio(OUTPUT_FILE, DURATION, SAMPLE_RATE)

Recording audio...
Recording complete.


In [69]:
if __name__ == "__main__":
    CLASS_LABELS = ["down", "no", "up", "yes"]

    model = AudioClassifier(len(CLASS_LABELS)).to(DEVICE)
    model.load_state_dict(torch.load("model_state_audio.pth", map_location=DEVICE))
    model.eval()

    record_audio(OUTPUT_FILE, DURATION, SAMPLE_RATE)

    predicted_command, confidence = predict_command(model, OUTPUT_FILE, SAMPLE_RATE, NUM_SAMPLES, CLASS_LABELS)
    print(f"Il comando predetto è: {predicted_command} (Confidence: {confidence:.2%})")

  model.load_state_dict(torch.load("model_state_audio.pth", map_location=DEVICE))


Recording audio...
Recording complete.
Il comando predetto è: no (Confidence: 85.25%)
