In [22]:
import pandas as pd
import numpy as np
import os
import glob
from pathlib import Path
from tqdm.auto import tqdm
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')

In [37]:
class AudioFeatureProcessor:
    
    def __init__(self):
        self.feature_mapping = {
            'pitch_stability': [
                'F0final_sma_stddev', 'F0final_sma_amean',
                'voicingFinalUnclipped_sma_stddev', 'voicingFinalUnclipped_sma_amean'
            ],
            'voice_quality': [
                'jitterLocal_sma_stddev', 'jitterLocal_sma_amean',
                'jitterDDP_sma_stddev', 'jitterDDP_sma_amean',
                'shimmerLocal_sma_stddev', 'shimmerLocal_sma_amean'
            ],
            'harmonicity': [
                'logHNR_sma_stddev', 'logHNR_sma_amean',
                'pcm_fftMag_spectralHarmonicity_sma_stddev', 
                'pcm_fftMag_spectralHarmonicity_sma_amean'
            ],
            'timbre_brightness': [
                'pcm_fftMag_spectralCentroid_sma_stddev', 
                'pcm_fftMag_spectralCentroid_sma_amean',
                'pcm_fftMag_spectralRollOff75.0_sma_stddev',
                'pcm_fftMag_spectralRollOff75.0_sma_amean',
                'pcm_fftMag_spectralRollOff90.0_sma_stddev',
                'pcm_fftMag_spectralRollOff90.0_sma_amean'
            ],
            'timbre_richness': [
                'pcm_fftMag_spectralEntropy_sma_stddev',
                'pcm_fftMag_spectralEntropy_sma_amean',
                'pcm_fftMag_spectralVariance_sma_stddev',
                'pcm_fftMag_spectralVariance_sma_amean'
            ],
            'timbre_texture': [
                'pcm_fftMag_spectralSkewness_sma_stddev',
                'pcm_fftMag_spectralSkewness_sma_amean',
                'pcm_fftMag_spectralKurtosis_sma_stddev',
                'pcm_fftMag_spectralKurtosis_sma_amean',
                'pcm_fftMag_psySharpness_sma_stddev',
                'pcm_fftMag_psySharpness_sma_amean'
            ],
            'dynamics': [
                'pcm_RMSenergy_sma_stddev', 'pcm_RMSenergy_sma_amean',
                'audspec_lengthL1norm_sma_stddev', 'audspec_lengthL1norm_sma_amean',
                'audspecRasta_lengthL1norm_sma_stddev', 'audspecRasta_lengthL1norm_sma_amean'
            ],
            'rhythmic_activity': [
                'pcm_zcr_sma_stddev', 'pcm_zcr_sma_amean',
                'pcm_fftMag_spectralFlux_sma_stddev',
                'pcm_fftMag_spectralFlux_sma_amean'
            ],
            'tonal_balance_low': [
                'pcm_fftMag_fband250-650_sma_stddev',
                'pcm_fftMag_fband250-650_sma_amean',
                'pcm_fftMag_spectralRollOff25.0_sma_stddev',
                'pcm_fftMag_spectralRollOff25.0_sma_amean'
            ],
            'tonal_balance_mid': [
                'pcm_fftMag_fband1000-4000_sma_stddev',
                'pcm_fftMag_fband1000-4000_sma_amean',
                'pcm_fftMag_spectralRollOff50.0_sma_stddev',
                'pcm_fftMag_spectralRollOff50.0_sma_amean'
            ]
        }
        
        # Add MFCC groupings (timbre components)
        for i in range(1, 15):  # MFCC 1-14
            if i <= 4:
                category = 'mfcc_formant'  # Lower MFCCs relate to formants/vowel sounds
            elif i <= 8:
                category = 'mfcc_spectral' # Mid MFCCs capture spectral shape
            else:
                category = 'mfcc_texture'   # Higher MFCCs capture fine texture
            
            if category not in self.feature_mapping:
                self.feature_mapping[category] = []
            
            self.feature_mapping[category].extend([
                f'pcm_fftMag_mfcc_sma[{i}]_stddev',
                f'pcm_fftMag_mfcc_sma[{i}]_amean'
            ])

    def aggregate_temporal_features(self, df, method='comprehensive'):
        if method == 'simple':
            return pd.concat([
                df.mean().add_suffix('_mean'),
                df.std().add_suffix('_std')
            ])
        elif method == 'comprehensive':
            return pd.concat([
                df.mean().add_suffix('_mean'),
                df.std().add_suffix('_std'),
                df.min().add_suffix('_min'),
                df.max().add_suffix('_max'),
                df.quantile(0.25).add_suffix('_q25'),
                df.quantile(0.75).add_suffix('_q75'),
                df.skew().add_suffix('_skew'),
                df.kurtosis().add_suffix('_kurtosis')
            ])
        elif method == 'statistical':
            return pd.concat([
                df.mean().add_suffix('_mean'),
                df.std().add_suffix('_std'),
                (df.max() - df.min()).add_suffix('_range'),
                df.diff().abs().mean().add_suffix('_variation'),
                (df.iloc[-5:].mean() - df.iloc[:5].mean()).add_suffix('_trend')
            ])
        return pd.Series()

    def create_mid_level_features(self, df_aggregated):
        mid_level = {}
        for category, feature_list in self.feature_mapping.items():
            available_features = [f for f in feature_list if any(col.startswith(f) for col in df_aggregated.index)]
            if available_features:
                category_cols = [col for feature in available_features for col in df_aggregated.index if col.startswith(feature)]
                if category_cols:
                    category_data = df_aggregated[category_cols]
                    mid_level[category] = category_data.mean()
        return pd.Series(mid_level)
        
    def create_ultra_compact_features(self, mid_level_features):
        ultra_compact = {}
        
        if 'pitch_stability' in mid_level_features:
            ultra_compact['pitch_consistency'] = mid_level_features['pitch_stability']
            
        voice_features = ['voice_quality', 'harmonicity']
        available_voice = [f for f in voice_features if f in mid_level_features]
        if available_voice:
            ultra_compact['vocal_quality'] = np.mean([mid_level_features[f] for f in available_voice])
            
        timbre_features = ['timbre_brightness', 'timbre_richness', 'timbre_texture', 'mfcc_formant', 'mfcc_spectral', 'mfcc_texture']
        available_timbre = [f for f in timbre_features if f in mid_level_features]
        if available_timbre:
            ultra_compact['timbre_complexity'] = np.mean([mid_level_features[f] for f in available_timbre])
            
        if 'dynamics' in mid_level_features:
            ultra_compact['energy_level'] = mid_level_features['dynamics']
            
        if 'rhythmic_activity' in mid_level_features:
            ultra_compact['rhythmic_intensity'] = mid_level_features['rhythmic_activity']
            
        if 'tonal_balance_low' in mid_level_features and 'tonal_balance_mid' in mid_level_features:
            ultra_compact['tonal_balance'] = mid_level_features['tonal_balance_mid'] - mid_level_features['tonal_balance_low']
            
        return pd.Series(ultra_compact)

    def clean_and_validate_data(self, df):
        # Remove frameTime column
        time_columns = ['frameTime', 'frametime', 'time', 'frame']
        for col in time_columns:
            if col in df.columns:
                df = df.drop(columns=[col])
                break

        df = df.dropna(axis=1, how='all') # Drop empty columns
        df = df.dropna(axis=0, how='all') # Drop empty rows
        df = df.replace([np.inf, -np.inf], np.nan)
        df = df.fillna(df.mean()) # Fill any remaining NaNs
        df = df.fillna(0) # If a whole column was NaN, fill with 0
        return df

    def process_audio_features(self, df, aggregation_method='comprehensive'):

        # Step 1: Aggregate temporal dimension
        df_aggregated = self.aggregate_temporal_features(df, method=aggregation_method)
        
        # Step 2: Create mid-level features
        mid_level_features = self.create_mid_level_features(df_aggregated)
        
        # Step 3: Create ultra-compact representation
        ultra_compact = self.create_ultra_compact_features(mid_level_features)
        
        return {
            'raw_aggregated': df_aggregated,
            'mid_level': mid_level_features,
            'ultra_compact': ultra_compact,
        }
        
    def process_song_directory(self, csv_directory, aggregation_method='comprehensive', file_pattern="*.csv", max_songs=None):
        csv_directory = Path(csv_directory)
        csv_files = list(csv_directory.glob(file_pattern))
        
        if max_songs:
            csv_files = csv_files[:max_songs]
        
        print(f"Found {len(csv_files)} CSV files to process.")
        
        all_results = {
            'raw_aggregated': [], 'mid_level': [], 'ultra_compact': [],
            'song_ids': [], 'failed_songs': []
        }
        
        for csv_file in tqdm(csv_files, desc="Processing Songs"):
            try:
                song_id = csv_file.stem
                df = pd.read_csv(csv_file, sep=';')
                
                df = self.clean_and_validate_data(df)
                
                if df.shape[0] < 5:
                    raise ValueError(f'Too few time frames: {df.shape[0]}')
                if df.shape[1] < 10:
                    raise ValueError(f'Too few features: {df.shape[1]}')
                
                song_results = self.process_audio_features(df, aggregation_method)
                
                all_results['song_ids'].append(song_id)
                all_results['raw_aggregated'].append(song_results['raw_aggregated'])
                all_results['mid_level'].append(song_results['mid_level'])
                all_results['ultra_compact'].append(song_results['ultra_compact'])
                
            except Exception as e:
                all_results['failed_songs'].append({'filename': csv_file.name, 'error': str(e)})
        
        print(f"\nSuccessfully processed {len(all_results['song_ids'])} songs.")
        print(f"Failed to process {len(all_results['failed_songs'])} songs.")

        # Create final DataFrames
        final_results = {
            'failed_songs': all_results['failed_songs']
        }
        
        final_results['song_ids'] = all_results['song_ids']

        if all_results['raw_aggregated']:
            final_results['raw_aggregated'] = pd.DataFrame(
                all_results['raw_aggregated'], 
                index=all_results['song_ids']
            )
            
        if all_results['mid_level']:
            final_results['mid_level'] = pd.DataFrame(
                all_results['mid_level'], 
                index=all_results['song_ids']
            )
            
        if all_results['ultra_compact']:
            final_results['ultra_compact'] = pd.DataFrame(
                all_results['ultra_compact'], 
                index=all_results['song_ids']
            )
        
        return final_results
    
    def save_processed_features(self, results, output_directory):

        output_dir = Path(output_directory)
        output_dir.mkdir(exist_ok=True)
        
        # Save each level of features
        for level in ['raw_aggregated', 'mid_level', 'ultra_compact']:
            if level in results and results[level] is not None:
                output_file = output_dir / f"{level}_features.csv"
                results[level].to_csv(output_file)
                print(f"Saved {level} features to {output_file}")
        
        # Save failed songs log
        if results['failed_songs']:
            failed_df = pd.DataFrame(results['failed_songs'])
            failed_file = output_dir / "failed_songs.csv"
            failed_df.to_csv(failed_file, index=False)
            print(f"Saved failed songs log to {failed_file}")
        
        # Save feature explanations
        explanations_file = output_dir / "feature_explanations.txt"
        with open(explanations_file, 'w') as f:
            f.write("FEATURE EXPLANATIONS\n")
            f.write("=" * 50 + "\n\n")
            
            explanations = self.explain_features()
            f.write("ULTRA-COMPACT FEATURES (Most Important):\n")
            f.write("-" * 40 + "\n")
            for feature, explanation in explanations.items():
                f.write(f"{feature}: {explanation}\n")
            
            f.write(f"\nMID-LEVEL FEATURE CATEGORIES:\n")
            f.write("-" * 40 + "\n")
            for category in self.feature_mapping.keys():
                f.write(f"- {category}\n")
        
        print(f"Saved feature explanations to {explanations_file}")
    
    def clean_and_validate_data(self, df):
        # Clean and validate the input DataFrame

        original_shape = df.shape
        
        # Remove frameTime column if it exists
        time_columns = ['frameTime', 'frametime', 'time', 'frame']
        for col in time_columns:
            if col in df.columns:
                df = df.drop(col, axis=1)
                break
        else:
            # If no explicit time column found, check if first column looks like time
            first_col = df.columns[0]
            if (first_col.lower() in time_columns or 
                df[first_col].dtype in ['int64', 'float64'] and 
                df[first_col].is_monotonic_increasing):
                df = df.drop(first_col, axis=1)
        
        # Remove any completely empty columns
        df = df.dropna(axis=1, how='all')
        
        # Remove any completely empty rows
        df = df.dropna(axis=0, how='all')
        
        # Handle infinite values
        df = df.replace([np.inf, -np.inf], np.nan)
        
        # Fill remaining NaN values with column means (or 0 if all NaN)
        for col in df.columns:
            if df[col].isna().all():
                df[col] = 0
            else:
                df[col] = df[col].fillna(df[col].mean())
        
        return df
    
    def process_audio_features(self, df, aggregation_method='comprehensive'):
        # Full pipeline: aggregate temporal features and create mid-level descriptors

        # Step 1: Aggregate temporal dimension
        df_aggregated = self.aggregate_temporal_features(df, method=aggregation_method)
        
        # Step 2: Create mid-level features
        mid_level_features = self.create_mid_level_features(df_aggregated)
        
        # Step 3: Create ultra-compact representation
        ultra_compact = self.create_ultra_compact_features(mid_level_features)
        
        return {
            'raw_aggregated': df_aggregated,
            'mid_level': mid_level_features,
            'ultra_compact': ultra_compact,
            'feature_mapping': self.feature_mapping
        }

        print(f"Input shape: {df.shape}")
        
        # Step 1: Aggregate temporal dimension
        print("Aggregating temporal features...")
        df_aggregated = self.aggregate_temporal_features(df, method=aggregation_method)
        print(f"After aggregation: {len(df_aggregated)} features")
        
        # Step 2: Create mid-level features
        print("Creating mid-level features...")
        mid_level_features = self.create_mid_level_features(df_aggregated)
        print(f"Mid-level features: {len(mid_level_features)}")
        
        # Step 3: Create ultra-compact representation (optional)
        ultra_compact = self.create_ultra_compact_features(mid_level_features)
        
        return {
            'raw_aggregated': df_aggregated,
            'mid_level': mid_level_features,
            'ultra_compact': ultra_compact,
            'feature_mapping': self.feature_mapping
        }
    
    def create_ultra_compact_features(self, mid_level_features):
        # Create an even more compact representation focusing on music theory concepts

        ultra_compact = {}
        
        # Combine related categories
        if 'pitch_stability' in mid_level_features:
            ultra_compact['pitch_consistency'] = mid_level_features['pitch_stability']
        
        # Combine voice quality aspects
        voice_features = ['voice_quality', 'harmonicity']
        available_voice = [f for f in voice_features if f in mid_level_features]
        if available_voice:
            ultra_compact['vocal_quality'] = np.mean([mid_level_features[f] for f in available_voice])
        
        # Combine timbre aspects
        timbre_features = ['timbre_brightness', 'timbre_richness', 'timbre_texture', 
                          'mfcc_formant', 'mfcc_spectral', 'mfcc_texture']
        available_timbre = [f for f in timbre_features if f in mid_level_features]
        if available_timbre:
            ultra_compact['timbre_complexity'] = np.mean([mid_level_features[f] for f in available_timbre])
        
        # Energy and dynamics
        if 'dynamics' in mid_level_features:
            ultra_compact['energy_level'] = mid_level_features['dynamics']
        
        if 'rhythmic_activity' in mid_level_features:
            ultra_compact['rhythmic_intensity'] = mid_level_features['rhythmic_activity']
        
        # Tonal balance
        tonal_features = ['tonal_balance_low', 'tonal_balance_mid']
        available_tonal = [f for f in tonal_features if f in mid_level_features]
        if len(available_tonal) >= 2:
            ultra_compact['tonal_balance'] = (mid_level_features['tonal_balance_mid'] - 
                                           mid_level_features['tonal_balance_low'])
        
        return pd.Series(ultra_compact)
    
    def explain_features(self):
        # Provide human-readable explanations of the feature categories

        explanations = {
            'pitch_consistency': 'How stable and consistent the pitch/fundamental frequency is',
            'vocal_quality': 'Overall voice quality including clarity and harmonicity',
            'timbre_complexity': 'Richness and complexity of the sound texture/timbre',
            'energy_level': 'Overall loudness and energy of the audio',
            'rhythmic_intensity': 'Amount of rhythmic activity and temporal changes',
            'tonal_balance': 'Balance between low and mid frequencies (negative = bass-heavy, positive = mid-heavy)'
        }
        
        return explanations

In [46]:
# Initialize processor
processor = AudioFeatureProcessor()
    
# Example: Process all CSV files in a directory
csv_directory = "features"
    
print("Processing songs in directory...")
results = processor.process_song_directory(
    csv_directory=csv_directory,
    aggregation_method='comprehensive'
)
    
# Save results
output_directory = "processed_features"
processor.save_processed_features(results, output_directory)

# Display one of the resulting DataFrames
if 'mid_level' in results:
    print("\nDisplaying Mid-Level Features:")
    display(results['mid_level'].head())

Processing songs in directory...
Found 1802 CSV files to process.


Processing Songs:   0%|          | 0/1802 [00:00<?, ?it/s]


Successfully processed 1802 songs.
Failed to process 0 songs.
Saved raw_aggregated features to processed_features/raw_aggregated_features.csv
Saved mid_level features to processed_features/mid_level_features.csv
Saved ultra_compact features to processed_features/ultra_compact_features.csv
Saved feature explanations to processed_features/feature_explanations.txt

Displaying Mid-Level Features:


Unnamed: 0,pitch_stability,voice_quality,harmonicity,timbre_brightness,timbre_richness,timbre_texture,dynamics,rhythmic_activity,tonal_balance_low,tonal_balance_mid,mfcc_formant,mfcc_spectral,mfcc_texture
2004,40.331071,0.086586,4.068164,1056.703654,2033482.0,796.880882,7.981224,6.813384,69.680285,198.517241,7.145349,3.937661,3.862144
648,56.169092,0.291445,3.888904,1595.069921,1382428.0,742.591356,0.812244,7.924149,437.864815,690.486742,6.820626,5.822001,6.009777
224,55.255514,0.299457,2.967502,1252.166116,1726303.0,25.257174,0.980525,5.04732,155.63819,255.484705,2.491134,3.140783,2.586852
960,34.629541,0.404685,2.368314,2061.127189,2470573.0,20.939468,7.856232,2.622839,295.000176,554.294202,4.746452,3.166033,3.538834
903,33.557714,0.241834,0.931947,727.458848,464626.9,10.8344,4.767798,3.381308,133.459834,213.836499,4.735701,3.841645,2.851591


In [43]:
print(f"\nSUMMARY:")
print(f"Successfully processed: {len(results['song_ids'])} songs")
print(f"Failed: {len(results['failed_songs'])} songs")
    
if 'ultra_compact' in results:
    print(f"\nUltra-compact features shape: {results['ultra_compact'].shape}")
    print("Features:", list(results['ultra_compact'].columns))
        
    # Show first few songs
    print(f"\nFirst 3 songs (ultra-compact features):")
    print(results['ultra_compact'].head(3))
    
# Print feature explanations
print(f"\nFEATURE EXPLANATIONS:")
explanations = processor.explain_features()
for feature, explanation in explanations.items():
    print(f"- {feature}: {explanation}")


SUMMARY:
Successfully processed: 1802 songs
Failed: 0 songs

Ultra-compact features shape: (1802, 6)
Features: ['pitch_consistency', 'vocal_quality', 'timbre_complexity', 'energy_level', 'rhythmic_intensity', 'tonal_balance']

First 3 songs (ultra-compact features):
      pitch_consistency  vocal_quality  timbre_complexity  energy_level  \
2004          40.331071       2.077375      339225.152428      7.981224   
648           56.169092       2.090174      230797.460823      0.812244   
224           55.255514       1.633480      287931.485513      0.980525   

      rhythmic_intensity  tonal_balance  
2004            6.813384     128.836956  
648             7.924149     252.621927  
224             5.047320      99.846516  

FEATURE EXPLANATIONS:
- pitch_consistency: How stable and consistent the pitch/fundamental frequency is
- vocal_quality: Overall voice quality including clarity and harmonicity
- timbre_complexity: Richness and complexity of the sound texture/timbre
- energy_lev

In [52]:
# create 3 dataframes
df_low = pd.read_csv('processed_features/raw_aggregated_features.csv')
df_mid = pd.read_csv('processed_features/mid_level_features.csv')
df_high = pd.read_csv('processed_features/ultra_compact_features.csv')

# Load labels and features
df_labels = pd.read_parquet('datasets/DEAM/deam_core.parquet')

# Merge them into the DataFrames
# 'rename' is used to align the song_id column name if it differs
df_master_low = pd.merge(
    df_labels,
    df_master_low.rename(columns={'Unnamed: 0': 'song_id'}),
    on='song_id'
)
df_master_mid = pd.merge(
    df_labels,
    df_mid.rename(columns={'Unnamed: 0': 'song_id'}),
    on='song_id'
)
df_master_high = pd.merge(
    df_labels,
    df_high.rename(columns={'Unnamed: 0': 'song_id'}),
    on='song_id'
)

print("Master DataFrame with raw features created. Shape:", df_master_low.shape)
display(df_master_low.head())
print("Master DataFrame with mid-level features created. Shape:", df_master_mid.shape)
display(df_master_mid.head())
print("Master DataFrame with ultra-compact features created. Shape:", df_master_high.shape)
display(df_master_high.head())

Master DataFrame with raw features created. Shape: (1744, 2102)


Unnamed: 0,song_id,track_name,artist_name,valence_mean,arousal_mean,valence_std,arousal_std,lyrics,track_name_x,artist_name_x,...,pcm_fftMag_mfcc_sma_de[10]_stddev_kurtosis,pcm_fftMag_mfcc_sma_de[10]_amean_kurtosis,pcm_fftMag_mfcc_sma_de[11]_stddev_kurtosis,pcm_fftMag_mfcc_sma_de[11]_amean_kurtosis,pcm_fftMag_mfcc_sma_de[12]_stddev_kurtosis,pcm_fftMag_mfcc_sma_de[12]_amean_kurtosis,pcm_fftMag_mfcc_sma_de[13]_stddev_kurtosis,pcm_fftMag_mfcc_sma_de[13]_amean_kurtosis,pcm_fftMag_mfcc_sma_de[14]_stddev_kurtosis,pcm_fftMag_mfcc_sma_de[14]_amean_kurtosis
0,2,Tonight A Lonely Century,The New Mystikal Troubadours,3.1,3.0,0.94,0.63,,Tonight A Lonely Century,The New Mystikal Troubadours,...,0.393908,0.103912,0.24138,0.066047,0.308101,-0.007233,-0.107877,0.478195,-0.089261,0.486325
1,3,DD Groove,Kevin MacLeod,3.5,3.3,1.75,1.62,,DD Groove,Kevin MacLeod,...,1.632564,2.918847,2.124755,-0.120059,1.315018,3.296956,2.306421,-0.063098,0.846947,0.545423
2,4,Slow Burn,Kevin MacLeod,5.7,5.5,1.42,1.63,,Slow Burn,Kevin MacLeod,...,2.536149,0.140823,3.307728,0.912144,2.000584,-0.105263,1.731926,0.584289,1.943931,0.638256
3,5,Nothing Much,My Bubba & Mi,4.4,5.3,2.01,1.85,,Nothing Much,My Bubba & Mi,...,1.918424,0.564925,1.165674,0.898023,1.63704,0.140036,1.350073,0.462805,1.868504,0.21741
4,7,Hustle,Kevin MacLeod,5.8,6.4,1.47,1.69,,Hustle,Kevin MacLeod,...,0.228862,0.17248,-0.010007,0.055435,0.006095,0.576207,-0.207546,0.701293,1.472712,1.265001


Master DataFrame with mid-level features created. Shape: (1744, 21)


Unnamed: 0,song_id,track_name,artist_name,valence_mean,arousal_mean,valence_std,arousal_std,lyrics,pitch_stability,voice_quality,...,timbre_brightness,timbre_richness,timbre_texture,dynamics,rhythmic_activity,tonal_balance_low,tonal_balance_mid,mfcc_formant,mfcc_spectral,mfcc_texture
0,2,Tonight A Lonely Century,The New Mystikal Troubadours,3.1,3.0,0.94,0.63,,24.142866,0.147134,...,1406.316164,1752169.0,285.0456,8.5657,9.667886,309.188772,532.044889,5.841083,4.414495,4.371717
1,3,DD Groove,Kevin MacLeod,3.5,3.3,1.75,1.62,,18.389873,0.230238,...,299.68675,353123.4,1008.561078,12.2421,14.490791,51.5858,97.137651,7.294479,6.091541,3.961071
2,4,Slow Burn,Kevin MacLeod,5.7,5.5,1.42,1.63,,33.131578,0.200921,...,983.491359,1409604.0,559.251861,0.765598,9.487046,126.49254,263.710771,7.644306,2.884661,3.346964
3,5,Nothing Much,My Bubba & Mi,4.4,5.3,2.01,1.85,,40.401716,0.269176,...,1006.014961,1705751.0,378.14064,1.30919,5.525963,96.05086,210.01283,6.353369,2.412978,1.496175
4,7,Hustle,Kevin MacLeod,5.8,6.4,1.47,1.69,,20.445603,0.289568,...,402.261028,648432.6,870.699173,1.919408,13.539584,52.02389,93.441125,8.994246,4.861684,3.091049


Master DataFrame with ultra-compact features created. Shape: (1744, 14)


Unnamed: 0,song_id,track_name,artist_name,valence_mean,arousal_mean,valence_std,arousal_std,lyrics,pitch_consistency,vocal_quality,timbre_complexity,energy_level,rhythmic_intensity,tonal_balance
0,2,Tonight A Lonely Century,The New Mystikal Troubadours,3.1,3.0,0.94,0.63,,24.142866,1.173289,292312.533469,8.5657,9.667886,222.856116
1,3,DD Groove,Kevin MacLeod,3.5,3.3,1.75,1.62,,18.389873,2.731583,59074.83483,12.2421,14.490791,45.55185
2,4,Slow Burn,Kevin MacLeod,5.7,5.5,1.42,1.63,,33.131578,1.462533,235193.461804,0.765598,9.487046,137.218231
3,5,Nothing Much,My Bubba & Mi,4.4,5.3,2.01,1.85,,40.401716,1.633997,284524.179503,1.30919,5.525963,113.961971
4,7,Hustle,Kevin MacLeod,5.8,6.4,1.47,1.69,,20.445603,2.766098,108287.081674,1.919408,13.539584,41.417235


In [40]:
# bpm calc
import librosa

In [41]:
# helpers
def canonical(b, lo=80, hi=160):
    if pd.isna(b) or b == 0:
        return np.nan
    while b < lo:  b *= 2
    while b >= hi: b /= 2
    return round(b, 2)

def bpm_estimate(fpath: Path):
    try:
        y, sr = librosa.load(fpath, mono=True)
        return round(float(librosa.beat.tempo(y=y, sr=sr)[0]), 2)
    except Exception:
        return np.nan

In [53]:
AUDIO_DIR = Path('audio_files_DEAM/MEMD_audio/')

In [59]:
# Get a unique list of song IDs from the DataFrame index
song_ids = df_master_high['song_id'].unique().tolist()

bpm_results = {}
print(f"Calculating BPM for {len(song_ids)} unique songs...")

for song_id in tqdm(song_ids, desc="BPM Estimation"):
    audio_file_path = AUDIO_DIR / f"{song_id}.mp3"
    if audio_file_path.exists():
        bpm_results[song_id] = bpm_estimate(audio_file_path)
    else:
        bpm_results[song_id] = np.nan

# Convert results to a pandas Series and apply the canonical function
canonical_bpm_series = pd.Series(bpm_results).apply(canonical)

Calculating BPM for 1744 unique songs...


BPM Estimation:   0%|          | 0/1744 [00:00<?, ?it/s]

In [58]:
# List of all DataFrames to update
dataframes_to_update = [df_master_low, df_master_mid, df_master_high]

for df in dataframes_to_update:
    # Use .map() to assign the BPM value based on the DataFrame's index (song_id)
    df['bpm'] = df['song_id'].astype(str).map(canonical_bpm_series)

print("\nBPM column added successfully.")

# Display the head of one of the DataFrames to verify
display(df_master_high[['song_id', 'pitch_consistency', 'bpm']].head())


BPM column added successfully.


Unnamed: 0,pitch_consistency,bpm
0,24.142866,
1,18.389873,
2,33.131578,143.55
3,40.401716,95.7
4,20.445603,86.14


In [56]:
# extract key and mode with madmom (~80% acc)

In [60]:
# --- Compatibility shims for madmom ---
if not hasattr(np, 'float'):
    np.float = float
    np.int = int
import collections, collections.abc
collections.MutableSequence = collections.abc.MutableSequence
# --- End shims ---

from madmom.features.key import CNNKeyRecognitionProcessor, key_prediction_to_label

In [61]:
def estimate_key_with_confidence(path: Path):
    """Estimates key, mode, and confidence from an audio file."""
    try:
        key_proc = CNNKeyRecognitionProcessor()
        probs = key_proc(str(path))
        confidence = float(np.max(probs))
        label = key_prediction_to_label(probs)
        key, mode = label.split()
        return key, mode, confidence
    except Exception:
        return None, None, np.nan

In [68]:
song_ids = df_master_high['song_id'].unique().tolist()

key_results = {}
print(f"Estimating key, mode, and confidence for {len(song_ids)} songs...")

for song_id in tqdm(song_ids, desc="Key Estimation"):
    # Construct file path using the actual song ID
    audio_file_path = AUDIO_DIR / f"{song_id}.mp3" # Adjust extension if needed
    if audio_file_path.exists():
        key_results[str(song_id)] = estimate_key_with_confidence(audio_file_path)
    else:
        key_results[str(song_id)] = (None, None, np.nan)

# Create a DataFrame from the results for easy mapping
results_df = pd.DataFrame.from_dict(
    key_results, 
    orient='index', 
    columns=['key', 'mode', 'key_confidence']
)

# --- Add the new columns to each master DataFrame ---
dataframes_to_update = [df_master_low, df_master_mid, df_master_high]

print("\nAdding key, mode, and confidence columns to all DataFrames...")
for df in dataframes_to_update:
    df['key'] = df['song_id'].astype(str).map(results_df['key'])
    df['mode'] = df['song_id'].astype(str).map(results_df['mode'])
    df['key_confidence'] = df['song_id'].astype(str).map(results_df['key_confidence'])

In [67]:
print("Columns added successfully.")
# Display the head of one DataFrame to verify the fix
display(df_master_high[['song_id', 'bpm', 'key', 'mode', 'key_confidence']].head())

Columns added successfully.


Unnamed: 0,song_id,bpm,key,mode,key_confidence
0,2,,A,minor,0.259305
1,3,,E,minor,0.706457
2,4,143.55,A,minor,0.983184
3,5,95.7,B,major,0.997981
4,7,86.14,C,major,0.778527


In [69]:
df_master_low.to_parquet('df_low.parquet')
df_master_mid.to_parquet('df_mid.parquet')
df_master_high.to_parquet('df_high.parquet')

In [None]:
# feature normalization

In [None]:
from sklearn.preprocessing import StandardScaler
import pandas as pd

# Assume 'results' is the dictionary from your processor
# and you have the ultra_compact DataFrame
ultra_compact_df = results['ultra_compact']

# 1. Initialize the scaler
scaler = StandardScaler()

# 2. Fit the scaler to your data and transform it
#    The output is a NumPy array, so we put it back into a DataFrame
scaled_features_array = scaler.fit_transform(ultra_compact_df)

# 3. Create the new DataFrame with scaled features
df_scaled = pd.DataFrame(
    scaled_features_array, 
    index=ultra_compact_df.index, 
    columns=ultra_compact_df.columns
)

print("Original Features:")
display(ultra_compact_df.head(3))

print("\nScaled Features (Standardized):")
display(df_scaled.head(3))