# pip install -r requirements.txt

# Download and set path to music data
https://storage.googleapis.com/qwasar-public/track-ds/classically_punk_music_genres.tar.gz 
## The data will be in a 'genres' directory. Use it in the Music Processor code
dataset_path = '../genres'

# Import statements for rest of code

In [11]:
import ast
import sys
import os
import pickle
import importlib.util
import queue
import threading
import time

# TENSORFLOW IS REQUIRED EVEN IF NOT ACCESSED
import tensorflow as tf

import numpy as np
import pandas as pd
import librosa
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.utils import to_categorical

# Helpers and Constants

In [2]:
GREEN = '\033[32m'
RED = '\033[31m'
RESET = '\033[0m'

# MUSIC PROCESSOR CODE
## Used to extract audio feature data from the genres dataset
## Skip to Model Training code if data is already processed in `df_output`

In [3]:
# Constants
genres_from_dataset = 'blues classical country disco hiphop jazz metal pop reggae rock'.split()
fundamental_features_cols = [
    'mfcc', 'chroma', 'mel', 'contrast', 'tonnetz'
]

df_output_dir = 'df_output'

class MusicDataProcessor:
    def __init__(
            self, 
            dataset_path: str, 
            file_depth_limit: int, 
            file_output_name: str, 
            extract_raw_only: bool,
            compute_kde: bool,
            compute_ecdf: bool,
            pad_and_truncate: bool
        ):
        self.dataset_path = dataset_path
        self.file_depth_limit = file_depth_limit
        self.file_output_name = file_output_name
        self.genres = genres_from_dataset
        self.data = pd.DataFrame(columns=fundamental_features_cols)
        self.extract_raw_only = extract_raw_only
        self.compute_kde = compute_kde
        self.compute_ecdf = compute_ecdf
        self.pad_and_truncate = pad_and_truncate

        if not os.path.exists(df_output_dir):
            os.makedirs(df_output_dir)
            print(f"Directory '{df_output_dir}' created.")
        else:
            print(f"Directory '{df_output_dir}' already exists.")

    def get_data(self):
        def encode_array(x):
            if isinstance(x, np.ndarray):
                # Convert the array to a JSON string
                return json.dumps(x.tolist())
            return x
        encoded_df = self.data.map(encode_array)
        encoded_df.to_csv(f'{df_output_dir}/{self.file_output_name}.csv', index=False)
        return encoded_df

    def compute_stats_and_measures(self, data):
        # Compute basic statistics
        stats_dict = {
            'mean': np.mean(data),
            'stddev': np.std(data),
            'var': np.var(data),
            'min': np.min(data),
            'max': np.max(data),
            'mad': stats.median_abs_deviation(data),
            'kurtosis': kurtosis(data),
            'skewness': skew(data)
        }
        
        # Compute ECDF
        if self.compute_ecdf:
            sorted_data, ecdf = np.sort(data), np.arange(1, len(data) + 1) / len(data)
            stats_dict['ecdf_values'] = sorted_data.tolist()
            stats_dict['ecdf_proportions'] = ecdf.tolist()
        
        # Compute KDE
        if self.compute_kde:
            kde = stats.gaussian_kde(data)
            stats_dict['kde'] = kde
        
        return stats_dict

    def extract_features(self, file_path, verbose=None):
        try:
            target_rows = 13
            target_columns = 1293
            y, sr = librosa.load(file_path, sr=None)
            n_fft = min(1024, len(y))
            
            def pad_or_truncate(feature, target_columns):
                # Truncate
                if feature.shape[1] > target_columns:
                    return feature[:, :target_columns]
                # Pad
                elif feature.shape[1] < target_columns:
                    pad_width = target_columns - feature.shape[1]
                    return np.pad(feature, ((0, 0), (0, pad_width)), mode='constant')
                return feature

            features = {
                'mfcc': librosa.feature.mfcc(y=y, sr=sr, n_mfcc=target_rows, n_fft=n_fft),
                'chroma': librosa.feature.chroma_stft(y=y, sr=sr, hop_length=n_fft // 4),
                'mel': librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft),
                'contrast': librosa.feature.spectral_contrast(y=y, sr=sr, n_fft=n_fft),
                'tonnetz': librosa.feature.tonnetz(y=y, sr=sr),
                'spectral_bandwidth': librosa.feature.spectral_bandwidth(y=y, sr=sr, n_fft=n_fft),
                'spectral_flatness': librosa.feature.spectral_flatness(y=y),
                'spectral_centroid': librosa.feature.spectral_centroid(y=y, sr=sr, n_fft=n_fft),
                'zero_crossing_rate': librosa.feature.zero_crossing_rate(y=y),
                'harmony': librosa.effects.harmonic(y).reshape(1, -1),  # Reshape to 2D array
                'perceptr': librosa.effects.percussive(y).reshape(1, -1),  # Reshape to 2D array
                'tempo': np.array([librosa.beat.beat_track(y=y, sr=sr)[0]]).reshape(1, 1),  # Ensure shape compatibility
                'spectral_rolloff': librosa.feature.spectral_rolloff(y=y, sr=sr, n_fft=n_fft),
                'rms': librosa.feature.rms(y=y, frame_length=n_fft)
            }
            
            if self.pad_and_truncate:
                for key in features:
                    if len(features[key].shape) == 2:
                        features[key] = pad_or_truncate(features[key], target_columns)
                    else:
                        # Handle 1D features (e.g., tempo, harmony)
                        features[key] = pad_or_truncate(features[key].reshape(1, -1), target_columns)

            
            if self.extract_raw_only is not None and self.extract_raw_only:
                if verbose == 'v':
                    for name, array in features.items():
                        print(f"{name.capitalize()} Shape: {array.shape}")
                return features

            # Compute statistics for each feature
            feature_stats = {}
            for feature_name, feature_array in features.items():
                if feature_array.ndim == 1:  # If the feature is 1D
                    feature_stats.update({
                        f'{feature_name}_mean': np.mean(feature_array),
                        f'{feature_name}_stddev': np.std(feature_array),
                        f'{feature_name}_var': np.var(feature_array),
                        f'{feature_name}_min': np.min(feature_array),
                        f'{feature_name}_max': np.max(feature_array)
                    })
                else:  # If the feature is 2D
                    num_features = feature_array.shape[0]
                    for i in range(num_features):
                        feature_i = feature_array[i, :]
                        feature_stats.update({
                            f'{feature_name}_{i+1}_{key}': value
                            for key, value in self.compute_stats_and_measures(feature_i).items()
                        })

            if verbose == 'v':
                for key, value in feature_stats.items():
                    print(f"EXTRACTING: {key}\n{value}")

            return feature_stats

        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return None



    def load_data(self):
        all_data = []
        total_files_counter = 0
        for genre in self.genres:
            counter = 0
            genre_dir = os.path.join(self.dataset_path, genre)
            for file in os.listdir(genre_dir):
                # print(f'File number: {total_files_counter}')
                if self.file_depth_limit and counter >= self.file_depth_limit:
                    break
                file_path = os.path.join(genre_dir, file)
                features = self.extract_features(file_path, None)
                if features:
                    # Flatten and unpack the data structure
                    stats_flat = features
                    all_data.append({
                        'filename': file,
                        'genre': genre,
                        **stats_flat
                    })                                      
                    counter += 1
                    total_files_counter += 1

        self.data = pd.DataFrame(all_data)
        self.get_data()

# Run Music Processor to Extract Audio Data

In [None]:
start_time = time.time()

dataset_path = '../genres'  # Replace with the path to your audio dataset
file_depth_limit = None  # Number of files to process per genre
file_output_name = 'full_audio_features'

# Create an instance of the MusicDataProcessor
processor = MusicDataProcessor(
    dataset_path=dataset_path,
    file_output_name=file_output_name, 
    file_depth_limit=file_depth_limit,
    extract_raw_only=True,
    pad_and_truncate=True,
    compute_kde=False,
    compute_ecdf=False
)

# Load data
processor.load_data()

# Output the processed data
print(f"Data has been processed and saved to CSV file: {file_output_name}.")
print(processor.data.head())  # Display the first few rows of the processed data

# End the timer
end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = int(elapsed_time % 60)

print(f"Time taken: {minutes} minutes and {seconds} seconds")

# Model Training Code

In [12]:
def convert_string_to_array(value):
    try: 
        if isinstance(value, str):
            value = value.strip('"').strip("'")
            try:
                value = ast.literal_eval(value)
                if isinstance(value, list):
                    value = np.array(value, dtype=float)
                    return value
                else:
                    print("Warning: Evaluated value is not a list.")
            except (ValueError, SyntaxError) as e:
                print(f"Error evaluating string: {e}")
        else:
            print('Value not detected as str')
        return value
    except Exception as e:
        print("General failure in conversion:")
        print(f'Error: {e}')
        return value

def read_raw_str_csv_and_split_df(csv_path):
    try:
        df_input = pd.read_csv(csv_path)
    except Exception as e:
        print(f"Error reading csv into df: {e}")
        return None, None
    if df_input is not None:
        for col in df_input.columns:
            if col not in ['filename', 'genre']:
                df_input[col] = df_input[col].apply(convert_string_to_array)
        return df_input
    else:
        print('Error: df_input is None')
        return None, None

def prepare_data(X, y):
    try:
        # Step 1: Flatten the features
        X_flattened = X.apply(lambda col: col.apply(lambda x: x.flatten()))
        # Step 2: Convert the DataFrame of flattened arrays into a 2D NumPy array
        X_stacked = np.stack(X_flattened.apply(lambda x: np.concatenate(x), axis=1).to_numpy())
        # Step 3: Scale the features
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_stacked)
        # Step 4: Encode the target labels (y)
        encoder = LabelEncoder()
        y_encoded = encoder.fit_transform(y)

        return X_scaled, y_encoded, encoder, scaler
    except Exception as e:
        print(f"Error in prepare_data: {e}")
        return None, None, None, None
    


def build_and_train_model(X_train, y_train, X_test, y_test, num_features, num_classes):
    model = Sequential([
        Input(shape=(num_features,)),
        Dense(64, activation='relu'),
        Dropout(0.2),
        Dense(num_classes, activation='softmax')
    ])
    
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    history = model.fit(
        X_train, 
        y_train, 
        epochs=3000, 
        batch_size=128, 
        validation_data=(X_test, y_test),
        verbose=0
    )

    return model, history

def predict(model, encoder, scaler, feature_inputs):
    # Scale the feature inputs directly without converting to DataFrame
    feature_inputs_scaled = scaler.transform([feature_inputs])
    # Make predictions
    predictions = model.predict(feature_inputs_scaled)
    # Decode the predictions to category names
    predicted_class_index = np.argmax(predictions, axis=1)[0]
    predicted_class = encoder.inverse_transform([predicted_class_index])[0]
    
    return predicted_class


def evaluate_all_rows(model, X, y, encoder, scaler):
    correct_count = 0
    total_count = len(X)
    
    for i in range(total_count):
        # Extract feature inputs and true label
        feature_inputs = X[i]  # Use standard NumPy indexing
        true_label = y[i]  # Use standard NumPy indexing
        # Make prediction
        predicted_class = predict(model, encoder, scaler, feature_inputs)
        # Check if the prediction matches the true label
        if predicted_class == true_label:
            # print(f"{GREEN}TRUE: {predicted_class} is {true_label}{RESET}")
            correct_count += 1
        # else:
            # print(f"{RED}FALSE: {predicted_class} is NOT {true_label}{RESET}")

    # Calculate accuracy
    accuracy = (correct_count / total_count) * 100
    incorrect_count = total_count - correct_count
    print(f"Accuracy: {accuracy:.2f}%")

# Run Model Training

In [13]:
full_dataset_stable = '../df_output/v5_5.csv'

try:
    df_extract = read_raw_str_csv_and_split_df(full_dataset_stable)
    
    if df_extract is not None:
        # Split into X and y
        X = df_extract.drop(columns=['filename', 'genre'])
        y = df_extract['genre']
        categories = y.unique()
        num_classes = len(categories)

        # Prepare the data
        X_scaled, y_encoded, encoder, scaler = prepare_data(X, y)
        y_encoded_one_hot = to_categorical(y_encoded, num_classes=num_classes)
        if X_scaled is not None and y_encoded is not None:
            X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_encoded_one_hot, test_size=0.2, random_state=42)
        else:
            print("Error in data preparation")
            raise ValueError("X_scaled or y_encoded is None")
    
        model, history = build_and_train_model(X_train, y_train, X_test, y_test, X_scaled.shape[1], num_classes) 
        
        # Evaluate model
        evaluate_all_rows(model, X_scaled, y, encoder, scaler)
    else:
        print("Error: DataFrame is None")
    
except Exception as e:
    print(f"A general error occurred in main block: {e}")

  X_stacked = np.stack(X_flattened.apply(lambda x: np.concatenate(x), axis=1).to_numpy())


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step
[1m