In [None]:
import json
import pandas as pd
from datetime import datetime

import evaluate
import numpy as np
import torch
from datasets import load_dataset

In [None]:
# Load the "v0.02" configuration of the "speech_commands" dataset.
speech_data = load_dataset("speech_commands", "v0.02")

In [None]:
# Inspect the object returned by load_dataset, before any filtering is applied.
speech_data

In [None]:
def _is_short_clip(example):
    """Tell whether the clip's sample count divided by 16_000 is below 10.

    NOTE(review): this reads as "shorter than ten seconds" if the audio is
    sampled at 16 kHz — confirm the sampling rate.
    """
    n_samples = len(example["audio"]["array"])
    return n_samples / 16_000 < 10


# Keep only the examples accepted by the length predicate above.
speech_data = speech_data.filter(_is_short_clip)

In [None]:
# Inspect the dataset again after the clip-length filter.
speech_data

In [None]:
# Command words kept for this experiment; the list length is also used as the
# number of model outputs further down in the notebook.
subset_of_labels = [
    "up",
    "down",
    "left",
    "right",
    "on",
    "off",
    "yes",
    "no",
]

In [None]:
def _has_selected_label(example):
    """Tell whether the first path component of `file` is one of `subset_of_labels`."""
    leading_component = example["file"].split("/")[0]
    return leading_component in subset_of_labels


# Keep only examples whose file path starts with one of the selected words.
speech_data = speech_data.filter(_has_selected_label)

In [None]:
# Inspect the dataset after keeping only the selected labels.
speech_data

In [None]:
# Pull the three splits out of the dataset dictionary by name.
train, validation, test = (
    speech_data[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
# One fixed seed so all three splits are shuffled the same way on every run.
SEED = 1
train, validation, test = (
    split.shuffle(seed=SEED) for split in (train, validation, test)
)

In [None]:
def extract_fields(example, target_length=16000):
    """Return the example's label together with a fixed-length waveform.

    The waveform in ``example["audio"]["array"]`` is cut to at most
    ``target_length`` samples and then zero-padded on the right, so the
    returned array always has exactly ``target_length`` entries. Without the
    truncation, any clip longer than ``target_length`` would hand ``np.pad`` a
    negative width and raise ``ValueError``.

    Args:
        example: Mapping with an ``"audio"`` entry holding an ``"array"`` of
            samples and a ``"label"`` entry.
        target_length: Number of samples in the returned array.

    Returns:
        dict with keys ``"label"`` (copied unchanged) and ``"array"``
        (1-D array of length ``target_length``).
    """
    samples = np.asarray(example["audio"]["array"])[:target_length]
    missing = target_length - len(samples)  # never negative after the slice
    padded = np.pad(samples, (0, missing), constant_values=0)
    return {"label": example["label"], "array": padded}

In [None]:
# Run extract_fields over every example of each split.
train, validation, test = (
    split.map(extract_fields) for split in (train, validation, test)
)

In [None]:
# Drop the listed raw columns from each split; map() is called without a
# function, so only the column removal takes effect.
train, validation, test = (
    split.map(remove_columns=["file", "audio", "speaker_id", "utterance_id"])
    for split in (train, validation, test)
)

In [None]:
# GPU index this notebook prefers on a multi-GPU machine.
PREFERRED_GPU_INDEX = 3

# Pick the preferred GPU when it exists, otherwise the first GPU, otherwise the
# CPU. Hard-coding "cuda:3" would crash on any machine with fewer than four GPUs.
if torch.cuda.is_available():
    gpu_index = PREFERRED_GPU_INDEX if PREFERRED_GPU_INDEX < torch.cuda.device_count() else 0
    device = torch.device(f"cuda:{gpu_index}")
else:
    device = torch.device("cpu")
print(device)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from datasets import load_dataset
import numpy as np

from torch.autograd import Variable

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LSTMModel(nn.Module):
    """Stacked LSTM whose last time-step output feeds a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs and outputs are laid out (batch, seq, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the LSTM over ``x`` and project the final time step.

        Args:
            x: Tensor of shape (batch, seq, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # No explicit (h0, c0): nn.LSTM starts from zero states that follow the
        # input's dtype and device. Hand-built float32 zeros would break as
        # soon as the model and its input use another dtype.
        out, _ = self.lstm(x)

        # Only the output at the last time step is fed to the linear layer.
        return self.fc(out[:, -1, :])


# Accuracy metric, loaded through the `evaluate` package that the notebook's
# first cell already imports. `datasets.load_metric` is deprecated and no longer
# exists in recent `datasets` releases, so importing it breaks the cell there.
accuracy = evaluate.load("accuracy")

# Training hyper-parameters.
NUM_EPOCHS = 100
BATCH_SIZE = 256

# Release cached GPU memory left over from earlier runs in this kernel.
torch.cuda.empty_cache()

# 16_000 input features per time step, 64 hidden units, 2 LSTM layers, and one
# output per selected label.
# NOTE(review): the targets used for training are the dataset's raw `label`
# ids; this assumes they all fall in [0, len(subset_of_labels)) — confirm.
model = LSTMModel(16_000, 64, 2, len(subset_of_labels))

model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# T_max counts scheduler steps: with T_max=NUM_EPOCHS the cosine only spans the
# whole run if scheduler.step() is called once per epoch.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer=optimizer, T_max=NUM_EPOCHS, eta_min=0
)
# Lowest validation loss seen so far; used to decide when to checkpoint.
best_val_loss = float("inf")

In [None]:
def calculate_accuracy(preds, y):
    """Return the fraction of rows whose highest-scoring class equals the target.

    Args:
        preds: Tensor of per-class scores with classes along dimension 1.
        y: Tensor of target class indices, one per row of ``preds``.

    Returns:
        Accuracy as a Python float.
    """
    # Softmax is monotonic, so taking argmax on the raw scores selects the same
    # class while avoiding the extra pass and any ties caused by saturation.
    predicted = torch.argmax(preds, dim=1)
    return (predicted == y).float().mean().item()

In [None]:
from tqdm import tqdm

result_columns = ["epoch", "train_loss", "train_accuracy", "val_loss", "val_accuracy"]
results = pd.DataFrame(columns=result_columns)

train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(validation, batch_size=BATCH_SIZE)


def _prepare_batch(batch):
    """Turn one collated batch into the (inputs, targets) pair fed to the model.

    Applies the same stack / unsqueeze / permute sequence to ``batch["array"]``
    for both training and validation, then casts to float32.
    """
    # assumes default collation yields one tensor per time step, making the
    # stacked tensor (time, batch) and the final input (batch, 1, time) —
    # TODO confirm.
    inputs = torch.stack(batch["array"]).to(device)
    inputs = inputs.unsqueeze(1).permute(2, 1, 0).float()
    targets = batch["label"].to(device)
    return inputs, targets


# One record per epoch; the DataFrame is built from this list instead of being
# grown with a merge on every iteration.
history = []

for epoch in tqdm(range(1, NUM_EPOCHS + 1)):
    # ---- training pass ----
    model.train()
    train_loss = 0
    train_accuracy = 0
    for batch in train_loader:
        x, y = _prepare_batch(batch)
        y_pred = model(x)
        loss = criterion(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        train_accuracy += calculate_accuracy(y_pred, y)
    # The scheduler was built with T_max=NUM_EPOCHS, so it advances once per
    # epoch; stepping per batch would run through many cosine cycles.
    scheduler.step()
    train_loss /= len(train_loader)
    train_accuracy = train_accuracy / len(train_loader)
    print(f"Epoch {epoch} train loss: {train_loss}, train accuracy: {train_accuracy}")

    # ---- validation pass ----
    model.eval()
    val_loss = 0
    val_accuracy = 0
    with torch.no_grad():
        for batch in valid_loader:
            x, y = _prepare_batch(batch)
            y_pred = model(x)
            val_loss += criterion(y_pred, y).item()
            val_accuracy += calculate_accuracy(y_pred, y)
    val_loss /= len(valid_loader)
    val_accuracy = val_accuracy / len(valid_loader)
    print(f"Epoch {epoch} val loss: {val_loss}, val accuracy: {val_accuracy}")

    # Checkpoint whenever the validation loss improves.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_model.pth")

    # Record the epoch-mean training loss (not the loss of the last batch).
    history.append(
        {
            "epoch": epoch,
            "train_loss": train_loss,
            "train_accuracy": train_accuracy,
            "val_loss": val_loss,
            "val_accuracy": val_accuracy,
        }
    )
    # Rebuilt every epoch so `results` stays usable if training is interrupted.
    results = pd.DataFrame(history, columns=result_columns)

In [None]:
# Display the per-epoch metrics table collected during training.
results

In [None]:
# Highest validation accuracy recorded across all epochs.
results.loc[:, "val_accuracy"].max()

In [None]:
# Write the metrics table to a CSV file in the working directory.
results.to_csv(path_or_buf="lstm_only_selected.csv")

In [None]:
# Rebuild the architecture and restore the checkpoint saved by the training loop.
loaded_model = LSTMModel(16_000, 64, 2, len(subset_of_labels))
loaded_model.load_state_dict(torch.load('best_model.pth', map_location=torch.device('cpu')))
# The weights are loaded on CPU; move them to the device the batches are sent to.
loaded_model.to(device)
loaded_model.eval()

eval_loader = DataLoader(validation, batch_size=BATCH_SIZE)

with torch.no_grad():
    val_loss = 0
    val_accuracy = 0
    for batch in eval_loader:
        x = batch["array"]
        x = torch.stack(x).to(device)
        x = x.unsqueeze(1)
        x = x.permute(2, 1, 0)
        y = batch["label"].to(device)
        # Score the restored checkpoint, not the in-memory training model.
        y_pred = loaded_model(x.float())
        val_loss += criterion(y_pred, y).item()
        val_accuracy += calculate_accuracy(y_pred, y)
    # Both accumulators are sums of per-batch means, so they are averaged over
    # the number of batches rather than the number of samples.
    val_loss /= len(eval_loader)
    val_accuracy = val_accuracy / len(eval_loader)
print(val_loss)
print(val_accuracy)