In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchaudio.transforms import MelSpectrogram
from torchaudio.datasets import SPEECHCOMMANDS
import os
from tqdm import tqdm

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

# SpeechCommands dataset restricted to one of its official splits
class SubsetSC(SPEECHCOMMANDS):
    """SPEECHCOMMANDS dataset limited to a single split.

    The dataset is downloaded into the current directory if necessary.
    ``subset`` selects which files remain in ``self._walker``:

    * ``"validation"`` - the files named in ``validation_list.txt``
    * ``"testing"``    - the files named in ``testing_list.txt``
    * ``"training"``   - every file NOT named in either of those lists
    * anything else (including ``None``) - the walker is left untouched
    """

    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            """Read a split list file and return normalized paths rooted at the dataset dir."""
            list_path = os.path.join(self._path, filename)
            entries = []
            with open(list_path) as handle:
                for raw_line in handle:
                    joined = os.path.join(self._path, raw_line.strip())
                    entries.append(os.path.normpath(joined))
            return entries

        if subset == "training":
            # Training data is defined by exclusion: drop everything held out
            # for validation or testing.
            held_out = set(load_list("validation_list.txt") + load_list("testing_list.txt"))
            self._walker = [path for path in self._walker if path not in held_out]
        elif subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")


cuda


In [2]:
def collate_fn(batch):
    """Merge dataset items into one padded waveform tensor and one label tensor.

    Each item is indexed positionally: element 0 is the waveform tensor and
    element 2 is the label word. Waveforms are zero-padded on the right along
    dimension 1 up to the longest waveform in this batch and then stacked;
    label words are mapped to integer ids with ``label_to_index``.

    Returns:
        (stacked_waveforms, label_ids) where ``label_ids`` is a ``torch.long`` tensor.
    """
    clips = []
    words = []
    for sample in batch:
        clips.append(sample[0])
        words.append(sample[2])

    # Word -> integer class id
    targets = list(map(label_to_index, words))

    # Right-pad every clip to the longest clip of the batch so they can be stacked
    longest = max(clip.size(1) for clip in clips)
    padded = []
    for clip in clips:
        missing = longest - clip.size(1)
        padded.append(F.pad(clip, (0, missing)))

    return torch.stack(padded), torch.tensor(targets, dtype=torch.long)


In [3]:
# Create training and testing split of the data
train_set = SubsetSC("training")
test_set = SubsetSC("testing")

# Batch size, device, and other setup
batch_size = 256

# `device` is a torch.device object, and a torch.device does not compare equal
# to the plain string "cuda", so test its `.type` instead. Wrapping it in
# torch.device(...) also accepts a device given as a string.
if torch.device(device).type == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Create DataLoader instances for training and testing.
# Both use collate_fn to pad waveforms and convert label words to indices.
train_loader = DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

test_loader = DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)



# Helpers translating between label words and their integer class ids
def label_to_index(word):
    """Return the position of ``word`` in the module-level ``labels`` list.

    Raises ``ValueError`` (from ``list.index``) when the word is not in the list.
    """
    position = labels.index(word)
    return position

def index_to_label(index):
    """Return the entry of the module-level ``labels`` list stored at ``index``."""
    word = labels[index]
    return word


# Define the RNN model
class RNNModel(nn.Module):
    """Stacked LSTM followed by a linear classifier over the last time step.

    Args:
        input_size: number of features per time step fed to the LSTM.
        hidden_size: width of each LSTM layer.
        num_classes: number of output classes (required).
        num_layers: number of stacked LSTM layers.

    ``forward`` returns per-class log-probabilities of shape
    ``(batch, num_classes)``, i.e. values suitable for ``nn.NLLLoss``.
    """

    def __init__(self, input_size=1, hidden_size=64, num_classes=None, num_layers=2):
        super().__init__()
        if num_classes is None:
            # Fail here with a readable message instead of deep inside nn.Linear.
            raise TypeError("num_classes must be an int (number of output classes), got None")
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: tensor of shape (batch_size, sequence_length, input_size)
               (the LSTM is built with ``batch_first=True``).

        Returns:
            Log-probabilities of shape (batch_size, num_classes).
        """
        # nn.LSTM starts from all-zero hidden and cell states when none are
        # passed, created with the input's own device and dtype. This avoids
        # allocating them on the CPU and copying them over on every call, and
        # keeps the model usable in non-float32 precision.
        out, _ = self.rnn(x)
        # Only the output of the final time step is used for classification.
        out = self.fc(out[:, -1, :])
        return F.log_softmax(out, dim=1)

# Initialize the RNN model, loss function, and optimizer

# Build the label vocabulary. Iterating the dataset itself decodes every audio
# file just to read its label; when the installed torchaudio provides
# `get_metadata`, use it to read the label (field 2) without loading audio.
if hasattr(train_set, "get_metadata"):
    labels = sorted({train_set.get_metadata(i)[2] for i in range(len(train_set))})
else:
    labels = sorted({datapoint[2] for datapoint in train_set})
num_classes = len(labels)

# NOTE(review): collate_fn stacks waveforms along a new batch dimension, so with
# input_size=16000 the LSTM presumably receives a single time step of 16000
# samples per clip and only works when every batch is padded to exactly 16000
# samples -- confirm this is intended.
model = RNNModel(input_size=16000, hidden_size=64, num_classes=num_classes, num_layers=2)
model.to(device)
# RNNModel.forward already applies log_softmax, so the matching loss is NLLLoss
# (CrossEntropyLoss would apply log_softmax a second time).
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


100%|██████████| 2.26G/2.26G [00:28<00:00, 83.9MB/s]


In [4]:
# Training function
def train(model, train_loader, criterion, optimizer, num_epochs=5):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    Args:
        model: the network to optimise (updated in place).
        train_loader: sized iterable yielding ``(inputs, targets)`` batches.
        criterion: loss callable taking ``(outputs, targets)``.
        optimizer: optimizer bound to ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Prints the loss of every 100th step of each epoch.
    """
    # Move batches to wherever the model lives instead of relying on a
    # notebook-global `device`, so the function has no hidden state.
    # A model without parameters leaves the batches where they are.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else None

    total_step = len(train_loader)
    for epoch in range(num_epochs):
        model.train()
        for i, (waveform, targets) in enumerate(train_loader):
            if run_device is not None:
                waveform = waveform.to(run_device)
                targets = targets.to(run_device)

            outputs = model(waveform)
            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                      .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
# Test function
def test(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for waveform, labels in test_loader:
            waveform = waveform.to(device)
            labels = labels.to(device)

            outputs = model(waveform)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total * 100
    print('Test Accuracy: {:.2f}%'.format(accuracy))

# Fit the model for five passes over the training loader
train(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=5,
)

# Report accuracy on the testing split
test(model=model, test_loader=test_loader)

Epoch [1/5], Step [100/332], Loss: 3.4721
Epoch [1/5], Step [200/332], Loss: 3.5174
Epoch [1/5], Step [300/332], Loss: 3.4296
Epoch [2/5], Step [100/332], Loss: 3.2003
Epoch [2/5], Step [200/332], Loss: 3.0872
Epoch [2/5], Step [300/332], Loss: 3.0874
Epoch [3/5], Step [100/332], Loss: 2.8470
Epoch [3/5], Step [200/332], Loss: 2.6945
Epoch [3/5], Step [300/332], Loss: 2.7981
Epoch [4/5], Step [100/332], Loss: 2.3999
Epoch [4/5], Step [200/332], Loss: 2.3800
Epoch [4/5], Step [300/332], Loss: 2.3871
Epoch [5/5], Step [100/332], Loss: 2.0706
Epoch [5/5], Step [200/332], Loss: 2.1194
Epoch [5/5], Step [300/332], Loss: 2.2249
Test Accuracy: 8.27%
