## Loading and Extracting the melody

In [None]:
"""
This script processes all MP3 files in a specified folder, resamples them to a
specified sample rate, converts them to mono, computes their spectrograms, and
saves the spectrogram tensors in a specified output folder.
"""

import os
import torch
import torchaudio

def process_mp3_files(folder_path, output_folder, sample_rate=16000, n_fft=1024, hop_length=512):
    """Convert every MP3 in ``folder_path`` into a saved spectrogram tensor.

    Each file is loaded, resampled to ``sample_rate`` when its native rate
    differs, down-mixed to one channel by averaging, and passed through
    ``torchaudio.transforms.Spectrogram``. The result is written to
    ``output_folder`` as ``<stem>_spec.pt``.

    Args:
        folder_path: Directory whose direct entries are scanned for ``.mp3``
            files (the extension match is case-insensitive).
        output_folder: Destination directory; created if it does not exist.
        sample_rate: Target sampling rate for the audio.
        n_fft: FFT size handed to the spectrogram transform.
        hop_length: Hop length handed to the spectrogram transform.
    """
    os.makedirs(output_folder, exist_ok=True)

    for entry in os.listdir(folder_path):
        # Guard clause: anything that is not an MP3 is ignored.
        if not entry.lower().endswith('.mp3'):
            continue

        source_path = os.path.join(folder_path, entry)
        print(f"Processing: {source_path}")

        audio, native_rate = torchaudio.load(source_path)

        # Bring the audio to the requested rate only when it differs.
        if native_rate != sample_rate:
            audio = torchaudio.transforms.Resample(orig_freq=native_rate, new_freq=sample_rate)(audio)

        # Collapse multi-channel audio to a single channel, keeping the channel axis.
        if audio.shape[0] > 1:
            audio = audio.mean(dim=0, keepdim=True)

        to_spectrogram = torchaudio.transforms.Spectrogram(n_fft=n_fft, hop_length=hop_length)
        spec = to_spectrogram(audio)

        stem, _ = os.path.splitext(entry)
        target_name = stem + "_spec.pt"
        torch.save(spec, os.path.join(output_folder, target_name))
        print(f"Saved: {target_name}")

# Example usage
folder_path = "./mp3_folder"        # Change this to your folder path
output_folder = "./spectrograms"    # Output folder to save tensors
# Runs as soon as the cell executes; uses the default sample rate, FFT size and hop length.
process_mp3_files(folder_path, output_folder)



# Saving the spectrogram and its frequency magnitudes averaged over time

In [None]:
import os
import torch
import torchaudio

def process_mp3_files(folder_path, output_folder, sample_rate=16000, n_fft=1024, hop_length=512):
    """Save a spectrogram and its time-averaged spectrum for each MP3.

    For every ``.mp3`` entry of ``folder_path`` (case-insensitive match) the
    audio is loaded, resampled to ``sample_rate`` if needed, averaged down to
    one channel and transformed with ``torchaudio.transforms.Spectrogram``.
    Two tensors are written to ``output_folder``:

    * ``<stem>_spec.pt``      - the spectrogram itself
    * ``<stem>_avg_freq.pt``  - the spectrogram averaged over its last axis

    Args:
        folder_path: Directory containing the MP3 files.
        output_folder: Destination directory; created if it does not exist.
        sample_rate: Target sampling rate for the audio.
        n_fft: FFT size handed to the spectrogram transform.
        hop_length: Hop length handed to the spectrogram transform.
    """
    os.makedirs(output_folder, exist_ok=True)

    for entry in os.listdir(folder_path):
        # Guard clause: skip everything that is not an MP3.
        if not entry.lower().endswith('.mp3'):
            continue

        source_path = os.path.join(folder_path, entry)
        print(f"Processing: {source_path}")

        audio, native_rate = torchaudio.load(source_path)

        if native_rate != sample_rate:
            audio = torchaudio.transforms.Resample(orig_freq=native_rate, new_freq=sample_rate)(audio)

        # Average the channels but keep the leading channel axis.
        if audio.shape[0] > 1:
            audio = audio.mean(dim=0, keepdim=True)

        to_spectrogram = torchaudio.transforms.Spectrogram(n_fft=n_fft, hop_length=hop_length)
        spec = to_spectrogram(audio)  # [1, freq_bins, time_steps]

        # Mean over the last (time) axis leaves one magnitude per frequency bin: [1, freq_bins].
        time_averaged = spec.mean(dim=-1)

        stem, _ = os.path.splitext(entry)
        torch.save(spec, os.path.join(output_folder, f"{stem}_spec.pt"))
        torch.save(time_averaged, os.path.join(output_folder, f"{stem}_avg_freq.pt"))
        print(f"Saved spectrogram and average frequencies for: {entry}")

# Example usage
folder_path = "./Test"        # Change this to your folder path
output_folder = "./spectrograms"    # Output folder to save tensors
# Runs as soon as the cell executes; writes both the *_spec.pt and *_avg_freq.pt files.
process_mp3_files(folder_path, output_folder)


# Creating a basic embedding using CNN

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Example hyperparameters
# These are module-level globals: the model and the toy dataset below read
# n_bins and hop_length directly rather than receiving them as arguments.
n_bins = 128          # first (frequency) dimension of each input patch
hop_length = 128      # NOTE(review): used below as the second (time) dimension of each patch, not as an STFT hop - confirm naming
embedding_dim = 256   # size of the embedding vector produced by the model
batch_size = 8        # samples per DataLoader batch
num_epochs = 5        # number of passes over the toy dataset

# === CNN Model ===
class SpectrogramCNNEmbedder(nn.Module):
    """Small CNN mapping a 2-D spectrogram patch to an L2-normalised embedding.

    Two ``Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2)`` stages halve each
    spatial dimension twice; the feature map is then flattened and projected
    to ``embedding_dim`` by a linear layer.

    Args:
        embedding_dim: Size of the output embedding vector.
        input_shape: Optional ``(height, width)`` of the patches the model
            will receive. When omitted, the module-level ``n_bins`` and
            ``hop_length`` globals are used, which is the size this class
            previously hard-coded.

    Raises:
        ValueError: If either input dimension is smaller than 4, because two
            rounds of 2x pooling would leave no features for the linear layer.
    """

    def __init__(self, embedding_dim=256, input_shape=None):
        super().__init__()

        if input_shape is None:
            # Backward-compatible default: fall back to the notebook globals.
            input_shape = (n_bins, hop_length)
        height, width = input_shape
        if height < 4 or width < 4:
            raise ValueError(
                f"input_shape must be at least 4x4 to survive two 2x poolings, got {tuple(input_shape)}"
            )

        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),  # (height // 2) x (width // 2)

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),  # (height // 4) x (width // 4)
        )

        self.flatten = nn.Flatten()
        # padding=1 with a 3x3 kernel preserves size, so only the two poolings
        # shrink the map; flooring twice by 2 equals flooring once by 4.
        self.fc = nn.Linear(64 * (height // 4) * (width // 4), embedding_dim)

    def forward(self, x):
        """Embed a batch of patches.

        Args:
            x: Tensor of shape ``[B, height, width]`` (no channel axis).

        Returns:
            Tensor of shape ``[B, embedding_dim]`` with unit L2 norm per row.
        """
        x = x.unsqueeze(1)  # add the single input channel: [B, 1, height, width]
        x = self.cnn(x)
        x = self.flatten(x)
        x = self.fc(x)
        return F.normalize(x, p=2, dim=1)

# === Toy Dataset ===
class PairDataset(Dataset):
    """Toy dataset of random patch pairs with random +1 / -1 labels.

    Every sample holds two independent ``float32`` arrays of shape
    ``(n_bins, hop_length)`` (both read from module-level globals) and a label
    drawn independently of the arrays, so the labels carry no real similarity
    signal; the dataset only exercises the training pipeline.

    Args:
        num_samples: Number of pairs generated up front.
    """

    def __init__(self, num_samples=100):
        def random_patch():
            return np.random.rand(n_bins, hop_length).astype(np.float32)

        # Tuple elements are evaluated left to right, so the NumPy RNG is
        # consumed in the order: first patch, second patch, label.
        self.data = [
            (random_patch(), random_patch(), 1 if np.random.rand() > 0.5 else -1)  # half similar, half dissimilar
            for _ in range(num_samples)
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        first, second, target = self.data[idx]
        return (
            torch.tensor(first),
            torch.tensor(second),
            torch.tensor(target, dtype=torch.float32),
        )

# === Training Setup ===
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SpectrogramCNNEmbedder(embedding_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# The dataset labels pairs with +1 / -1, the target format this loss takes.
loss_fn = nn.CosineEmbeddingLoss()

# PairDataset() uses its default of 100 randomly generated pairs.
train_loader = DataLoader(PairDataset(), batch_size=batch_size, shuffle=True)

# === Training Loop ===
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for x1, x2, label in train_loader:
        x1 = x1.to(device)
        x2 = x2.to(device)
        label = label.to(device)

        # Both halves of the pair go through the same network (shared weights).
        emb1 = model(x1)
        emb2 = model(x2)

        loss = loss_fn(emb1, emb2, label)
        loss.backward()
        optimizer.step()
        # Gradients are cleared after the step, so each batch starts from zero.
        optimizer.zero_grad()

        total_loss += loss.item()

    # Mean of the per-batch losses (len(train_loader) is the number of batches).
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f}")


# Embedding the data

In [None]:
import os
import torch
import random

def create_chunk_pairs(
    spectrogram_folder,
    output_file,
    chunk_width=64,
    num_pairs_per_file=20,
    overlap=True,
    seed=42
):
    """Sample pairs of fixed-width time chunks from saved spectrograms.

    Every ``*_spec.pt`` file in ``spectrogram_folder`` is loaded and up to
    ``num_pairs_per_file`` pairs of chunks, each ``chunk_width`` frames wide,
    are cut from random positions of the same spectrogram. All pairs are
    collected in one list of ``(chunk1, chunk2)`` tuples and written to
    ``output_file`` with ``torch.save``.

    Args:
        spectrogram_folder: Directory containing ``*_spec.pt`` tensors of
            shape ``[1, freq_bins, time]`` or ``[freq_bins, time]``.
        output_file: Path of the dataset file to write.
        chunk_width: Number of time frames per chunk.
        num_pairs_per_file: Number of sampling attempts per spectrogram.
        overlap: When ``False`` the second chunk never shares a frame with the
            first; attempts with no such position are skipped, so fewer pairs
            may be produced.
        seed: Seed of the private random generator used for sampling.

    Notes:
        * Files are visited in sorted order so a given seed always yields the
          same dataset regardless of the platform's directory listing order.
        * Chunks are cloned before saving. A slice is a view, and
          ``torch.save`` serialises the whole storage behind a view, which
          would write each full spectrogram into the dataset file.
        * Spectrograms exactly ``chunk_width`` frames long are used (they
          contain one valid chunk); only shorter ones are skipped.
    """
    # Private generator: reproducible sampling without reseeding the global `random` state.
    rng = random.Random(seed)
    dataset = []

    for filename in sorted(os.listdir(spectrogram_folder)):
        if not filename.endswith('_spec.pt'):
            continue

        # NOTE(security): torch.load unpickles its input - only use on trusted files.
        spec = torch.load(os.path.join(spectrogram_folder, filename))

        # Ensure spectrogram shape is [1, freq_bins, time_frames]
        if spec.dim() == 2:
            spec = spec.unsqueeze(0)

        _, _, time_len = spec.shape
        max_start = time_len - chunk_width
        if max_start < 0:
            continue  # shorter than one chunk

        for _ in range(num_pairs_per_file):
            start1 = rng.randint(0, max_start)

            if overlap:
                start2 = rng.randint(0, max_start)
            else:
                # Valid second starts lie in [0, start1 - chunk_width] or
                # [start1 + chunk_width, max_start]; pick uniformly among
                # them by index instead of materialising both ranges.
                first_after = start1 + chunk_width
                n_before = max(0, start1 - chunk_width + 1)
                n_after = max(0, max_start - first_after + 1)
                if n_before + n_after == 0:
                    continue  # can't find a non-overlapping chunk
                pick = rng.randrange(n_before + n_after)
                start2 = pick if pick < n_before else first_after + (pick - n_before)

            # clone() detaches each chunk from the full spectrogram's storage.
            chunk1 = spec[:, :, start1:start1 + chunk_width].clone()
            chunk2 = spec[:, :, start2:start2 + chunk_width].clone()
            dataset.append((chunk1, chunk2))

    # Save dataset
    torch.save(dataset, output_file)
    print(f"Saved {len(dataset)} chunk pairs to {output_file}")

# Example usage
spectrogram_folder = "./spectrograms"
output_dataset = "./bert_dataset.pt"
# Requests 30 pairs per spectrogram instead of the default 20; chunks may overlap.
create_chunk_pairs(spectrogram_folder, output_dataset, chunk_width=64, num_pairs_per_file=30, overlap=True)


# Embedding Audio using predefined models

In [4]:
import torch
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2Model
from transformers import HubertModel, Wav2Vec2Processor
from transformers import WhisperProcessor, WhisperModel

# Load and preprocess the audio (mono, 16 kHz)
def load_audio(file_path, target_sr=16000):
    """Load an audio file as a mono waveform at ``target_sr``.

    Args:
        file_path: Path of the audio file, read with ``torchaudio.load``.
        target_sr: Sampling rate the returned waveform is resampled to.

    Returns:
        A ``(waveform, target_sr)`` tuple where ``waveform`` is a 1-D tensor
        of samples.
    """
    waveform, sr = torchaudio.load(file_path)

    # torchaudio returns [channels, samples]. Multi-channel files must be
    # averaged to one channel, otherwise squeeze(0) below is a no-op and the
    # caller receives a 2-D tensor instead of a mono signal.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sr != target_sr:
        waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
    return waveform.squeeze(0), target_sr  # [samples], target_sr
def get_wav2vec2_embedding(waveform, sampling_rate=16000):
    """Return frame-level features from ``facebook/wav2vec2-base-960h``.

    The processor and the model are loaded from the pretrained checkpoint on
    every call, and the forward pass runs without gradient tracking.

    Args:
        waveform: Audio samples handed to the processor.
        sampling_rate: Sampling rate reported to the processor.

    Returns:
        The model's ``last_hidden_state``; ``[1, time_steps, hidden_dim]`` for
        a single clip.
    """
    checkpoint = "facebook/wav2vec2-base-960h"
    wav2vec_processor = Wav2Vec2Processor.from_pretrained(checkpoint)
    encoder = Wav2Vec2Model.from_pretrained(checkpoint)

    batch = wav2vec_processor(waveform, sampling_rate=sampling_rate, return_tensors="pt")
    with torch.no_grad():
        hidden_states = encoder(**batch).last_hidden_state

    return hidden_states
def get_hubert_embedding(waveform, sampling_rate=16000):
    """Return frame-level features from ``facebook/hubert-base-ls960``.

    Args:
        waveform: Audio samples handed to the feature extractor.
        sampling_rate: Sampling rate reported to the feature extractor.

    Returns:
        The model's ``last_hidden_state``.
    """
    # Imported here so the function works regardless of the cell's import lines.
    from transformers import Wav2Vec2FeatureExtractor

    checkpoint = "facebook/hubert-base-ls960"
    # Wav2Vec2Processor bundles a feature extractor with a CTC tokenizer, but
    # this checkpoint is pre-trained on audio only and (per its model card)
    # has no tokenizer, so loading the full processor fails. Only the
    # feature-extraction half is needed to produce embeddings.
    feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(checkpoint)
    model = HubertModel.from_pretrained(checkpoint)

    inputs = feature_extractor(waveform, sampling_rate=sampling_rate, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    return outputs.last_hidden_state

def get_whisper_embedding(waveform, sampling_rate=16000):
    """Return encoder features from ``openai/whisper-small``.

    The processor and the model are loaded from the pretrained checkpoint on
    every call; only the model's encoder is run, without gradient tracking.

    Args:
        waveform: Audio samples handed to the processor.
        sampling_rate: Sampling rate reported to the processor.

    Returns:
        The encoder's ``last_hidden_state``; ``[1, time_steps, hidden_dim]``
        for a single clip.
    """
    checkpoint = "openai/whisper-small"
    whisper_processor = WhisperProcessor.from_pretrained(checkpoint)
    whisper = WhisperModel.from_pretrained(checkpoint)

    features = whisper_processor(waveform, sampling_rate=sampling_rate, return_tensors="pt")
    with torch.no_grad():
        encoded = whisper.encoder(features.input_features)

    return encoded.last_hidden_state
# Clip to embed; the path is relative to the working directory.
file_path = "output_wave.wav"
waveform, sr = load_audio(file_path)

# Get embeddings
wav2vec2_feat = get_wav2vec2_embedding(waveform, sr)
# HuBERT extraction is currently disabled.
# hubert_feat = get_hubert_embedding(waveform, sr)
whisper_feat = get_whisper_embedding(waveform, sr)

# Compare the output shapes of the two models for the same clip.
print(wav2vec2_feat.shape, whisper_feat.shape)


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


torch.Size([1, 15652, 768]) torch.Size([1, 1500, 768])


# Training the tokenizer based on pairs and similarity

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import Wav2Vec2Model, Wav2Vec2Processor
import torchaudio

# Custom Dataset
class AudioPairDataset(Dataset):
    """Dataset of labelled audio-file pairs for similarity training.

    Args:
        data_pairs: Sequence of dicts with keys ``'path1'``, ``'path2'`` and
            ``'label'`` (``1`` marks a similar pair; anything else is treated
            as dissimilar).
        sampling_rate: Rate every waveform is resampled to, so that it matches
            the rate later reported to the feature processor.
    """

    def __init__(self, data_pairs, sampling_rate=16000):
        self.data_pairs = data_pairs
        self.sampling_rate = sampling_rate

    def __len__(self):
        return len(self.data_pairs)

    def _prepare(self, waveform, sr):
        """Return ``waveform`` ([channels, samples]) as 1-D mono audio at ``self.sampling_rate``."""
        # Average the channels first: squeeze(0) alone leaves stereo audio 2-D.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        # Files recorded at another rate would otherwise be fed to the model
        # as if they were already at self.sampling_rate.
        if sr != self.sampling_rate:
            waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sampling_rate)(waveform)
        return waveform.squeeze(0)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return ``(audio1, audio2, label)`` with label in {+1., -1.}."""
        item = self.data_pairs[idx]
        audio1 = self._prepare(*torchaudio.load(item['path1']))  # path to audio file 1
        audio2 = self._prepare(*torchaudio.load(item['path2']))  # path to audio file 2
        label = 1 if item['label'] == 1 else -1                  # Convert to +1/-1
        return audio1, audio2, torch.tensor(label, dtype=torch.float)

# Wav2Vec-based Embedding Model
class Wav2VecEmbedder(nn.Module):
    """Wraps the pretrained ``facebook/wav2vec2-base`` model as a clip-level embedder.

    The wrapped model's ``last_hidden_state`` is averaged over dimension 1
    (presumably the time axis of a ``[batch, time, hidden]`` tensor) and the
    result is L2-normalised per row.
    """

    def __init__(self):
        super().__init__()
        self.model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")

    def forward(self, input_values, attention_mask=None):
        """Return one unit-norm embedding per item of ``input_values``."""
        hidden_states = self.model(
            input_values=input_values,
            attention_mask=attention_mask,
        ).last_hidden_state
        pooled = hidden_states.mean(dim=1)
        return F.normalize(pooled, p=2, dim=1)

# Collate function
def collate_fn(batch):
    """Collate ``(audio1, audio2, label)`` samples without padding the audio.

    The two audio columns are returned as plain lists of tensors; only the
    labels are stacked into a single tensor.
    """
    first_clips, second_clips, targets = map(list, zip(*batch))
    return first_clips, second_clips, torch.stack(targets)

# Helper function
def get_embeddings(processor, model, audios, sampling_rate):
    """Run a batch of raw waveforms through ``processor`` and ``model``.

    Args:
        processor: Callable that turns the list of waveforms into a mapping
            containing ``input_values`` and, optionally, ``attention_mask``.
        model: Embedding module called with ``input_values`` and
            ``attention_mask`` keyword arguments.
        audios: List of 1-D waveforms (variable lengths are padded by the
            processor).
        sampling_rate: Sampling rate reported to the processor.

    Returns:
        Whatever ``model`` returns for the batch. Gradients are tracked when
        ``model`` is in training mode and disabled when it is in eval mode.
    """
    inputs = processor(audios, sampling_rate=sampling_rate, return_tensors="pt", padding=True)

    # Follow the model's own device instead of relying on a global `device`.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    input_values = inputs["input_values"]
    # Some feature-extractor configurations return no attention mask at all;
    # in that case the model is simply called with attention_mask=None.
    attention_mask = inputs.get("attention_mask")
    if target_device is not None:
        input_values = input_values.to(target_device)
        if attention_mask is not None:
            attention_mask = attention_mask.to(target_device)

    # An unconditional no_grad() here made loss.backward() in the training
    # loop impossible, because the embeddings carried no graph.
    with torch.set_grad_enabled(model.training):
        embeddings = model(input_values=input_values, attention_mask=attention_mask)
    return embeddings

# Dummy data
# Each entry names two audio files and whether they are similar (1) or not (0);
# the dataset converts the label to +1 / -1 for the cosine loss.
data_pairs = [
    {'path1': 'sample1.wav', 'path2': 'sample2.wav', 'label': 1},
    {'path1': 'sample3.wav', 'path2': 'sample4.wav', 'label': 0},
    # Add more entries...
]

# Setup
from transformers import Wav2Vec2FeatureExtractor

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dataset = AudioPairDataset(data_pairs)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

# "facebook/wav2vec2-base" is pre-trained on audio only and (per its model
# card) has no tokenizer, so the combined Wav2Vec2Processor cannot be loaded
# from it. The feature extractor is the part that prepares raw audio and it is
# called the same way, so the rest of the script keeps using `processor`.
processor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-base")
embedder = Wav2VecEmbedder().to(device)
optimizer = torch.optim.Adam(embedder.parameters(), lr=1e-5)
loss_fn = nn.CosineEmbeddingLoss(margin=0.5)

# Training loop
for epoch in range(3):  # Adjust epochs as needed
    embedder.train()
    for audio1, audio2, labels in dataloader:
        # Both clips of a pair are embedded by the same network.
        emb1 = get_embeddings(processor, embedder, audio1, 16000)
        emb2 = get_embeddings(processor, embedder, audio2, 16000)

        loss = loss_fn(emb1, emb2, labels.to(device))
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        print(f"Loss: {loss.item():.4f}")
    

# Building the Data_pairs based on artist similarity and based on Maqam similarity

In [None]:
import os
from itertools import combinations
from rapidfuzz import fuzz

def extract_artist_genre(filename):
    """
    Parse the artist and genre out of a filename shaped like
    'Genre--Artist_Name--Song_Name--Subgenre.mp3'.

    The directory part and the extension are dropped, the stem is split on
    '--', and the first two fields are stripped and lower-cased.

    Returns (artist, genre), or (None, None) when the stem has fewer than
    four '--'-separated fields.
    """
    stem, _ext = os.path.splitext(os.path.basename(filename))
    fields = stem.split('--')
    if len(fields) >= 4:
        genre, artist = (field.strip().lower() for field in fields[:2])
        return artist, genre
    return None, None  # Not enough metadata

def build_separate_similarity_lists(folder_path, artist_threshold=90, genre_threshold=90):
    """
    Pair up the MP3 files of a folder by fuzzy artist and genre similarity.

    Every unordered pair of files with usable filename metadata is scored
    with ``fuzz.ratio`` on the artist names and on the genre names.

    Args:
        folder_path: Directory whose ``.mp3`` files are compared (the
            extension match is case-insensitive, like the spectrogram
            extraction code in this notebook).
        artist_threshold: Minimum artist score for an artist-similar pair.
        genre_threshold: Minimum genre score for a genre-similar pair.

    Returns:
        ``(artist_similar_pairs, genre_similar_pairs)``: two lists of dicts
        with keys ``'path1'``, ``'path2'``, ``'similarity'`` and ``'label'``
        (always ``1``). A pair may appear in both lists.
    """
    # Parse each filename once instead of once per pair, and drop files whose
    # name lacks an artist or genre. Sorting makes the output order
    # independent of the platform's directory listing order.
    tracks = []
    for name in sorted(os.listdir(folder_path)):
        if not name.lower().endswith('.mp3'):
            continue
        path = os.path.join(folder_path, name)
        artist, genre = extract_artist_genre(path)
        if artist and genre:
            tracks.append((path, artist, genre))

    artist_similar_pairs = []
    genre_similar_pairs = []

    for (f1, artist1, genre1), (f2, artist2, genre2) in combinations(tracks, 2):
        artist_similarity = fuzz.ratio(artist1, artist2)
        genre_similarity = fuzz.ratio(genre1, genre2)

        if artist_similarity >= artist_threshold:
            artist_similar_pairs.append({
                'path1': f1,
                'path2': f2,
                'similarity': artist_similarity,
                'label': 1
            })

        if genre_similarity >= genre_threshold:
            genre_similar_pairs.append({
                'path1': f1,
                'path2': f2,
                'similarity': genre_similarity,
                'label': 1
            })

    return artist_similar_pairs, genre_similar_pairs



# Build both pair lists with the default thresholds (90 for artist and genre).
artist_pairs, genre_pairs = build_separate_similarity_lists(r"E:\Dataset\mp3_folder")

# Report how many pairs cleared each threshold.
print(f"Found {len(artist_pairs)} artist-similar pairs")
print(f"Found {len(genre_pairs)} genre-similar pairs")


Found 822 artist-similar pairs
Found 645 genre-similar pairs


In [None]:
import os
import torch
import torchaudio

def process_mp3_files(folder_path, output_folder, sample_rate=16000, n_fft=1024, hop_length=512, chunk_size=10):
    """Save each MP3's spectrogram plus its spectrum averaged per block of frames.

    For every ``.mp3`` entry of ``folder_path`` (case-insensitive match) the
    audio is loaded, resampled to ``sample_rate`` if needed, averaged down to
    one channel and transformed with ``torchaudio.transforms.Spectrogram``.
    The frames are then grouped into consecutive, non-overlapping blocks of
    ``chunk_size`` frames and averaged within each block. Two tensors are
    written to ``output_folder``:

    * ``<stem>_spec.pt``            - spectrogram, ``[1, freq_bins, frames]``
    * ``<stem>_avg_freq_chunks.pt`` - block means, ``[num_chunks, freq_bins]``

    Args:
        folder_path: Directory containing the MP3 files.
        output_folder: Destination directory; created if it does not exist.
        sample_rate: Target sampling rate for the audio.
        n_fft: FFT size handed to the spectrogram transform.
        hop_length: Hop length handed to the spectrogram transform.
        chunk_size: Number of spectrogram frames averaged per block. Trailing
            frames that do not fill a whole block are dropped.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    os.makedirs(output_folder, exist_ok=True)

    # The transform does not depend on the file, so build it once.
    spectrogram_transform = torchaudio.transforms.Spectrogram(n_fft=n_fft, hop_length=hop_length)

    for filename in os.listdir(folder_path):
        if not filename.lower().endswith('.mp3'):
            continue

        file_path = os.path.join(folder_path, filename)
        print(f"Processing: {file_path}")

        # Load audio
        waveform, sr = torchaudio.load(file_path)

        # Resample if necessary
        if sr != sample_rate:
            resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate)
            waveform = resampler(waveform)

        # Convert to mono
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        spectrogram = spectrogram_transform(waveform)  # Shape: [1, freq_bins, time_steps]

        # Remove channel dimension -> [freq_bins, time_steps]
        spec = spectrogram.squeeze(0)
        freq_bins, num_frames = spec.shape

        # Average every complete block of `chunk_size` frames in one shot:
        # [freq_bins, num_chunks * chunk_size] -> [freq_bins, num_chunks, chunk_size]
        # -> mean over the block axis -> [num_chunks, freq_bins].
        # The explicit num_chunks keeps the reshape valid when it is zero.
        num_chunks = num_frames // chunk_size
        blocks = spec[:, :num_chunks * chunk_size].reshape(freq_bins, num_chunks, chunk_size)
        avg_frequencies_per_chunk = blocks.mean(dim=2).transpose(0, 1).contiguous()

        # Save outputs
        base_name = os.path.splitext(filename)[0]
        torch.save(spectrogram, os.path.join(output_folder, f"{base_name}_spec.pt"))
        torch.save(avg_frequencies_per_chunk, os.path.join(output_folder, f"{base_name}_avg_freq_chunks.pt"))

        print(f"Saved spectrogram     : {spectrogram.shape}")
        print(f"Avg per-chunk shape   : {avg_frequencies_per_chunk.shape}")

# Example usage
# NOTE: input and output point at the same folder, so the generated .pt files
# are written next to the source MP3s.
folder_path = r"E:\AUB\Research Bakarji\LAYAN\ai-for-arabic-music-main\ai-for-arabic-music-main\Test"
output_folder = r"E:\AUB\Research Bakarji\LAYAN\ai-for-arabic-music-main\ai-for-arabic-music-main\Test"
process_mp3_files(folder_path, output_folder)
