In [4]:
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
import seaborn as sns

import librosa
import librosa.display
import IPython.display as ipd

from glob import glob

import torch
import torch.nn as nn
import torch.nn.functional as nnF
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T

from PIL import Image

import os
import soundfile as sf

# Create Audio Chunks

The following section processes audio files by loading, concatenating, and splitting them into 10-second chunks based on their labels. It saves the concatenated audio data as .npy files and the chunks as .wav files for further analysis.

Original files are found in the RadioLabels folder.


In [2]:
audio_paths = glob('RadioLabels/*/*.wav')

In [3]:
def split_path(path):
    split_path = path.split('\\')
    label = split_path[1]
    filename = split_path[2].split('.')[0]
    
    return label, filename

In [4]:
def load_and_concatenate(files):
    music = []
    not_music = []
    global_sample_rate = None
    
    for file in files:
        label, filename = split_path(file)
        waveform, sample_rate = librosa.load(file)
        
        if global_sample_rate is None:
            global_sample_rate = sample_rate
        else:
            assert global_sample_rate == sample_rate
            
        if label == 'Music':
            music.extend(waveform)
        else:
            not_music.extend(waveform)
            
    # Save music and not_music to files
    output_folder = 'concatenated_audio'
    os.makedirs(output_folder, exist_ok=True)
    
    np.save(os.path.join(output_folder, 'music.npy'), np.array(music))
    np.save(os.path.join(output_folder, 'not_music.npy'), np.array(not_music))
    
    return global_sample_rate

In [14]:
def split_into_chunks(audio_data_npy_path, sr, chunk_duration=10):
    audio_data = np.load(audio_data_npy_path)  # Load the audio data from the numpy file
    label = os.path.splitext(os.path.basename(audio_data_npy_path))[0]
    chunk_samples = int(chunk_duration * sr)  # Calculate the number of samples for each 10-second chunk
    chunks = []
    
    # Split audio into chunks of specified duration
    for i in range(0, len(audio_data), chunk_samples):
        chunk = audio_data[i:i + chunk_samples]
        if len(chunk) == chunk_samples:  # Only keep chunks that are exactly 10 seconds
            chunks.append(chunk)
    
    # Create a new folder to store the chunks
    output_folder = os.path.join('10_second_chunks', f'{label}_audio_chunks')
    os.makedirs(output_folder, exist_ok=True)

    # Save each chunk as a separate file
    for idx, chunk in enumerate(chunks):
        chunk_filename = os.path.join(output_folder, f'{label}_chunk_{idx}.wav')
        sf.write(chunk_filename, chunk, sr)

In [6]:
# Load and concatenate all audio files
sample_rate = load_and_concatenate(audio_paths)

In [15]:
# Split concatenated audio into 10-second chunks
os.makedirs('10_second_chunks', exist_ok=True)

split_into_chunks('concatenated_audio\\music.npy', sample_rate)
split_into_chunks('concatenated_audio\\not_music.npy', sample_rate)

# Feature Engineering

In [16]:
# Load the audio chunks

music_audio_chunk_paths = glob('10_second_chunks/music_audio_chunks/*.wav')
not_music_audio_chunk_paths = glob('10_second_chunks/not_music_audio_chunks/*.wav')

In [17]:
# Verify that all audio chunks have the same sample rate of 22050 Hz

SAMPLE_RATE = 22050

def verify_same_sample_rate(audio_paths):
    for audio_path in audio_paths:
        waveform, sr = torchaudio.load(audio_path)
        assert sr == SAMPLE_RATE

verify_same_sample_rate(music_audio_chunk_paths)
verify_same_sample_rate(not_music_audio_chunk_paths)

In [18]:
# Set up the MFCC Transformer 

mfcc_params = {
    'sample_rate': SAMPLE_RATE,
    'n_mfcc': 20,
    'melkwargs': {
        'n_fft': 2048,
        'n_mels': 128,
        'hop_length': 512,
        'mel_scale': 'htk'
    }
}

mfcc_transform = T.MFCC(**mfcc_params)

In [19]:
def compute_and_store_mfccs(audio_chunk_paths, label):
    os.makedirs(f'mfcc/{label}_mfcc', exist_ok=True)
    
    for path in audio_chunk_paths:
        waveform = torchaudio.load(path)[0]
        mfcc = mfcc_transform(waveform)
        output_path = f'mfcc/{label}_mfcc/mfcc_{os.path.basename(path).split('.')[0]}.npy'        
        np.save(output_path, mfcc.numpy())

In [20]:
os.makedirs('mfcc', exist_ok=True)

compute_and_store_mfccs(music_audio_chunk_paths, 'music')   
compute_and_store_mfccs(not_music_audio_chunk_paths, 'not_music')

# Upload Dataset

In [21]:
from torch.utils.data import Dataset

class NpyDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.classes = sorted(os.listdir(root_dir))
        self.file_paths = []
        self.labels = []

        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            files = [file for file in os.listdir(class_dir) if file.endswith('.npy')]
            for file in files:
                self.file_paths.append(os.path.join(class_dir, file))
                self.labels.append(label)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        tensor = torch.from_numpy(np.load(file_path))  # Load .npy file and convert to tensor
        label = self.labels[idx]
        
        if len(tensor.shape) == 2:  # If the tensor is 2D, add a channel dimension
            tensor = tensor.unsqueeze(0)  # Adds a channel dimension at the beginning
        
        return tensor, label

In [24]:
from torch.utils.data import DataLoader, random_split

dataset = NpyDataset(root_dir='mfcc')

# Set split ratios
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

# Calculate split lengths
train_size = int(train_ratio * len(dataset))
val_size = int(val_ratio * len(dataset))
test_size = len(dataset) - train_size - val_size

# Split dataset into train, validation, and test sets
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create DataLoaders for each subset
batch_size = 32 
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# Build Model

In [25]:
import torch.optim as optim

In [6]:
# input_height is the number of frequency bins in the MFCC

class CRNN(nn.Module):
    def __init__(self, input_height, input_channels=1, conv_channels=16, hidden_size=128, num_classes=2):
        super(CRNN, self).__init__()

        # Convolutional Layers
        self.conv1 = nn.Conv2d(input_channels, conv_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(conv_channels)
        
        self.conv2 = nn.Conv2d(conv_channels, conv_channels * 2, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(conv_channels * 2)
        
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(p=0.05)

        # Calculate output dimensions after convolution and pooling
        conv_output_height = input_height // 4  # Two pooling layers reduce height

        # Calculate LSTM input size
        self.lstm_input_size = (conv_channels * 2) * conv_output_height

        # Initialize LSTM and Fully Connected Layers
        self.lstm = nn.LSTM(input_size=self.lstm_input_size, hidden_size=hidden_size, num_layers=2, batch_first=True)
        self.fc = nn.Linear(hidden_size, out_features=num_classes)

    def forward(self, x):
        # CNN layers
        x = self.conv1(x)
        x = self.bn1(x)
        x = nnF.relu(x)
        x = self.pool(x)
        # x = self.dropout(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = nnF.relu(x)
        x = self.pool(x)
        # x = self.dropout(x)

        # Reshape for LSTM      
        x = x.permute(0, 3, 1, 2)  # (batch, time_steps, channels, freq)
        batch_size, time_steps, channels, freq = x.shape
        
        # Calculate lstm_input_size based on observed shapes
        x = x.contiguous().view(batch_size, time_steps, -1)

        # LSTM
        x, _ = self.lstm(x)

        # Classification
        x = x[:, -1, :]
        x = self.fc(x)
        return x

In [164]:
from torch.optim.lr_scheduler import StepLR

model = CRNN(input_height=20)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

criterion = torch.nn.CrossEntropyLoss()  # for classification
optimizer = optim.Adam(model.parameters(), lr=0.00001)  # or another optimizer
scheduler = StepLR(optimizer, step_size=100, gamma=0.1)

# Train Model

In [None]:
num_epochs = 50

for epoch in range(num_epochs):
    # Training phase
    model.train()  # Set the model to training mode
    train_loss = 0  # Track total training loss for this epoch
    for tensors, labels in train_loader:
        tensors, labels = tensors.to(device), labels.to(device)  # Move to GPU if available
        
        # Forward pass
        outputs = model(tensors)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward() 
        optimizer.step()
        
        train_loss += loss.item()  # Accumulate training loss

    # Calculate average training loss for this epoch
    avg_train_loss = train_loss / len(train_loader)

    # Validation phase
    model.eval()  # Set the model to evaluation mode (disables dropout, etc.)
    val_loss = 0  # Track total validation loss for this epoch
    with torch.no_grad():  # Disable gradient calculation for validation
        for val_tensors, val_labels in val_loader:
            val_tensors, val_labels = val_tensors.to(device), val_labels.to(device)
            val_outputs = model(val_tensors)
            val_loss += criterion(val_outputs, val_labels).item()  # Accumulate validation loss

    # Calculate average validation loss for this epoch
    avg_val_loss = val_loss / len(val_loader)

    # Print training and validation loss for the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

# Save Model

In [140]:
torch.save(model, 'models/model_4.pth')

In [15]:
import exportsd

f = open("model_weights.dat", "wb")
exportsd.save_state_dict(model.to("cpu").state_dict(), f)
f.close()

  model = torch.load('models/model_4.pth')


# Test Model

In [11]:
# Testing phase after training is complete
model.eval()  # Set model to evaluation mode
test_loss = 0
with torch.no_grad():  # No gradients needed for testing
    for test_tensors, test_labels in test_loader:
        test_tensors, test_labels = test_tensors.to(device), test_labels.to(device)
        
        # Forward pass on the test set
        test_outputs = model(test_tensors)
        test_loss += criterion(test_outputs, test_labels).item()

# Calculate average test loss
avg_test_loss = test_loss / len(test_loader)
print(f"Testing Loss: {avg_test_loss:.4f}")

NameError: name 'test_loader' is not defined

In [168]:
from torchmetrics.classification import Precision, Recall, F1Score

num_classes = 2  # Update this to the actual number of classes in your dataset
precision = Precision(task='multiclass', num_classes=num_classes, average='macro').to(device)
recall = Recall(task='multiclass', num_classes=num_classes, average='macro').to(device)
f1 = F1Score(task='multiclass', num_classes=num_classes, average='macro').to(device)

In [None]:
model.eval()

# Assume test_loader is a DataLoader
for inputs, targets in test_loader:
    # Move data to the appropriate device
    inputs, targets = inputs.to(device), targets.to(device)

    # Forward pass
    outputs = model(inputs)  # Shape: (batch_size, num_classes)

    # Use raw logits/probabilities directly
    precision.update(outputs, targets)
    recall.update(outputs, targets)
    f1.update(outputs, targets)

# Compute metrics
precision_value = precision.compute()
recall_value = recall.compute()
f1_value = f1.compute()

print(f"Precision: {precision_value:.4f}")
print(f"Recall: {recall_value:.4f}")
print(f"F1 Score: {f1_value:.4f}")

# Visualize Model

In [None]:
from torchinfo import summary

model = CRNN(input_height=20, input_channels=1, conv_channels=16, hidden_size=128, num_classes=2)
model.to(device)
summary(model, input_size=(1, 1, 20, 431))  # Adjust for batch and input dimensions

In [None]:
from torchviz import make_dot
import matplotlib.pyplot as plt

# Define a dummy input with the correct shape for your model
dummy_input = torch.randn(1, 1, 20, 431).to(device)  # Adjust dimensions as needed for your model

# Forward pass to get the output and visualize the graph
output = model(dummy_input)
graph = make_dot(output, params=dict(model.named_parameters()))

# Save the graph to a file
graph.format = 'png'  # Specify the format (e.g., png, pdf, svg)
graph.attr(dpi='400')
graph.render('crnn_model_graph')  # This will save 'crnn_model_graph.png' in the current directory

# Display the graph using Matplotlib
img = plt.imread('crnn_model_graph.png')
plt.figure(figsize=(10, 10))
plt.imshow(img)
plt.axis('off')
plt.show()

# Save to ONNX

In [None]:
loaded_model = torch.load('models/model_4.pth')

batch = next(iter(train_loader))  # Get the first batch from the DataLoader

# If the DataLoader returns inputs and labels
inputs, labels = batch  # Unpack the batch
dummy_input = inputs.to(device)     # Use the inputs as the dummy_input

torch.onnx.export(loaded_model, dummy_input, "model_4.onnx")

# Save for TorchSharp

In [2]:
import exportsd

In [7]:
model = torch.load('models/model_4.pth')

  model = torch.load('models/model_4.pth')


In [8]:
with open("model_weights.dat", "wb") as f:
    exportsd.save_state_dict(model.to("cpu").state_dict(), f)

In [10]:
model.eval()

CRNN(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout): Dropout(p=0.05, inplace=False)
  (lstm): LSTM(160, 128, num_layers=2, batch_first=True)
  (fc): Linear(in_features=128, out_features=2, bias=True)
)

In [27]:
model(torch.randn(1, 1, 20, 431))

tensor([[ 0.0355, -0.0725]], grad_fn=<AddmmBackward0>)