In [1]:
import csv
import torch
import torchaudio
import os
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import glob
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import librosa



In [26]:
# Input file to classify. Alternative inputs used during development:
# audio_file_path = "/Users/zainhazzouri/projects/Bachelor_Thesis/Data1/Kaggle/music_wav/bartok.wav"
# audio_file_path = "/Users/zainhazzouri/Desktop/egp1.mp3"
audio_file_path = "/Users/zainhazzouri/projects/Bachelor_Thesis/Data1/Kaggle/music_wav/bagpipe.wav"

SAMPLE_RATE = 22050  # sample rate of the audio file
bit_depth = 16  # bit depth of the audio file
hop_length = 512
n_mfcc = 20  # number of MFCC features
# NOTE: no trailing comma here -- `n_fft=1024,` would bind the tuple (1024,)
# instead of the integer 1024.
n_fft = 1024  # window size
n_mels = 256  # number of mel bands to generate
win_length = None  # window length



In [27]:
# Pick the compute device: CUDA first, then Apple-silicon MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
# `is_built()` only says PyTorch was compiled with MPS support; `is_available()`
# additionally checks that this machine can actually use the device.
# The getattr guard keeps the cell working on torch builds without the mps backend.
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = "mps"  # if it doesn't work try device = torch.device('mps')
else:
    device = "cpu"
print(f"Using {device}")


Using mps


In [28]:
def preprocess(waveform, target_length=8000, sample_rate=SAMPLE_RATE, n_mfcc=n_mfcc):
    """Pad or truncate a waveform to a fixed length and return its MFCCs.

    Args:
        waveform: 2-D tensor indexed as (channels, samples).
        target_length: Number of samples the waveform is forced to along dim 1.
        sample_rate: Sample rate handed to the MFCC transform.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        The output of ``torchaudio.transforms.MFCC`` applied to the
        length-normalised waveform.
    """
    waveform_length = waveform.size(1)

    if waveform_length < target_length:
        # new_zeros inherits channel count, dtype and device from the input, so
        # the concatenation also works for multi-channel or non-CPU waveforms
        # (a hard-coded `torch.zeros(1, n)` would fail for those).
        padding = waveform.new_zeros(waveform.size(0), target_length - waveform_length)
        waveform = torch.cat((waveform, padding), dim=1)
    elif waveform_length > target_length:
        waveform = waveform[:, :target_length]

    # Keep the transform's buffers on the same device as the audio.
    mfcc_transform = torchaudio.transforms.MFCC(sample_rate=sample_rate, n_mfcc=n_mfcc)
    return mfcc_transform.to(waveform.device)(waveform)


In [29]:


class WaveUNet(nn.Module):
    """Strided-convolution encoder/decoder followed by a pooled linear classifier.

    Three stride-2 ``Conv2d`` layers widen the channels to ``4 * num_features``,
    three stride-2 ``ConvTranspose2d`` layers bring them back to
    ``num_features``, and the result is globally average-pooled and mapped to
    ``num_classes`` logits.

    Args:
        in_channels: Channel count of the input tensor.
        num_classes: Size of the output logit vector.
        num_features: Channel width after the first encoder convolution.
    """

    def __init__(self, in_channels=1, num_classes=10, num_features=40):
        super().__init__()

        narrow, medium, wide = num_features, num_features * 2, num_features * 4

        # Geometry shared by every (transposed) convolution in the network.
        down_kwargs = dict(kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        up_kwargs = dict(down_kwargs, output_padding=(1, 1))

        # Encoder: each stage halves the spatial size.
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, narrow, **down_kwargs),
            nn.ReLU(),
            nn.Conv2d(narrow, medium, **down_kwargs),
            nn.ReLU(),
            nn.Conv2d(medium, wide, **down_kwargs),
            nn.ReLU(),
        )

        # Decoder: each stage doubles the spatial size again.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(wide, medium, **up_kwargs),
            nn.ReLU(),
            nn.ConvTranspose2d(medium, narrow, **up_kwargs),
            nn.ReLU(),
            nn.ConvTranspose2d(narrow, narrow, **up_kwargs),
            nn.ReLU(),
        )

        # Classification head: global average pool, then one linear layer.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(narrow, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for input ``x``."""
        pooled = self.avg_pool(self.decoder(self.encoder(x)))
        flat = pooled.view(pooled.size(0), -1)  # (batch, num_features)
        return self.fc(flat)

In [30]:
# Build the network on the selected device and restore the trained weights.
model = WaveUNet().to(device)
# map_location="cpu" lets a checkpoint saved on another device (e.g. CUDA) be
# deserialised here; load_state_dict then copies the tensors onto the model's
# own device.
# NOTE(review): torch.load unpickles the file -- only load checkpoints from a trusted source.
model.load_state_dict(torch.load("waveunet_speech_music_discrimination.pth", map_location="cpu"))
# Switch to inference mode; as the last expression this also displays the model summary.
model.eval()


WaveUNet(
  (encoder): Sequential(
    (0): Conv2d(1, 40, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (1): ReLU()
    (2): Conv2d(40, 80, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (3): ReLU()
    (4): Conv2d(80, 160, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (5): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(160, 80, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), output_padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(80, 40, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), output_padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(40, 40, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), output_padding=(1, 1))
    (5): ReLU()
  )
  (avg_pool): AdaptiveAvgPool2d(output_size=1)
  (fc): Linear(in_features=40, out_features=10, bias=True)
)

In [31]:
def split_waveform(waveform, sample_rate):
    """Cut a waveform into consecutive chunks of ``sample_rate`` samples each.

    Args:
        waveform: 2-D array/tensor whose last axis is the sample axis.
        sample_rate: Chunk length in samples (one second of audio when it is
            the file's sample rate).

    Returns:
        A list of slices ``waveform[:, start:end]``; trailing samples that do
        not fill a whole chunk are left out.
    """
    chunk = sample_rate
    whole_chunks = waveform.shape[-1] // chunk
    return [waveform[:, k * chunk:(k + 1) * chunk] for k in range(whole_chunks)]

# TODO: consider loading every file type with librosa; this function is causing problems.
def classify_audio_file_segments(audio_file_path):
    """Classify every whole one-second segment of an audio file.

    ``.mp3`` files are read with librosa, everything else with torchaudio.
    Both paths end up with a single-channel waveform at ``SAMPLE_RATE`` so the
    features fed to the module-level ``model`` do not depend on which loader
    was used.

    Args:
        audio_file_path: Path of the audio file to classify.

    Returns:
        A list with one predicted class index (int) per segment, in order.
    """
    file_ext = os.path.splitext(audio_file_path)[1].lower()

    if file_ext == '.mp3':
        # librosa resamples to SAMPLE_RATE and, by default, downmixes to mono.
        waveform, sample_rate = librosa.load(audio_file_path, sr=SAMPLE_RATE)
        waveform = torch.from_numpy(waveform).unsqueeze(0)
    else:
        waveform, sample_rate = torchaudio.load(audio_file_path)
        # Mirror the librosa branch: the model takes one input channel, so
        # average multi-channel audio down to mono ...
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        # ... and bring the audio to the configured rate so that the segment
        # length and the MFCC filterbank match the actual sample rate.
        if sample_rate != SAMPLE_RATE:
            waveform = torchaudio.functional.resample(waveform, sample_rate, SAMPLE_RATE)
            sample_rate = SAMPLE_RATE

    segments = split_waveform(waveform, sample_rate)

    segment_classifications = []

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for segment in segments:
            mfcc = preprocess(segment, target_length=sample_rate, sample_rate=sample_rate)
            mfcc = mfcc.to(device).unsqueeze(0)  # add the batch dimension
            output = model(mfcc)
            _, predicted_class = torch.max(output, 1)
            segment_classifications.append(predicted_class.item())

    return segment_classifications



In [32]:

def format_time(seconds):
    """Format a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Args:
        seconds: Duration in seconds. Fractional values are truncated to whole
            seconds, because the ``02d`` format spec rejects floats.

    Returns:
        The duration as ``"HH:MM:SS"``.
    """
    whole_seconds = int(seconds)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def generate_classification_table(classification_results, segment_duration=1):
    """Merge consecutive identical labels into ``[start, end, label]`` rows.

    Args:
        classification_results: Sequence with one label per segment, in order.
        segment_duration: Length of one segment in seconds.

    Returns:
        A list of ``[start_time, end_time, label]`` rows whose times are
        strings produced by ``format_time``. An empty input (for example audio
        shorter than one segment) yields an empty list.
    """
    total_segments = len(classification_results)
    if total_segments == 0:
        # Nothing was classified; indexing element 0 below would raise IndexError.
        return []

    table = []
    current_label = classification_results[0]
    start_time = 0

    for i, label in enumerate(classification_results[1:], 1):
        if label != current_label:
            # The label changed at segment i: close the run that just ended.
            boundary = i * segment_duration
            table.append([format_time(start_time), format_time(boundary), current_label])
            start_time = boundary
            current_label = label

    # Close the final run, which extends to the end of the audio.
    table.append([format_time(start_time), format_time(total_segments * segment_duration), current_label])

    return table


def save_classification_table_to_csv(table, output_file):
    """Write the classification table to ``output_file`` as CSV.

    Args:
        table: Iterable of rows, each row an iterable of cell values.
        output_file: Destination path; an existing file is overwritten.

    The first line written is the header ``Start Time,End Time,Class``.
    """
    header = ['Start Time', 'End Time', 'Class']
    # newline='' lets the csv module control line endings itself.
    with open(output_file, 'w', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(table)

# print(audio_file_path)
# Classify the configured file segment by segment, then merge runs of equal labels.
classification_results = classify_audio_file_segments(audio_file_path)
table = generate_classification_table(classification_results)

# Persist the merged table as CSV.
save_classification_table_to_csv(table, "classification_table.csv")

# Echo the same table as pipe-separated text.
print("Start Time | End Time | Class")
print("-" * 28)
for row in table:
    start, end, label = row
    print(f"{start} | {end} | {label}")


Start Time | End Time | Class
----------------------------
00:00:00 | 00:00:01 | 1
00:00:01 | 00:00:30 | 0


In [33]:
# Re-run the per-segment classification on the same file (this repeats the
# inference performed for `classification_results`).
classifications = classify_audio_file_segments(audio_file_path)
# Rebinds `table` to the merged [start, end, label] rows for this run.
table = generate_classification_table(classifications)
# Show the raw list-of-lists form of the table.
print(table)



[['00:00:00', '00:00:01', 1], ['00:00:01', '00:00:30', 0]]
