In [1]:
import torch

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cpu


In [2]:
import pandas as pd
from torch.utils.data import Dataset
import os
import torchaudio

class My_Dataset(Dataset):
    """Audio samples listed in a CSV annotation file, restricted to one set.

    The code reads the CSV columns by position: column 0 is joined onto
    ``root_dir`` to form the audio path, column 1 is compared with
    ``set_number``, and column 2 is returned as the integer class label.
    """

    def __init__(self, csv_file, root_dir, set_number):
        """
        Arguments:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the audio files.
            set_number (int): Set number to filter the samples.
        """
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.set_number = set_number
        # Row positions (into self.data) of the samples that belong to this set.
        self.filtered_indices = self.filter()

    def __len__(self):
        """Return how many rows of the CSV belong to the selected set."""
        return len(self.filtered_indices)

    def filter(self):
        """Return the positional indices of the rows whose set value equals ``set_number``."""
        set_column = self.data.iloc[:, 1]
        return [
            position
            for position, raw_value in enumerate(set_column)
            if int(raw_value) == self.set_number
        ]

    def __getitem__(self, idx):
        """Return ``(waveform, class_label)`` for the ``idx``-th sample of the selected set."""
        # Translate the position inside the filtered set to a row of the full table.
        row = self.filtered_indices[idx]
        audio_path = os.path.join(self.root_dir, self.data.iloc[row, 0])
        class_label = int(self.data.iloc[row, 2])
        # torchaudio.load also returns the sample rate, which is not used here.
        waveform, _ = torchaudio.load(audio_path)
        return waveform, class_label

csv_file = 'dataset.csv'
root_dir = 'Inregistrari'

# One dataset per set number found in the CSV: 0 -> train, 1 -> test, 2 -> validation.
train_dataset, test_dataset, valid_dataset = (
    My_Dataset(csv_file, root_dir, set_number) for set_number in (0, 1, 2)
)

# Show the first (waveform, label) pair of each split as a quick sanity check.
print(train_dataset[0])
print(test_dataset[0])
print(valid_dataset[0])


(tensor([[-0.0282, -0.0244, -0.0236,  ...,  0.0104,  0.0114,  0.0126]]), 3)
(tensor([[ 9.1553e-05, -2.7466e-04, -9.1553e-04,  ..., -3.9673e-04,
         -4.8828e-04, -4.5776e-04]]), 2)
(tensor([[-0.0032, -0.0017, -0.0019,  ...,  0.0024,  0.0081,  0.0065]]), 3)


In [3]:
batch_size = 64

# `device` is a torch.device object, which does not compare equal to the plain
# string "cuda", so the original `device == "cuda"` check always fell through
# to the CPU settings. Comparing the device *type* selects the GPU settings
# whenever the model runs on CUDA (this also works if `device` is a string).
if torch.device(device).type == "cuda":
    num_workers = 2
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Training batches are reshuffled every epoch.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
# Evaluation keeps the dataset order and keeps the final, possibly smaller, batch.
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

In [4]:
# Command names; the position of a name in this list is its integer class id.
classes = [
    'start',
    'stop',
    'home',
    'pick_up',
    'approach',
    'free',
]



In [5]:
import torch.nn as nn
import torch.nn.functional as F

class M4(nn.Module):
    """1-D convolutional classifier for raw waveforms.

    Four conv -> batch-norm -> ReLU -> max-pool stages are followed by average
    pooling over the whole remaining time axis, a linear layer and a
    log-softmax over the classes.

    Arguments:
        n_input (int): Number of input channels.
        n_output (int): Number of classes.
        stride (int): Stride of the first convolution.
        n_channel (int): Channels of the first two stages; the last two use twice as many.
    """

    def __init__(self, n_input=1, n_output=6, stride=16, n_channel=32):
        super().__init__()
        wide_channel = 2 * n_channel

        # Stage 1: wide kernel with a large stride applied to the raw input.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # Stage 2
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # Stage 3: the channel count doubles here.
        self.conv3 = nn.Conv1d(n_channel, wide_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide_channel)
        self.pool3 = nn.MaxPool1d(4)

        # Stage 4
        self.conv4 = nn.Conv1d(wide_channel, wide_channel, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide_channel)
        self.pool4 = nn.MaxPool1d(4)

        # LazyLinear infers its input size on the first forward pass, so the
        # model needs one dry run before its parameters can be counted.
        self.fc1 = nn.LazyLinear(n_output)

    def forward(self, x):
        """Map a ``(batch, n_input, time)`` tensor to ``(batch, n_output)`` log-probabilities."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over the whole remaining time axis: (batch, channels, 1).
        x = F.avg_pool1d(x, x.shape[-1])
        # Put the channels last for the linear layer, then drop the length-1 axis.
        x = self.fc1(x.permute(0, 2, 1)).squeeze(1)
        return F.log_softmax(x, dim=1)


model = M4(n_input=1, n_output=len(classes))
model.to(device)
# Dry run: the first forward pass lets the LazyLinear layer infer its input
# size. The dummy batch must be on the same device as the model, otherwise
# this call fails with a device mismatch when running on CUDA.
model(torch.randn([64, 1, 16000]).to(device))
print(model)


def count_parameters(model):
    """Return the total number of elements in the trainable parameters of ``model``."""
    trainable_elements = 0
    for param in model.parameters():
        # Parameters with requires_grad=False are excluded from the count.
        if param.requires_grad:
            trainable_elements += param.numel()
    return trainable_elements


# Report the number of trainable parameters of the model.
n = count_parameters(model)
print("Number of parameters: %s" % (n,))

M4(
  (conv1): Conv1d(1, 32, kernel_size=(80,), stride=(16,))
  (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn4): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool4): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=64, out_features=6, bias=True)
)
Number



In [6]:
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

# Adam over all model parameters, with a small weight decay.
optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=0.0001,
)
# Reduce the learning rate by a factor of 10 after every 20 scheduler steps
# (the scheduler is meant to be stepped once per epoch).
scheduler = lr_scheduler.StepLR(
    optimizer,
    step_size=20,
    gamma=0.1,
)

In [7]:
train_l = []  # Average training loss, one entry per epoch
train_a = []  # Average training accuracy, one entry per epoch

def train(model, epoch, log_interval):
    """Run one training epoch over ``train_loader`` and record its metrics.

    Uses the notebook-level ``train_loader``, ``optimizer`` and ``device``.
    Appends the epoch's sample-weighted average loss to ``train_l`` and its
    accuracy to ``train_a``.

    Arguments:
        model (nn.Module): Network returning per-class log-probabilities.
        epoch (int): Epoch number, used only in the printed messages.
        log_interval (int): Print the batch loss every ``log_interval`` batches.
    """
    model.train()
    total_loss = 0
    total_correct = 0  # Number of correct predictions seen so far
    total = 0  # Number of samples seen so far

    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        output = model(data)
        # Keep an explicit batch axis. A bare ``squeeze()`` also removes the
        # batch dimension when the last batch holds a single sample, which
        # makes nll_loss fail on mismatched input/target shapes.
        log_probs = output.reshape(data.size(0), -1)
        loss = F.nll_loss(log_probs, target)
        loss.backward()
        optimizer.step()

        # loss is a batch mean; weight it by the batch size so that the epoch
        # average stays correct when the last batch is smaller.
        total_loss += loss.item() * data.size(0)
        total += data.size(0)

        # Calculate number of correct predictions
        pred = log_probs.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
        correct = pred.eq(target.view_as(pred)).sum().item()
        total_correct += correct

        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    avg_loss = total_loss / total
    train_l.append(avg_loss)

    avg_accuracy = total_correct / total  # Fraction of correctly classified samples in this epoch
    train_a.append(avg_accuracy)

    print(f"\nTrain Epoch: {epoch}\tAverage Loss: {avg_loss:.6f}\tAverage Accuracy: {avg_accuracy:.2f}\n")

In [8]:
def number_of_correct(pred, target):
    """Count how many entries of ``pred`` equal the matching entry of ``target``."""
    # Drop any singleton axes from the predictions before the element-wise comparison.
    matches = pred.squeeze() == target
    return matches.sum().item()


def get_likely_index(tensor):
    """Return the index of the largest value along the last axis, for each element in the batch."""
    return torch.argmax(tensor, dim=-1)

# Evaluation bookkeeping; every run of this cell starts again from empty lists.
true_labels, predictions = [], []  # target labels and predicted labels, one entry per sample
test_l, test_a = [], []  # average test loss and test accuracy, one entry per epoch

def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and record the epoch's metrics.

    Uses the notebook-level ``test_loader`` and ``device``. Appends the
    sample-weighted average loss to ``test_l`` and the accuracy to ``test_a``,
    and extends ``predictions`` / ``true_labels`` with one entry per sample.

    NOTE(review): ``predictions`` and ``true_labels`` are never reset here, so
    after several epochs they hold the samples of every evaluation so far.
    Clear them before the final evaluation if only the last epoch is wanted.

    Arguments:
        model (nn.Module): Network returning per-class log-probabilities.
        epoch (int): Epoch number, used only in the printed message.
    """
    model.eval()
    correct = 0
    total_loss = 0  # Sum of per-sample losses over the epoch
    total = 0  # Total number of samples processed

    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)

            output = model(data)
            # Keep an explicit batch axis. A bare ``squeeze()`` also removes
            # the batch dimension when a batch holds a single sample, which
            # makes nll_loss fail on mismatched input/target shapes.
            log_probs = output.reshape(data.size(0), -1)

            pred = get_likely_index(log_probs)
            correct += number_of_correct(pred, target)

            # Reuse the arg-max computed above rather than a second torch.max call.
            predictions.extend(pred.cpu().numpy())
            true_labels.extend(target.cpu().numpy())

            # Calculate loss for the current batch and accumulate it
            test_loss = F.nll_loss(log_probs, target).item()
            total_loss += test_loss * data.size(0)  # Multiply by batch size to get total loss for the batch
            total += data.size(0)  # Accumulate the total number of samples

    # Calculate average loss and accuracy for the epoch
    avg_loss = total_loss / total
    accuracy = correct / total

    # Append the calculated metrics to their respective lists
    test_l.append(avg_loss)
    test_a.append(accuracy)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{total} ({100. * accuracy:.0f}%)\n")

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from tqdm import tqdm

log_interval = 20
n_epoch = 200

# Not used by the loop below (the bar advances once per epoch); kept so that
# any other cell referring to these names keeps working.
pbar_update = 1 / (len(train_loader) + len(test_loader))
losses = []

# Best test accuracy seen so far; 'best_command.pt' is only rewritten when an
# epoch beats it, so the file really holds the best model and not the last one.
# NOTE(review): this selects the checkpoint on the test split. `valid_dataset`
# has no loader in the cells shown here; using it for model selection would
# keep the test split untouched.
best_accuracy = float("-inf")

with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        test(model, epoch)
        # test() appends the accuracy of this epoch to test_a.
        if test_a[-1] > best_accuracy:
            best_accuracy = test_a[-1]
            torch.save(model.state_dict(), 'best_command.pt')
        scheduler.step()
        # Advance the progress bar; without this it stays at 0/200 for the whole run.
        pbar.update(1)


  0%|          | 0/200 [00:00<?, ?it/s]


Train Epoch: 1	Average Loss: 1.740443	Average Accuracy: 0.28


Test Epoch: 1	Accuracy: 12/96 (12%)


Train Epoch: 2	Average Loss: 1.432506	Average Accuracy: 0.60


Test Epoch: 2	Accuracy: 12/96 (12%)


Train Epoch: 3	Average Loss: 1.240317	Average Accuracy: 0.72


Test Epoch: 3	Accuracy: 21/96 (22%)


Train Epoch: 4	Average Loss: 1.083425	Average Accuracy: 0.80


Test Epoch: 4	Accuracy: 47/96 (49%)


Train Epoch: 5	Average Loss: 0.955223	Average Accuracy: 0.85


Test Epoch: 5	Accuracy: 56/96 (58%)


Train Epoch: 6	Average Loss: 0.848462	Average Accuracy: 0.86


Test Epoch: 6	Accuracy: 58/96 (60%)


Train Epoch: 7	Average Loss: 0.748206	Average Accuracy: 0.90


Test Epoch: 7	Accuracy: 65/96 (68%)


Train Epoch: 8	Average Loss: 0.647972	Average Accuracy: 0.92


Test Epoch: 8	Accuracy: 73/96 (76%)


Train Epoch: 9	Average Loss: 0.565180	Average Accuracy: 0.94


Test Epoch: 9	Accuracy: 76/96 (79%)


Train Epoch: 10	Average Loss: 0.494230	Average Accuracy: 0.96


Test Epoch: 10	Accuracy: 7

  0%|          | 0/200 [02:14<?, ?it/s]


Train Epoch: 200	Average Loss: 0.099738	Average Accuracy: 1.00


Test Epoch: 200	Accuracy: 83/96 (86%)






In [106]:
#record 10 audios of 1 second each and save them in the recordings folder
import numpy as np
import soundfile as sf
import matplotlib.pyplot as plt
import sounddevice as sd
from scipy.io.wavfile import write

freq = 16000  # sampling rate of the recording, in Hz
duration = 1  # length of the recording, in seconds

# Load the saved weights. map_location places the tensors on the current
# device, so a checkpoint written on a GPU machine also loads on a CPU-only one.
model.load_state_dict(torch.load('best_command.pt', map_location=device))
model.eval()

# Record audio. sd.rec() returns immediately and records in the background,
# so the message is printed before the capture starts and sd.wait() blocks
# until the capture has finished.
print("Start recording")
recording = sd.rec(int(duration * freq),
                   samplerate=freq, channels=1)
sd.wait()
print("Stop recording")
write("test0.wav", freq, recording)

# Load the audio back the same way the dataset does
waveform, _ = torchaudio.load("test0.wav")
waveform = waveform.unsqueeze(0)  # Add the batch dimension expected by the model

# Ensure the waveform is on the correct device
waveform = waveform.to(device)

# Predict without building an autograd graph; this is inference only.
with torch.no_grad():
    output = model(waveform)
_, predicted = torch.max(output, 1)
# Index the class list with a plain Python int rather than a tensor.
command = classes[predicted.item()]
print(command)

# Save the command in a file, overwriting any previous command. The context
# manager closes the file even if the write fails.
with open("command.txt", "w") as f:
    f.write(command)




Start recording
Stop recording
free
