In [28]:
import os
import torch
import torchaudio
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

DATA_PATH = "/kaggle/input/google-speech-commands"

SAMPLE_RATE = 16000       
DURATION = 1.0           
NUM_SAMPLES = int(SAMPLE_RATE * DURATION) 

# class RawAudioDataset(Dataset):
#     def __init__(self, file_paths, labels):
#         self.file_paths = file_paths
#         self.labels = labels

#     def __len__(self):
#         return len(self.labels)

#     def __getitem__(self, idx):
#         waveform, _ = torchaudio.load(self.file_paths[idx])
#         waveform = waveform[:, :NUM_SAMPLES]  # Trim or pad
#         if waveform.shape[1] < NUM_SAMPLES:
#             pad = NUM_SAMPLES - waveform.shape[1]
#             waveform = torch.nn.functional.pad(waveform, (0, pad))

#         waveform = waveform.unsqueeze(0)  # shape: (1, num_samples)
#         label = self.labels[idx]
#         return waveform, label

class RawAudioDataset(Dataset):
    def __init__(self, file_paths, labels):
        self.file_paths = file_paths
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        waveform, _ = torchaudio.load(self.file_paths[idx])
        waveform = waveform[:, :NUM_SAMPLES]  # Trim or pad
        if waveform.shape[1] < NUM_SAMPLES:
            pad = NUM_SAMPLES - waveform.shape[1]
            waveform = torch.nn.functional.pad(waveform, (0, pad))

        # DO NOT add extra dimension here!
        # waveform shape should be [1, NUM_SAMPLES] already from torchaudio
        return waveform, self.labels[idx]



In [43]:
def create_balanced_dataset(data_path):
    file_paths, labels = [], []
    classes = sorted([
        d for d in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, d)) and d != "_background_noise_"
    ])
    label_map = {label: idx for idx, label in enumerate(classes)}

    class_to_files = {label: [] for label in classes}
    for label in classes:
        files = [f for f in os.listdir(os.path.join(data_path, label)) if f.endswith('.wav')]
        class_to_files[label] = [os.path.join(data_path, label, f) for f in files]

    min_class_count = min(len(files) for files in class_to_files.values())
    min_class_count = 100

    for label in classes:
        selected_files = class_to_files[label][:min_class_count]
        for file_path in selected_files:
            file_paths.append(file_path)
            labels.append(label_map[label])

    return file_paths, labels, label_map


In [44]:
from sklearn.model_selection import train_test_split

file_paths, labels, label_map = create_balanced_dataset(DATA_PATH)
X_train, X_test, y_train, y_test = train_test_split(file_paths, labels, test_size=0.2, random_state=42)

train_dataset = RawAudioDataset(X_train, y_train)
test_dataset = RawAudioDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)


In [37]:
count=0
for batch in train_loader:
   
    count=count+1

print(count)

15


In [45]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        pe[:, 1::2] = torch.cos(pos * div)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (batch, time, d_model)
        x = x + self.pe[:, :x.size(1), :]
        return x

class CNNLSTMTransformer(nn.Module):
    def __init__(self, num_classes=10, input_channels=1, cnn_out_channels=128, lstm_hidden=128,
                 transformer_heads=4, transformer_ff_dim=256, transformer_layers=2, sample_rate=16000):
        super().__init__()

        # 1D CNN for raw audio
        self.cnn = nn.Sequential(
            nn.Conv1d(input_channels, 64, kernel_size=80, stride=4, padding=38),  # ~conv1 from wav2vec
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Conv1d(64, cnn_out_channels, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(cnn_out_channels),
        )

        self.embedding_dim = cnn_out_channels

        # LSTM
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=lstm_hidden,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )
        lstm_output_dim = lstm_hidden * 2

        # Positional Encoding
        self.pos_encoder = PositionalEncoding(lstm_output_dim)

        # Transformer Encoder
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=lstm_output_dim,
            nhead=transformer_heads,
            dim_feedforward=transformer_ff_dim,
            dropout=0.3,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(transformer_layer, num_layers=transformer_layers)

        # Classifier
        self.classifier = nn.Sequential(
            #nn.LayerNorm(lstm_output_dim),
            nn.Dropout(0.3),
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(lstm_output_dim, num_classes)
        )

    def forward(self, x):
        # x: (batch, 1, num_samples)
        x = self.cnn(x)  # -> (batch, channels, reduced_time)
        x = x.permute(0, 2, 1)  # -> (batch, time, features)
    
        x, _ = self.lstm(x)  # -> (batch, time, 2 * lstm_hidden)
        x = self.pos_encoder(x)  # Add positional encoding
    
        x = self.transformer(x)  # Transformer expects (batch, time, d_model)
    
        # Pool across time
        x = x.permute(0, 2, 1)  # (batch, features, time)
        x = self.classifier(x)
        return x







device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNLSTMTransformer(num_classes=len(label_map)).to(device)
print(model)

CNNLSTMTransformer(
  (cnn): Sequential(
    (0): Conv1d(1, 64, kernel_size=(80,), stride=(4,), padding=(38,))
    (1): ReLU()
    (2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv1d(64, 128, kernel_size=(3,), stride=(2,), padding=(1,))
    (4): ReLU()
    (5): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (lstm): LSTM(128, 128, batch_first=True, bidirectional=True)
  (pos_encoder): PositionalEncoding()
  (transformer): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
        )
        (linear1): Linear(in_features=256, out_features=256, bias=True)
        (dropout): Dropout(p=0.3, inplace=False)
        (linear2): Linear(in_features=256, out_features=256, bias=True)
        (norm1): LayerNorm((256,), eps=1e-05, elementw

In [48]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

def train(model, train_loader, criterion, optimizer, device, num_epochs=10):
    model.train()

    for epoch in range(num_epochs):
        total_loss = 0.0
        correct = 0
        total = 0

        loop = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=True)

        for batch in loop:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            # Calculate accuracy
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            total_loss += loss.item()

            # Update loop message
            loop.set_postfix(loss=loss.item(), accuracy=100 * correct / total)

        print(f'Epoch {epoch+1}: Loss = {total_loss / len(train_loader):.4f}, Accuracy = {100 * correct / total:.2f}%')

# Train the model
num_epochs = 10
train(model, train_loader, criterion, optimizer, device, num_epochs)


Epoch 1/10: 100%|██████████| 75/75 [01:14<00:00,  1.01it/s, accuracy=73.8, loss=0.886]


Epoch 1: Loss = 0.7956, Accuracy = 73.79%


Epoch 2/10: 100%|██████████| 75/75 [01:11<00:00,  1.04it/s, accuracy=80.2, loss=0.918]


Epoch 2: Loss = 0.6439, Accuracy = 80.21%


Epoch 3/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=82.3, loss=0.483]


Epoch 3: Loss = 0.5513, Accuracy = 82.33%


Epoch 4/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=84.2, loss=0.888]


Epoch 4: Loss = 0.4911, Accuracy = 84.25%


Epoch 5/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=87.4, loss=0.527]


Epoch 5: Loss = 0.4046, Accuracy = 87.42%


Epoch 6/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=86, loss=0.185]   


Epoch 6: Loss = 0.4255, Accuracy = 85.96%


Epoch 7/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=86.8, loss=0.408]


Epoch 7: Loss = 0.4429, Accuracy = 86.79%


Epoch 8/10: 100%|██████████| 75/75 [01:12<00:00,  1.04it/s, accuracy=87.6, loss=0.576]


Epoch 8: Loss = 0.3971, Accuracy = 87.58%


Epoch 9/10: 100%|██████████| 75/75 [01:11<00:00,  1.04it/s, accuracy=86.5, loss=0.522]


Epoch 9: Loss = 0.4226, Accuracy = 86.46%


Epoch 10/10: 100%|██████████| 75/75 [01:12<00:00,  1.03it/s, accuracy=88.2, loss=0.325]

Epoch 10: Loss = 0.3509, Accuracy = 88.21%





In [None]:
def evaluate(model, data_loader, criterion, device):
    """Evaluate the model"""
    model.eval()

    total_loss = 0.0
    correct = 0
    total = 0

    loop = tqdm(data_loader, desc="Evaluating", leave=True)

    for batch in loop:
        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)

        # Transpose inputs for transformer
        #inputs = inputs.transpose(1, 2)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
        total_loss += loss.item()
        
        loop.set_postfix(loss=loss.item(), accuracy=100 * correct / total)

    accuracy = 100 * correct / total
    avg_loss = total_loss / len(data_loader)
    
    print(f"Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

evaluate(model, train_loader, criterion, device)