In [3]:
import torch
from torch import nn, optim
import matplotlib.pyplot as plt
import numpy as np
import librosa
import torch.nn.functional as F
from torch.nn import ReLU
from torch.optim.lr_scheduler import ReduceLROnPlateau

import sys, os
sys.path.insert(0, os.path.pardir)
from data_loader import SpectLoader
from config import hdf5_path, spec_minmax_scaler_path, spec_log_transformer_path


In [None]:
def data_actual_dist(pprocessor, train_keys, num_bins=50, value_range=(-1.0, 1.0), target_size=(128, 512)):
    """Plot the normalized value histogram of all batches and return the bin edges.

    Each batch yielded by ``pprocessor.batch_generator(train_keys)`` is resized
    to ``target_size`` with antialiased bilinear interpolation, flattened and
    accumulated into a histogram of ``num_bins`` equal-width bins spanning
    ``value_range``. Values outside ``value_range`` are ignored by
    ``np.histogram`` and therefore do not contribute to the plot.

    Args:
        pprocessor: Object exposing ``batch_generator(keys)`` that yields
            ``(x_batch, _)`` pairs, where ``x_batch`` is a tensor accepted by
            ``F.interpolate(..., mode='bilinear')``.
        train_keys: Keys forwarded unchanged to ``batch_generator``.
        num_bins: Number of histogram bins.
        value_range: ``(low, high)`` limits of the histogram.
        target_size: ``(height, width)`` every batch is resized to.

    Returns:
        numpy.ndarray: The ``num_bins + 1`` bin edges.
    """
    low, high = value_range
    bin_edges = np.linspace(start=low, stop=high, num=num_bins + 1)

    bin_counts = np.zeros(num_bins)
    for x_batch, _ in pprocessor.batch_generator(train_keys):
        resized_tensor = F.interpolate(
            x_batch, size=target_size, mode='bilinear', align_corners=False, antialias=True
        )

        # Tensor.numpy() raises for tensors that live on the GPU or require
        # grad, so detach and move to host memory before converting.
        flattened_data = resized_tensor.detach().cpu().numpy().ravel()

        hist, _ = np.histogram(flattened_data, bins=bin_edges)
        bin_counts += hist

    # Normalize to relative frequencies; keep all-zero counts as zeros instead
    # of producing NaN when no value was counted (e.g. empty generator).
    total = bin_counts.sum()
    bin_counts_normalized = (bin_counts / total) if total > 0 else bin_counts

    plt.figure(figsize=(10, 6))
    plt.bar(
        bin_edges[:-1],
        bin_counts_normalized,
        width=np.diff(bin_edges),
        align="edge",
        color="blue",
        alpha=0.7
    )
    plt.title("Value Distribution in x_batch")
    plt.xlabel("Value")
    plt.ylabel("Frequency (Normalized)")
    plt.grid(True)
    plt.show()

    return bin_edges

def plot_spect(sepct):
    """Show a 2-D array as a 'hot' heatmap with a colorbar.

    Args:
        sepct: Array-like accepted by ``plt.imshow``. It is drawn with
            ``origin='lower'``, i.e. the first row appears at the bottom.
    """
    figure_size = (10, 5)  # Adjust figure size as needed
    heatmap_options = dict(aspect='auto', cmap='hot', origin='lower')

    plt.figure(figsize=figure_size)
    plt.imshow(sepct, **heatmap_options)
    plt.colorbar(label='Intensity')  # Must follow imshow: it annotates the current image

    # Axis annotations are independent of each other.
    plt.ylabel('Frequency')
    plt.xlabel('Time')
    plt.title('Spectrogram Heatmap')

    plt.show()

def plot_several(batch, n_show):
    """Plot up to ``n_show`` leading items of ``batch``, one figure per item.

    Args:
        batch: Indexable collection of tensors supporting ``len()``. Each item
            has its leading dimension squeezed (when it has size 1) and is
            converted to a NumPy array before being passed to ``plot_spect``.
        n_show: Maximum number of items to plot. Capped at ``len(batch)`` so
            asking for more items than the batch holds no longer raises
            ``IndexError``.
    """
    n_to_plot = min(n_show, len(batch))
    for i in range(n_to_plot):
        # detach()/cpu() make the conversion work for tensors that require
        # grad or live on the GPU; Tensor.numpy() raises for both otherwise.
        matrix = batch[i].squeeze(0).detach().cpu().numpy()
        plot_spect(matrix)




In [9]:
# Alias the HDF5 dataset location imported from `config` under the name the
# loader configuration below expects.
data_path = hdf5_path
# `spec_minmax_scaler_path` is used directly under its imported name; assigning
# it to itself would be a no-op, so no statement is needed here.

In [10]:
# File locations handed to the loader: the spectrogram dataset and the scaler.
paths = {"data_path": data_path, "scaler_path": spec_minmax_scaler_path}

# Build the loader, split the keys into train / validation / test, then set up
# the preprocessing pipeline.
# NOTE(review): load_model=True presumably loads an already-fitted scaler from
# `scaler_path` rather than fitting a new one - confirm in SpectLoader.
pprocessor = SpectLoader(paths, batch_size=32)
train_keys, val_keys, test_keys = pprocessor.split_data()
pprocessor.setup_pipeline(scaler_type="normalizer", load_model=True)

{'feature_range': (-1, 1), 'min_': -100.0, 'scale_': 212.16319274902344}
feature_range
min_
scale_


In [11]:
# Derive the network input shape from the first training spectrogram: a single
# leading channel followed by the spectrogram's first two dimensions.
first_train_key = pprocessor.train_keys[0]
sample1_shape = pprocessor.spect_data[first_train_key]['spectrogram'].shape
n_rows, n_cols = sample1_shape[0], sample1_shape[1]
input_shape = (1, n_rows, n_cols)
print(input_shape)

(1, 256, 646)


In [5]:
# Size of the test split returned by pprocessor.split_data().
print(len(test_keys))

1596


In [6]:
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 conv + BatchNorm layers, skip connection, ReLU, max-pool.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution. The second convolution always
            uses stride 1 so the main path is downsampled exactly once and
            matches a shortcut that applies the same stride.
        downsample: Optional module applied to the input to build the shortcut.
            Needed whenever the input shape differs from the main-path output
            (``in_channels != out_channels`` or ``stride != 1``); with ``None``
            the input itself is used as the shortcut.
        pool_size: Kernel size of the max-pooling applied after the activation.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, pool_size=4):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU())
        # Stride is fixed to 1 here: striding a second time would shrink the
        # main path more than the shortcut and break the residual addition.
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
        )
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=pool_size)
        self.out_channels = out_channels

    def forward(self, x):
        """Return ``maxpool(relu(conv2(conv1(x)) + shortcut(x)))``."""
        # Explicit None check: the truthiness of a module depends on its type
        # (an empty nn.Sequential is falsy), which is not what is meant here.
        residual = x if self.downsample is None else self.downsample(x)
        out = self.conv2(self.conv1(x))
        out = out + residual
        out = self.relu(out)
        return self.maxpool(out)

In [7]:
class SkipNetwork(nn.Module):
    """Convolutional classifier built from a stack of residual blocks.

    Architecture: one ``block`` per entry of ``channels``, a 1x1 convolution to
    512 channels (BatchNorm, ReLU, Dropout 0.4), a fully connected layer with
    1024 units (BatchNorm, ReLU, Dropout 0.5) and a linear head producing
    ``num_classes`` logits.

    Args:
        block: Callable ``block(in_channels, out_channels, stride=...,
            downsample=...)`` returning an ``nn.Module``.
        channels: Non-empty sequence with the output channel count of each block.
        input_shape: Shape ``(channels, height, width)`` of a single sample. Its
            first entry is the input channel count of the first block.
        num_classes: Number of output logits.

    Raises:
        ValueError: If ``channels`` is empty.
    """

    def __init__(self, block, channels, input_shape, num_classes=8):
        super().__init__()

        if len(channels) == 0:
            raise ValueError("channels must contain at least one entry")

        self.channels = channels
        # Input channels of the first block come from the sample shape
        # instead of being fixed to 1.
        self.in_channels = input_shape[0]
        self.conv_layers = nn.ModuleList()

        for layer_id, out_channels in enumerate(self.channels):
            conv_layer = self._make_conv_layer(block, out_channels, layer_id, stride=1)
            self.conv_layers.append(conv_layer)

        self.outconv = nn.Sequential(
            nn.Conv2d(in_channels=channels[-1], out_channels=512, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(0.4)
        )
        self._to_linear = self._calculate_flatten_size(input_shape)

        self.fc1 = nn.Sequential(
            nn.Linear(self._to_linear, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5)
        )

        self.head = nn.Linear(1024, num_classes)

    def _calculate_flatten_size(self, input_shape):
        """Return the number of features per sample after the convolutional stack.

        A single all-zero sample is pushed through ``conv_layers`` and
        ``outconv``. The pass runs in eval mode without gradient tracking so it
        neither updates BatchNorm running statistics nor fails when a
        one-sample batch is reduced to a 1x1 feature map; the previous
        train/eval mode is restored afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = torch.zeros(1, *input_shape)
                for layer in self.conv_layers:
                    x = layer(x)
                x = self.outconv(x)
        finally:
            self.train(was_training)
        return x.numel()

    def _make_conv_layer(self, block, out_channels, layer_id, stride=1):
        """Build block number ``layer_id``, adding a 1x1 projection shortcut when shapes change."""
        if layer_id == 0:
            in_channels = self.in_channels  # first layer consumes the raw input
        else:
            in_channels = self.channels[layer_id - 1]

        downsample = None
        if stride != 1 or in_channels != out_channels:
            # Project the input so it can be added to the block's main path.
            downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels),
            )

        return nn.Sequential(block(in_channels, out_channels, stride=stride, downsample=downsample))

    def forward(self, x):
        """Map a batch shaped ``(N, *input_shape)`` to logits shaped ``(N, num_classes)``."""
        for layer in self.conv_layers:
            x = layer(x)

        x = self.outconv(x)
        # flatten() also handles non-contiguous tensors, unlike view().
        x = torch.flatten(x, 1)

        x = self.fc1(x)
        x = self.head(x)
        return x

In [None]:
# Parameter count of the BasicModel baseline.
# BasicModel is neither defined nor imported in the visible cells of this
# notebook, so the cell is guarded: it reports instead of raising NameError
# when the notebook is run top to bottom on a fresh kernel.
if "BasicModel" in globals():
    model = BasicModel(input_shape=input_shape)
    total_params = sum(p.numel() for p in model.parameters())
    print(total_params)
else:
    print("BasicModel is not defined in this notebook; skipping its parameter count.")

In [None]:
# Build the residual network and report how many parameters it has in total.
model = SkipNetwork(
    ResidualBlock,
    channels=[128, 128, 256, 256],
    input_shape=input_shape,
)
total_params = sum(map(torch.numel, model.parameters()))
print(total_params)

In [None]:
# Alternative architectures (not defined in the visible cells), kept for reference:
#model = ConvNetwork(input_shape=input_shape)
#model = BasicModel(input_shape=input_shape)
model = SkipNetwork(ResidualBlock, channels=[128,128,256,256], input_shape=input_shape)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

batch_size = 16
num_epochs = 20

criterion = nn.CrossEntropyLoss()  # Use CrossEntropyLoss for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Multiply the learning rate by 0.7 when the validation loss stops improving.
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.7, patience=1)
print(f"1: Used CUDA memory: {torch.cuda.memory_allocated() / 1e6} MB")


best_val_loss = float('inf')  # Keep track of the best validation loss
best_model_state = None       # Snapshot of the state_dict of the best model

for epoch in range(num_epochs):
    train_accs = []
    train_losses = []

    val_accs = []
    val_losses = []

    # Re-enable dropout and BatchNorm statistic updates at the start of every
    # epoch; validation below switches the model to eval mode.
    model.train()
    for x, y in pprocessor.batch_generator(train_keys, batch_size=batch_size):
        # No-op when the loader already yields tensors on `device`.
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()

        outputs = model(x)
        loss = criterion(outputs, y)

        loss.backward()
        optimizer.step()

        # Accuracy is computed from the outputs of the training forward pass,
        # so the model must stay in train mode here.
        with torch.no_grad():
            preds = outputs.argmax(dim=1)
            train_accuracy = (preds == y).float().mean().item()

        train_accs.append(train_accuracy)
        train_losses.append(loss.item())

    model.eval()
    with torch.no_grad():
        for x, y in pprocessor.batch_generator(val_keys, batch_size=batch_size):
            x, y = x.to(device), y.to(device)

            outputs = model(x)
            val_loss = criterion(outputs, y).item()

            preds = outputs.argmax(dim=1)
            val_accuracy = (preds == y).float().mean().item()

            val_accs.append(val_accuracy)
            val_losses.append(val_loss)

    mean_val_loss = np.mean(val_losses)
    scheduler.step(mean_val_loss)
    print(f"3: Used CUDA memory: {torch.cuda.memory_allocated() / 1e6} MB")

    # Track the best model based on validation loss. state_dict() returns
    # references to the live parameters, which later epochs keep updating, so
    # the tensors are cloned to freeze the weights of this epoch.
    if mean_val_loss < best_val_loss:
        best_val_loss = mean_val_loss
        best_model_state = {
            name: tensor.detach().clone() for name, tensor in model.state_dict().items()
        }

    print(f'Avg. Training Loss: {np.mean(train_losses):.4f}, Avg. Train Accuracy: {np.mean(train_accs):.4f}, Val Loss: {np.mean(val_losses):.4f}, Val Accuracy: {np.mean(val_accs):.4f},')


# Save the best model at the end of training
if best_model_state is not None:
    torch.save(best_model_state, "best_model.pth")
    print(f"Training complete. Best model saved with Val Loss: {best_val_loss:.4f}")