<a href="https://colab.research.google.com/github/SIDLAD/CS-F425-Project/blob/main/Model1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
RunningInColab = 'google.colab' in str(get_ipython())
if RunningInColab:
    from google.colab import drive
    drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import os
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torchaudio.transforms as transforms
from torch.nn.utils.rnn import pad_sequence


# Custom dynamic range compression function
def dynamic_range_compression(x, threshold=-50, ratio=10):
    """
    Applies dynamic range compression to the input tensor.

    Args:
        x (Tensor): Input tensor of shape (batch_size, num_channels, num_frames).
        threshold (float, optional): Threshold value in decibels (dB). Default is -50 dB.
        ratio (float, optional): Compression ratio. Default is 10.

    Returns:
        Tensor: Compressed input tensor with the same shape as the input.
    """
    # Convert input to decibels
    x_db = 20 * torch.log10(torch.abs(x) + 1e-8)

    # Apply compression
    x_compressed = torch.where(
        x_db > threshold, threshold + (x_db - threshold) / ratio, x_db
    )

    # Convert back to linear scale
    x_compressed = torch.pow(10, x_compressed / 20)

    return x_compressed


# Hyperparameters
TARGET_SAMPLE_RATE = 16000
FREQ_MASK_PARAM = 15
TIME_MASK_PARAM = 25
BATCH_SIZE = 32
TARGET_LENGTH_SECONDS = 4  # Target length in seconds
THRESHOLD = -50  # Dynamic range compression threshold
RATIO = 10  # Dynamic range compression ratio


def collate_fn(batch):
    """
    Pads or truncates sequences to a fixed target length while maintaining consistency

    Args:
        batch (list): A list containing mel spectrogram and label pairs

    Returns:
        tuple: A tuple containing the padded/truncated sequences and labels
    """
    # Sort by decreasing sequence length
    batch.sort(key=lambda x: x[0].shape[1], reverse=True)

    # Target length in samples
    target_length = TARGET_LENGTH_SECONDS * TARGET_SAMPLE_RATE

    padded_seqs = []
    for mel_spec, label in batch:
        # Truncate if longer than target length
        if mel_spec.shape[1] > target_length:
            truncate_amount = mel_spec.shape[1] - target_length
            left_truncate = truncate_amount // 2
            right_truncate = truncate_amount - left_truncate
            padded_spec = mel_spec[:, left_truncate:-right_truncate]
        # Pad if shorter than target length
        elif mel_spec.shape[1] < target_length:
            pad_amount = target_length - mel_spec.shape[1]
            padded_spec = torch.nn.functional.pad(mel_spec, (0, pad_amount))
        else:
            padded_spec = mel_spec

        # Apply dynamic range compression
        padded_spec = dynamic_range_compression(
            padded_spec, threshold=THRESHOLD, ratio=RATIO
        )

        padded_seqs.append(padded_spec)

    # Convert to tensor and stack into a batch
    padded_seqs = torch.stack(padded_seqs)
    labels = torch.tensor([item[1] for item in batch])


class AudioDataset(Dataset):
    def __init__(
        self,
        data_dir,
        target_sample_rate=TARGET_SAMPLE_RATE,
        transform=None,
        target_transform=None,
    ):
        self.data_dir = data_dir
        self.classes = sorted(os.listdir(data_dir))
        self.file_paths = []
        self.targets = []
        self.transform = transform
        self.target_transform = target_transform
        self.target_sample_rate = target_sample_rate

        for i, class_name in enumerate(self.classes):
            class_dir = os.path.join(data_dir, class_name)
            for filename in os.listdir(class_dir):
                filepath = os.path.join(class_dir, filename)
                self.file_paths.append(filepath)
                self.targets.append(i)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        audio_path = self.file_paths[idx]
        waveform, sample_rate = torchaudio.load(audio_path)

        # Resampling if necessary
        if sample_rate != self.target_sample_rate:
            resampler = transforms.Resample(
                orig_freq=sample_rate, new_freq=self.target_sample_rate
            )
            waveform = resampler(waveform)

        # Apply mel spectrogram transformation
        mel_spec_transform = transforms.MelSpectrogram(
            sample_rate=self.target_sample_rate
        )
        mel_spec = mel_spec_transform(waveform)

        print(mel_spec.shape)
        # Apply other transformations if needed
        if self.transform:
            for transform in self.transform:
                mel_spec = transform(mel_spec)

        label = self.targets[idx]
        if self.target_transform:
            label = self.target_transform(label)


        return mel_spec, label


# Define data directories
if RunningInColab:
    train_dir = "/content/drive/MyDrive/audio_dataset/train"
    val_dir = "/content/drive/MyDrive/audio_dataset/val"
else:
    train_dir = "audio_dataset/train"
    val_dir = "audio_dataset/val"

# Define transformations
transform = [
    transforms.FrequencyMasking(freq_mask_param=FREQ_MASK_PARAM),
    transforms.TimeMasking(time_mask_param=TIME_MASK_PARAM),
]

# Create datasets and dataloaders
train_dataset = AudioDataset(train_dir, transform=transform)
val_dataset = AudioDataset(val_dir)

print(len(train_dataset))
for i in range(len(train_dataset)):
    train_dataset.__getitem__(i)

train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn
)
val_loader = DataLoader(
    val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn
)

4861




torch.Size([2, 128, 181])
torch.Size([2, 128, 207])
torch.Size([2, 128, 168])
torch.Size([2, 128, 307])
torch.Size([2, 128, 136])
torch.Size([2, 128, 166])
torch.Size([2, 128, 226])
torch.Size([2, 128, 118])
torch.Size([2, 128, 199])
torch.Size([2, 128, 203])
torch.Size([2, 128, 236])
torch.Size([2, 128, 155])
torch.Size([2, 128, 180])
torch.Size([2, 128, 320])
torch.Size([2, 128, 84])
torch.Size([2, 128, 149])
torch.Size([1, 128, 89])
torch.Size([2, 128, 226])
torch.Size([1, 128, 264])
torch.Size([2, 128, 91])
torch.Size([2, 128, 228])
torch.Size([2, 128, 131])
torch.Size([2, 128, 221])
torch.Size([2, 128, 308])
torch.Size([2, 128, 200])
torch.Size([2, 128, 111])
torch.Size([1, 128, 121])
torch.Size([1, 128, 234])
torch.Size([2, 128, 216])
torch.Size([2, 128, 92])
torch.Size([2, 128, 131])
torch.Size([2, 128, 122])
torch.Size([2, 128, 141])
torch.Size([2, 128, 121])
torch.Size([2, 128, 111])
torch.Size([2, 128, 212])
torch.Size([2, 128, 166])
torch.Size([2, 128, 111])
torch.Size([1, 1

KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models.resnet import ResNet, BasicBlock

# Hyperparameters
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
NUM_CLASSES = len(train_dataset.classes)  # Replace with the number of classes in your dataset
RESNET_LAYERS = [2, 2, 2, 2]  # Number of layers in each ResNet block

# Assuming you have your data loaders set up
# train_loader, val_loader

# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the ResNet model
class ResNetAudio(ResNet):
    def __init__(self, block, layers, num_classes):
        super(ResNetAudio, self).__init__(block, layers)
        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

model = ResNetAudio(BasicBlock, RESNET_LAYERS, NUM_CLASSES)
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=LEARNING_RATE)  # Using RMSprop

# Training function
def train(model, train_loader, optimizer, criterion, epoch):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    epoch_loss = running_loss / len(train_loader)
    print(f'Epoch {epoch+1}, Training Loss: {epoch_loss:.4f}')

# Validation function
def validate(model, val_loader, criterion):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(val_loader)
    accuracy = 100 * correct / total
    print(f'Validation Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return accuracy

# Training loop
best_accuracy = 0.0

for epoch in range(NUM_EPOCHS):
    train(model, train_loader, optimizer, criterion, epoch)
    accuracy = validate(model, val_loader, criterion)

    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), 'best_model.pth')

print(f'Best Validation Accuracy: {best_accuracy:.2f}%')

NameError: name 'train_dataset' is not defined