# Part 2: Audio Fingerprinting & Bonus
## AARES Project - Sonic Signature

This notebook covers Exercises 4-5 and the Bonus:
1.  **Audio Fingerprinting**: Implementing spectrograms, peak extraction, and hashing (Shazam algorithm).
2.  **Database**: Building a search database from audio files.
3.  **Conformal Prediction**: Quantifying uncertainty in genre classification.

### Requirements
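- `librosa` for audio loading and STFT computation.
- `numba` for optimized peak finding.
- `numpy`, `pandas`, `scikit-learn` and `matplotlib` for numerical work, data loading, the KNN classifier and plotting.
- `utils_projet.py` (provided) for the helpers `get_maxima`, `search_song` and `ConformalPrediction`.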


In [1]:
import numpy as np
import librosa
import numba
from numba import jit
import os
import pickle
import glob
import pandas as pd
import matplotlib.pyplot as plt
import sys

# Make utils_projet importable: append whichever directory (the parent or the
# current one) contains utils_projet.py to the module search path.
if os.path.exists('../utils_projet.py'):
    sys.path.append('..')
elif os.path.exists('utils_projet.py'):
    sys.path.append('.')

# Import the provided helpers; fall back to stubs when the module is missing.
try:
    from utils_projet import get_maxima, search_song, ConformalPrediction
except ImportError:
    print("Warning: utils_projet.py not found. Please ensure it is in the same directory or parent directory.")
    # Placeholder stubs so the remaining cells can still be defined.
    # They do no real work: both functions return an empty list and the class
    # has no methods, so fingerprinting, search and conformal prediction
    # cannot produce meaningful results without the real module.
    def get_maxima(x): return []
    def search_song(db, hashes): return []
    class ConformalPrediction: pass

from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

# Configure plots
plt.rcParams['figure.figsize'] = (12, 6)


In [2]:
import os

def get_dataset_path(filename):
    '''Return the relative path of ``filename`` inside the dataset directory.

    The file is looked up in ``dataset/`` first and then in ``../dataset/``
    (useful when the notebook is run from a subdirectory).

    Args:
        filename: Name of the file to locate.

    Returns:
        The first candidate path that exists.

    Raises:
        FileNotFoundError: If the file exists in neither location. The
            message names the file that was requested.
    '''
    for base_dir in ('dataset', os.path.join('..', 'dataset')):
        candidate = os.path.join(base_dir, filename)
        if os.path.exists(candidate):
            return candidate
    # Single braces so the actual file name is interpolated into the message
    # (doubled braces would print the literal text "{filename}").
    raise FileNotFoundError(f"Could not find {filename} in dataset/ or ../dataset/")

def get_songs_dir():
    '''Return the path of the songs directory.

    Checks ``songs`` first, then ``../songs``, and returns the first one that
    is an existing directory.

    Raises:
        FileNotFoundError: If neither directory exists.
    '''
    for candidate in ('songs', '../songs'):
        if os.path.isdir(candidate):
            return candidate
    raise FileNotFoundError("Could not find songs/ directory")


## Exercise 4: Fingerprinting Implementation

We implement the core components of the audio fingerprinting system:
- `compute_spectrogram`: STFT magnitude squared.
- `get_maxima_in_tz`: Selecting the strongest spectrogram peaks inside the Target Zone (TZ) that follows an anchor peak.
- `get_hashes`: Generating pairs and hashes.


In [3]:
# Fingerprinting Core (adapted from src/fingerprinting.py)

def compute_spectrogram(x, sr=3000):
    '''Return the energy spectrogram (squared STFT magnitude) of signal ``x``.

    The STFT is computed with a 2048-point FFT, a 1024-sample window and a
    hop of 512 samples. ``sr`` is accepted but not used by the computation.
    '''
    stft = librosa.stft(x, n_fft=2048, hop_length=512, win_length=1024)
    return np.abs(stft) ** 2

@jit(nopython=True)
def get_maxima_in_tz(S, maxima, anchor):
    '''Return the strongest maxima located in the Target Zone of an anchor.

    The Target Zone spans time frames ``t_anchor + 10`` to ``t_anchor + 300``
    and frequency bins within 10 of ``f_anchor`` (clamped to
    ``[0, S.shape[0]]``); all bounds are inclusive.

    Args:
        S: 2-D array indexed as ``S[f, t]`` (frequency bin, time frame).
        maxima: Indexable collection of peaks; ``maxima[i][0]`` is read as the
            frequency bin and ``maxima[i][1]`` as the time frame.
        anchor: Peak ``(f, t)`` the Target Zone is positioned after.

    Returns:
        int64 array of shape ``(k, 2)`` with ``k <= 10``, holding the
        ``(f, t)`` coordinates of the candidates with the largest ``S[f, t]``,
        ordered by decreasing amplitude. Shape ``(0, 2)`` when the Target Zone
        contains no maxima.
    '''
    f_anchor, t_anchor = anchor[0], anchor[1]

    # Target Zone bounds: starts 10 frames after the anchor, 20 bins tall.
    t_min = t_anchor + 10 
    t_max = t_anchor + 300
    f_min = max(0, f_anchor - 10)
    f_max = min(S.shape[0], f_anchor + 10)

    # 1. Count candidates
    # (first pass only counts so the arrays below can be allocated at their
    # final size — presumably to stay within numba nopython constraints)
    count = 0
    for i in range(len(maxima)):
        f = maxima[i][0]
        t = maxima[i][1]
        if (t >= t_min) and (t <= t_max) and (f >= f_min) and (f <= f_max):
            count += 1

    if count == 0:
        return np.empty((0, 2), dtype=np.int64)

    # 2. Fill candidates
    # (second pass applies the same membership test and records the index of
    # each matching entry of `maxima`)
    candidate_indices = np.empty(count, dtype=np.int64)
    current_idx = 0
    for i in range(len(maxima)):
        f = maxima[i][0]
        t = maxima[i][1]
        if (t >= t_min) and (t <= t_max) and (f >= f_min) and (f <= f_max):
            candidate_indices[current_idx] = i
            current_idx += 1

    # 3. Get amplitudes
    # (look up each candidate's spectrogram value and keep its coordinates)
    candidate_amps = np.empty(count, dtype=np.float64)
    candidate_coords = np.empty((count, 2), dtype=np.int64)
    for k in range(count):
        idx = candidate_indices[k]
        f = int(maxima[idx][0])
        t = int(maxima[idx][1])
        candidate_amps[k] = S[f, t]
        candidate_coords[k, 0] = f
        candidate_coords[k, 1] = t

    # 4. Sort and take top 10
    # (argsort is ascending, so the reversed order lists the largest first)
    order = np.argsort(candidate_amps)[::-1]
    n_top = min(10, count)
    top_indices = order[:n_top]

    result = np.empty((n_top, 2), dtype=np.int64)
    for k in range(n_top):
        result[k] = candidate_coords[top_indices[k]]

    return result

def get_hashes(anchor, maxima_in_tz):
    '''Build one ``(hash, anchor_time)`` pair per Target Zone peak.

    Each hash packs the anchor/peak pair into a single integer:
    ``anchor_freq * 1_000_000 + peak_freq * 1_000 + (peak_time - anchor_time)``.

    Args:
        anchor: ``(freq, time)`` of the anchor peak.
        maxima_in_tz: Iterable of ``(freq, time)`` peaks paired with the anchor.

    Returns:
        List of ``(hash_value, anchor_time)`` tuples, in the order of
        ``maxima_in_tz``.
    '''
    anchor_freq = int(anchor[0])
    anchor_time = int(anchor[1])

    def _encode(peak):
        # Coordinates are truncated to int before being packed.
        peak_freq = int(peak[0])
        peak_time = int(peak[1])
        return anchor_freq * 1000000 + peak_freq * 1000 + (peak_time - anchor_time)

    return [(_encode(peak), anchor_time) for peak in maxima_in_tz]

def process_signal(x, sr=3000):
    '''Fingerprint a signal: spectrogram -> maxima -> Target Zone pairs -> hashes.

    Every maximum is used in turn as an anchor. The result maps each hash to
    the anchor time that produced it; when the same hash is generated more
    than once, the last anchor time wins.
    '''
    spectrogram = compute_spectrogram(x, sr=sr)
    peaks = get_maxima(spectrogram)  # From utils_projet

    fingerprint = {}
    for anchor in peaks:
        zone_peaks = get_maxima_in_tz(spectrogram, peaks, anchor)
        # update() assigns the (hash, time) pairs in order, so later pairs
        # overwrite earlier ones that share a hash.
        fingerprint.update(get_hashes(anchor, zone_peaks))

    return fingerprint


## Exercise 5: Database and Search

We build a database of fingerprints from the `songs/` directory and implement search functionality.


In [4]:
# Database Functions (adapted from src/database.py)

class AudioDatabase:
    '''Fingerprint database backed by a pickle file.

    ``database`` holds one hash dictionary per song (as returned by
    ``process_signal``) and ``song_names`` holds the matching file names at
    the same indices.
    '''

    # Default location of the pickle file, relative to the working directory.
    DEFAULT_DB_PATH = 'dataset/dataset.pickle'
    # Glob patterns of the audio files picked up by create_database().
    AUDIO_PATTERNS = ('*.mp3', '*.wav', '*.flac')

    def __init__(self, db_path='dataset/dataset.pickle'):
        '''Create an empty in-memory database bound to ``db_path``.

        When the default path is used and ``dataset/`` does not exist but
        ``../dataset`` does (notebook run from a subdirectory), the path is
        redirected to ``../dataset/dataset.pickle``. An explicitly supplied
        ``db_path`` is always honored as given.
        '''
        uses_default = db_path == self.DEFAULT_DB_PATH
        if uses_default and not os.path.exists('dataset') and os.path.exists('../dataset'):
            self.db_path = '../dataset/dataset.pickle'
        else:
            self.db_path = db_path

        self.database = []
        self.song_names = []

    def create_database(self, songs_dir_name='songs'):
        '''Fingerprint every audio file of the songs directory and save the result.

        Args:
            songs_dir_name: Directory to scan. When it is not an existing
                directory, the location found by ``get_songs_dir()`` is used
                instead.

        Files that fail to load or process are reported and skipped. The
        in-memory database is rebuilt from scratch (not appended to) and then
        pickled to ``self.db_path``.
        '''
        songs_dir = songs_dir_name if os.path.isdir(songs_dir_name) else get_songs_dir()
        print(f"Creating database from {songs_dir}...")

        audio_files = []
        for pattern in self.AUDIO_PATTERNS:
            # glob returns files in arbitrary order; sort for a reproducible
            # song order across runs.
            audio_files.extend(sorted(glob.glob(os.path.join(songs_dir, pattern))))

        print(f"Found {len(audio_files)} files.")

        # Build into fresh lists so a rebuild never duplicates songs that were
        # already loaded or created on this instance.
        database = []
        song_names = []
        for file_path in audio_files:
            try:
                print(f"Processing {file_path}...")
                y, sr = librosa.load(file_path, sr=3000)
                hashes = process_signal(y, sr=sr)
            except Exception as e:
                # Best effort: one unreadable file must not abort the build.
                print(f"Error: {e}")
                continue
            database.append(hashes)
            song_names.append(os.path.basename(file_path))

        self.database = database
        self.song_names = song_names

        # Ensure dir exists for db_path (a bare file name has no directory
        # component, and os.makedirs('') would raise).
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        with open(self.db_path, 'wb') as handle:
            pickle.dump({'hashes': self.database, 'names': self.song_names}, handle)
        print("Database saved.")

    def load_database(self):
        '''Load the pickled database from ``self.db_path``.

        Returns:
            True when the file was loaded, False when it does not exist.
        '''
        if not os.path.exists(self.db_path):
            print(f"Database not found at {self.db_path}.")
            return False
        # NOTE: pickle can execute arbitrary code while loading; only open
        # database files produced by create_database() or another trusted source.
        with open(self.db_path, 'rb') as handle:
            data = pickle.load(handle)
        self.database = data['hashes']
        self.song_names = data['names']
        print(f"Loaded {len(self.database)} songs.")
        return True

    def search_song(self, excerpt_path):
        '''Fingerprint an audio excerpt and print the best matching songs.

        The database is loaded on demand when it is empty; if it cannot be
        loaded the method returns without searching. Errors raised while
        loading or matching the excerpt are printed, not propagated.
        '''
        if not self.database and not self.load_database():
            return

        print(f"Searching for {excerpt_path}...")
        try:
            y, sr = librosa.load(excerpt_path, sr=3000)
            song_hashes = process_signal(y, sr=sr)

            # Module-level search_song (from utils_projet), not this method.
            top_indices = search_song(self.database, song_hashes)

            print("Top Matches:")
            for i in top_indices:
                if i < len(self.song_names):
                    print(f"- {self.song_names[i]}")
        except Exception as e:
            print(f"Search error: {e}")

# Demo: search the database for the first song it contains, using the song's
# own file as the excerpt. Any failure is printed instead of raised.
try:
    db = AudioDatabase()
    # To rebuild: db.create_database()
    if db.load_database() and len(db.song_names) > 0:
        songs_dir = get_songs_dir()
        test_song = os.path.join(songs_dir, db.song_names[0])
        db.search_song(test_song)
except Exception as e:
    print(e)


Loaded 13 songs.
Searching for ../songs\Classicals.de - Bizet, Georges - Carmen Prelude - Arranged for Solo Piano.mp3...
Top Matches:
- Classicals.de - Bizet, Georges - Carmen Prelude - Arranged for Solo Piano.mp3
- Classicals.de - Debussy - Suite Bergamasque - 2. Menuet - L.75.mp3
- Classicals.de - Debussy - Suite Bergamasque - 4. Passepied - L.75.mp3


## Bonus: Conformal Prediction

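We use Conformal Prediction to produce prediction sets (called "intervals" in the code): for a chosen error rate epsilon, the set of candidate genres is built so that it misses the true genre with probability at most epsilon.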
We use a KNN classifier on the first 1500 samples of the training set.


In [5]:
# Conformal Prediction Analysis (adapted from src/conformal.py)

def run_conformal_analysis(n_train=1500, epsilons=(0.05, 0.1, 0.2), n_neighbors=5):
    '''Run the conformal prediction demo on the Spotify dataset and print the results.

    Loads the train/test CSV files, fits a KNN classifier on the first
    ``n_train`` training rows (standardized features), feeds the first test
    row to ``ConformalPrediction`` and prints the interval returned by
    ``compute_interval`` for each epsilon.

    Args:
        n_train: Number of leading training rows used to fit the model.
        epsilons: Error rates for which an interval is printed.
        n_neighbors: Number of neighbors of the KNN classifier.

    Any exception raised along the way (missing files, missing columns, ...)
    is printed rather than propagated, so the notebook keeps running.
    '''
    print("--- Conformal Prediction ---")

    try:
        train_path = get_dataset_path('spotify_dataset_train.csv')
        test_path = get_dataset_path('spotify_dataset_test.csv')

        train_df = pd.read_csv(train_path)
        test_df = pd.read_csv(test_path)

        # Extract year if missing
        for df in [train_df, test_df]:
            if 'year' not in df.columns and 'release_date' in df.columns:
                df['year'] = pd.to_datetime(df['release_date'], errors='coerce').dt.year

        # Subset
        train_subset = train_df.iloc[:n_train]

        feature_cols = ['acousticness', 'danceability', 'energy', 'duration_ms',
                        'instrumentalness', 'valence', 'popularity', 'tempo',
                        'liveness', 'loudness', 'speechiness', 'year']

        # Missing feature values are replaced by 0 before scaling.
        X_train = train_subset[feature_cols].fillna(0).values
        y_train = train_subset['genre'].values

        X_test = test_df[feature_cols].fillna(0).values
        if len(X_test) == 0:
            raise ValueError("Test dataset is empty; no test point available.")
        x_new = X_test[0]  # Test point

        # Scale: fit on the training subset only, then apply to the test point.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        x_new_scaled = scaler.transform(x_new.reshape(1, -1))[0]

        # Train KNN
        print("Training KNN...")
        knn = KNeighborsClassifier(n_neighbors=n_neighbors)
        knn.fit(X_train_scaled, y_train)

        # Conformal (ConformalPrediction comes from utils_projet)
        predictor = ConformalPrediction(knn, X_train_scaled, y_train)
        predictor.predict(x_new_scaled)

        # Intervals
        print("\nIntervals for test point:")
        for eps in epsilons:
            interval = predictor.compute_interval(eps)
            print(f"Eps: {eps} (Confidence: {1-eps:.0%}) -> {interval}")

    except Exception as e:
        # Deliberate best effort for a notebook cell: report and continue.
        print(e)

# Execute the conformal prediction demo; its results are printed to the cell output.
run_conformal_analysis()


--- Conformal Prediction ---
Training KNN...
blues
chanson
classical
country
dance
disco
edm
electro
folk
hip hop
jazz
latin
metal
pop
punk
r&b
rap
reggae
rock
salsa
soul
techno

Intervals for test point:
Eps: 0.05 (Confidence: 95%) -> ['blues', 'classical', 'country', 'dance', 'disco', 'edm', 'electro', 'folk', 'hip hop', 'jazz', 'latin', 'pop', 'r&b', 'rap', 'rock', 'soul']
Eps: 0.1 (Confidence: 90%) -> ['classical', 'country', 'dance', 'edm', 'electro', 'folk', 'hip hop', 'jazz', 'pop', 'r&b', 'rap', 'soul']
Eps: 0.2 (Confidence: 80%) -> ['country', 'electro', 'folk', 'hip hop', 'pop', 'r&b', 'rap', 'soul']
