# Audio Exploration: Characteristics, Distribution, Frequency and Spectrograms:

In [None]:
import librosa
from librosa.display import waveshow, specshow, cmap
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path, PurePath
import os

import warnings
warnings.filterwarnings("ignore")

In [None]:
DATA_DIR = Path("./dataset/batches")

In [None]:
# random files for research
def get_samples_path(data_directory, n_samples):
    if n_samples == 1:
        return [Path(data_directory, file) for file in np.random.choice(os.listdir(data_directory), n_samples)][0]
    return [Path(data_directory, file) for file in np.random.choice(os.listdir(data_directory), n_samples)]

In [None]:
# get random files for exploration
sample_paths = get_samples_path(data_directory = DATA_DIR, n_samples = 300)

In [None]:
def get_audio_stats(audio_paths):
    audio_stats = []
    for audio in audio_paths:
        signal, sr = librosa.load(audio, sr = None, mono = False)
        # Channel determination
        channels = "stereo" if len(signal.shape) > 1 and signal.shape[0] == 2 else "mono"
        duration = librosa.get_duration(y=signal, sr = sr)
        mean_amplitude = np.mean(np.abs(signal))
        median_amplitude = np.median(np.abs(signal))
        std_amplitude = np.std(signal)
        # Root-Mean-Square-Energy
        rmse = np.mean(librosa.feature.rms(y=signal))
        # Average Zero crossing rate
        zcr = np.mean(librosa.zero_crossings(y=signal, pad = False)) / len(signal)
        
        audio_stats.append( 
            { "file_name": audio.parts[-1],
              "sample_rate" : sr,
              "duration": duration,
              "channels": channels,
              "root_mean_square_energy": rmse,
              "zero_crossing_rate_avg": zcr,
              "mean_amplitude": mean_amplitude,
              "median_amplitude": median_amplitude,
              "std_amplitude": std_amplitude})
    
    return pd.DataFrame(audio_stats)

In [None]:
stats = get_audio_stats(sample_paths)

In [None]:
stats.head()

In [None]:
fig, ax = plt.subplots(2, 2, constrained_layout = True)

plt.subplot(2, 2, 1) 
plt.hist(stats.root_mean_square_energy)
plt.title("Root-Mean-Square-Energy")
plt.ylabel("Count")
plt.xlabel("zcr")

plt.subplot(2, 2, 2) 
plt.hist(stats.zero_crossing_rate_avg)
plt.title("Zero-Crossing-Rate, Avg.")
plt.ylabel("Count")
plt.xlabel("zcr")


plt.subplot(2, 2, 3) 
plt.hist(stats.mean_amplitude)
plt.title("Mean Amplitude")
plt.ylabel("Count")
plt.xlabel("Mean Amplitude, Hz")

plt.subplot(2, 2, 4) 
plt.hist(stats.std_amplitude)
plt.title("Std Amplitude")
plt.ylabel("Count")
plt.xlabel("Median Amplitude, Hz")
plt.show();


 If all the files are from a very similar environment (so we have), it does make sense that the statistics across the audio samples would be quite consistent. This is especially true if the driving conditions, vehicle, and recording equipment were roughly the same across all recordings. **But nevertheless it is the strangest pattern in data in my experience lol, especially working with audio files.
 
FYI: it's essential to be aware that while this consistency can be beneficial for training a model (**as it reduces variability**), it's crucial to ensure that the model generalizes well to other environments or variations when deploying in real-world scenarios...


In [None]:
signal, sr = librosa.load(sample_paths[2], sr = None)

plt.figure(figsize=(10, 4))
librosa.display.waveshow(signal, sr=sr)
plt.title("Waveform of a Segment from the First Audio Sample")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.tight_layout()
plt.show()

In [None]:
# Plot the spectrogram
D = librosa.amplitude_to_db(librosa.stft(signal), ref=np.max)

plt.figure(figsize=(10, 4))
librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
plt.colorbar(format='%+2.0f dB')
plt.title('Spectrogram of Audio Sample')
plt.tight_layout()
plt.show()

### Preprocessing (draft):

* Convert stereo to mono (in any case)
* Normalize the audio volume levels
* Extract real and imaginary spectrograms for each audio file
* Silence segmentation (?)

In [None]:
def preprocess_audio(audio_path):
    """
    Preprocessing of an audio file:
    - Converts stereo to mono
    - Normalize the volume
    - Extract the real and imaginary spectrograms
    
    :param audio_path: the path to the audio file to read 
    :return: 
        - Real part of the spectrogram
        - Imaginary part of the spectrogram
    """
    # load audio
    y, sr = librosa.load(audio_path, sr = None, mono = True)
    # Normalize 
    y = y / np.max(np.abs(y))
    # Compute STFT to get the complex spectrogram
    complex_spectrogram = librosa.stft(y)
    
    # Split the complex spectrogram into real and imaginary parts
    real_spectrogram = np.real(complex_spectrogram)
    imag_spectrogram = np.imag(complex_spectrogram)
    
    return sr, real_spectrogram, imag_spectrogram

In [None]:
def detect_silent_segments(audio_path, threshold=0.01, frame_length=2048, hop_length=512):
    """
    Detect silent segments in an audio file. Updated to handle both mono and stereo audio and
    audio lengths that aren't exact multiples of the hop length.
    
    Parameters:
    - audio_path: Path to the audio file.
    - threshold: Amplitude threshold below which audio is considered silent.
    - frame_length: Number of audio samples between successive frames.
    - hop_length: Number of audio samples between starts of consecutive frames.
    
    Returns:
    - silent_segments: List of start and end times for silent segments.
    """
    y, sr = librosa.load(audio_path, sr=None, mono=False)
    
    # If stereo, average the two channels
    if len(y.shape) > 1 and y.shape[0] == 2:
        y = np.mean(y, axis=0)
        
    amplitude = np.abs(y)
    
    # Adjust length of amplitude array for reshaping
    num_segments = len(amplitude) // hop_length
    adjusted_length = num_segments * hop_length
    amplitude_adjusted = amplitude[:adjusted_length]
    
    # Detect silent frames
    silent_frames = np.where(np.mean(amplitude_adjusted.reshape(-1, hop_length), axis=1) < threshold)[0]
    
    silent_segments = []
    
    if len(silent_frames) > 0:
        current_segment = [silent_frames[0]]
        for i in range(1, len(silent_frames)):
            if silent_frames[i] != silent_frames[i-1] + 1:
                current_segment.append(silent_frames[i-1])
                silent_segments.append(current_segment)
                current_segment = [silent_frames[i]]
        current_segment.append(silent_frames[-1])
        silent_segments.append(current_segment)
    
    # Convert frame numbers to time
    silent_segments = [(librosa.frames_to_time(seg[0], sr=sr, hop_length=hop_length),
                        librosa.frames_to_time(seg[1], sr=sr, hop_length=hop_length)) for seg in silent_segments]
    
    return silent_segments

In [None]:
# Detecting silent segments in the provided audio samples using the updated function
from pprint import pprint
silent_segments_info = {}

for file in sample_paths:
    silent_segments = detect_silent_segments(file)
    if silent_segments:
        silent_segments_info[file.parts[-1]] = silent_segments

pprint(silent_segments_info)

As there is a small length of segments it is not mandatory to preprocess and remove them (but it is better to see performance of the model with and without them). Just my thoughts. 

 In general, I'd recommend keeping the entire audio unless the silent segments are significantly long, as these brief pauses could be inherent characteristics of the environment and might provide context during noise cancellation.

In [None]:
sr, real, imag = preprocess_audio(sample_paths[0])

In [None]:
# Display the real and imaginary spectrogram for the first audio sample
plt.figure(figsize=(14, 6))

plt.subplot(1, 2, 1)

librosa.display.specshow(real, sr=sr, x_axis='time', y_axis='linear')
plt.colorbar()
plt.title("Real Part of Spectrogram")

plt.subplot(1, 2, 2)

librosa.display.specshow(imag, sr=sr, x_axis='time', y_axis='linear')
plt.colorbar()
plt.title("Imaginary Part of Spectrogram")
plt.tight_layout()

plt.show()

In [None]:
# Preprocessing class for module 
class AudioPreprocessor:
    def __init__(self, n_fft=2048, hop_length=512, segment_length=44100):
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.segment_length = segment_length

    def get_spectrogram(self, y):
        spectrogram = librosa.amplitude_to_db(np.abs(librosa.stft(y, n_fft=self.n_fft, hop_length=self.hop_length)), ref=np.max)
        return spectrogram

    def normalize_spectrogram(self, spectrogram):
        mean = np.mean(spectrogram)
        std = np.std(spectrogram)
        return (spectrogram - mean) / std

    def pitch_shift(self, y, sr, steps=2):
        return librosa.effects.pitch_shift(y, sr, n_steps=steps)

    def add_noise(self, y, noise_level=0.005):
        noise = np.random.normal(size=y.shape)
        return y + noise_level * noise

    def segment_audio(self, y):
        num_segments = len(y) // self.segment_length
        segments = [y[i * self.segment_length: (i + 1) * self.segment_length] for i in range(num_segments)]
        return segments

    def preprocess(self, audio_path, augment=False, noise_level=0.005, pitch_steps=2):
        y, sr = librosa.load(audio_path, sr=None, mono=True)
        
        if augment:
            y = self.add_noise(y, noise_level)
            y = self.pitch_shift(y, sr, pitch_steps)
        
        segments = self.segment_audio(y)
        spectrograms = [self.get_spectrogram(segment) for segment in segments]
        normalized_spectrograms = [self.normalize_spectrogram(spectrogram) for spectrogram in spectrograms]
        
        return normalized_spectrograms

# Model's architecture: Deep Concolutional Neural Network:

In [None]:
import torch
import torch.nn as nn
from torchsummary import summary


class ANCRN(nn.Module):
    def __init__(self):
        super(ANCRN, self).__init__()
        
        # Encoder part
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2),
            
            nn.Conv2d(16, 32, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2),
            
            nn.Conv2d(32, 64, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2),
            
            nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # LSTM middle part
        self.lstm = nn.LSTM(input_size = 256, hidden_size = 128, num_layers = 2, batch_first = True)
        
        # Decoder part with transposed conolutions
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size = 2, stride = 2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            
            nn.ConvTranspose2d(256, 64, kernel_size=2, stride=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            
            nn.ConvTranspose2d(128, 32, kernel_size=2, stride=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            
            nn.ConvTranspose2d(64, 16, kernel_size=2, stride=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            
            nn.ConvTranspose2d(32, 1, kernel_size=2, stride=2),
            nn.BatchNorm2d(1),
            nn.ReLU()
        )
        
    def forward(self, x):
        # Encoder part
        encoder_outputs = []
        for layer in self.encoder:
            x = layer(x)
            encoder_outputs.append(x)
        
        # LSTM middle part
        print(x.shape)
        x = x.squeeze(-1).permute(0, 2, 1)
        x, _ = self.lstm(x)
        x  = x.permute(0, 2, 1).unsqueeze(-1)
        
        # Decoder with skip connections
        for i, layer in enumerate(self.decoder):
            if isinstance(layer, nn.ConvTranspose2d):
                x = torch.cat([x, encoder_outputs[-(1//3)-1]], dim = 1)
            x = layer(x)
        
        return x

In [None]:
def model_summary(model):
    print("Model Summary")
    print("--------------")
    total_params = 0
    for name, parameter in model.named_parameters():
        num_params = parameter.numel()
        total_params += num_params
        print(f"{name.ljust(25)}: {str(num_params).ljust(12)} parameters")
    print("--------------")
    print(f"Total Parameters: {total_params}")

In [None]:
model_instance = ANCRN()
model_instance.parameters

In [None]:
model_summary(model_instance)

In [None]:
del(model_instance)

In [None]:
def collate_fn(batch):
    # Flatten the batch since each audio file might produce multiple segments (spectrograms)
    all_spectrograms = [spectrogram for segments in batch for spectrogram in segments]
    
    # Find the maximum width (time dimension) among the spectrograms
    max_width = max([item.shape[1] for item in all_spectrograms])
    
    # Create a placeholder tensor filled with zeros, with the shape of the largest spectrogram
    padded_batch = torch.zeros((len(all_spectrograms), all_spectrograms[0].shape[0], max_width))
    
    # Pad each spectrogram and place it in the placeholder tensor
    for i, item in enumerate(all_spectrograms):
        padded_batch[i, :, :item.shape[1]] = torch.tensor(item)

    return torch.unsqueeze(padded_batch, 1)

In [None]:
from torch.utils.data import Dataset, DataLoader, random_split

class AudioDataset(Dataset):
    def __init__(self, audio_files, preprocessor):
        self.audio_files = audio_files
        self.preprocessor = preprocessor
        self.data = [self.preprocessor.preprocess(file) for file in audio_files]
        
    def __len__(self):
        return len(self.audio_files)
    
    def __getitem__(self, idx):
        return self.data[idx]

# Instantiate the AudioPreprocessor
audio_preprocessor = AudioPreprocessor()
dataset = AudioDataset(sample_paths, audio_preprocessor)

In [None]:
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

In [None]:
batch_size = 8

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
import torch.optim as optim
import torch
import torch.nn as nn
import librosa
import numpy as np

model = ANCRN()
criterion = nn.MSELoss()
optimizer = optim.RMSprop(model.parameters(), lr = 0.001) # using by default (without custom hyperparameters)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)

In [None]:
num_epochs = 10 

for epoch in range(num_epochs):
    model.train() 
    # Training
    for batch in train_loader:
        inputs = batch.to(device)
        optimizer.zero_grad()  # Zero the parameter gradients
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, inputs)
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
    
    # Validation
    model.eval()  # Set model to evaluation mode
    total_val_loss = 0
    with torch.no_grad():
        for val_batch in val_loader:
            val_inputs = val_batch.to(device)
            val_outputs = model(val_inputs)
            val_loss = criterion(val_outputs, val_inputs)
            total_val_loss += val_loss.item()
    
    avg_val_loss = total_val_loss / len(val_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {loss.item():.4f}, Validation Loss: {avg_val_loss:.4f}")

In [None]:
del(model)
del(loss)
del(optimizer)
torch.cuda.empty_cache()

In [None]:
model.eval()
total_test_loss = 0
with torch.no_grad():
    for test_batch in test_loader:
        test_inputs = test_batch.to(device)
        test_outputs = model(test_inputs)
        test_loss = criterion(test_outputs, test_inputs)
        total_test_loss += test_loss.item()

avg_test_loss = total_test_loss / len(test_loader)
print(f"Average Test Loss: {avg_test_loss:.4f}")