In [14]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from pathlib import Path

def load_features_and_labels(features_path, labels_path, segment_info_path):
    """
    Load features and labels from specified paths.
    
    Args:
        features_path (str): Path to the .npy file containing features
        labels_path (str): Path to the .npy file containing labels
        segment_info_path (str): Path to the CSV file containing segment information
    
    Returns:
        X (np.array): Feature matrix
        y (np.array): Labels
        segment_info (pd.DataFrame): DataFrame containing segment information
    
    Raises:
        ValueError: If features, labels and segment info do not all have the
            same number of rows.
    """
    # Load features and labels
    X = np.load(features_path)
    y = np.load(labels_path)
    
    # Load segment info
    segment_info = pd.read_csv(segment_info_path)
    
    # Row i of X, y and segment_info must describe the same segment, so a length
    # mismatch means the files do not belong together. This is checked with an
    # explicit exception (not `assert`) so it still runs under `python -O`.
    n_features, n_labels, n_segments = len(X), len(y), len(segment_info)
    if not (n_features == n_labels == n_segments):
        raise ValueError(
            "Mismatch in number of samples between features, labels, and segment info "
            f"(features={n_features}, labels={n_labels}, segment_info={n_segments})"
        )
    
    return X, y, segment_info

In [15]:
def stratified_participant_split(X, y, segment_info, test_size=0.2, random_state=42):
    """
    Split data ensuring:
    1. No augmented versions of test set samples in training set
    2. Each participant has samples of each instrument type they recorded in training set
    3. Stratification by instrument label
    
    One randomly chosen segment per (participant, instrument) pair is reserved for
    training first; the remaining segments are then divided with a stratified
    ``train_test_split``. When the segment paths contain augmented files
    (``*_augN.wav``), only original segments are split: the training set also
    receives the augmented versions of its originals, while the test set holds
    original segments only.
    
    The function prints split statistics and does not modify ``segment_info``.
    
    Args:
        X (np.array): Feature matrix
        y (np.array): Labels
        segment_info (pd.DataFrame): DataFrame containing segment information;
            must provide 'segment_path' and 'participant_id' columns, with row i
            describing X[i] / y[i]
        test_size (float): Proportion of data for testing (applied to the segments
            left after the per-participant reservation)
        random_state (int): Random seed; drives both the per-participant
            reservation and the stratified split, so equal seeds give equal splits
    
    Returns:
        X_train, X_test, y_train, y_test, train_indices, test_indices
        (the indices are row positions into X / y / segment_info)
    """
    aug_pattern = '_aug\\d+\\.wav'
    
    # Work on a positionally indexed copy: row positions can then be used to index
    # X and y directly, and the caller's DataFrame is never altered.
    info = segment_info.reset_index(drop=True)
    y = np.asarray(y)
    
    # Local generator seeded from random_state, so the reserved training samples
    # are reproducible and the global NumPy RNG is left untouched.
    if isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        rng = np.random.RandomState(random_state)
    
    paths = info['segment_path']
    participants = info['participant_id'].to_numpy()
    
    # Check if we're dealing with augmented data by looking at segment paths
    aug_rows = paths.str.contains(aug_pattern)
    
    if aug_rows.any():
        # Map every augmented file back to the original segment it was derived from
        original_name = paths.apply(
            lambda p: p.split('_aug')[0] + '.wav' if '_aug' in p else p
        )
        
        # Original (non-augmented) segments, kept as aligned arrays
        orig_pos = np.flatnonzero(~aug_rows.to_numpy())
        orig_paths = paths.to_numpy()[orig_pos]
        orig_labels = y[orig_pos]
        orig_participants = participants[orig_pos]
        
        # Reserve one original segment per (participant, instrument) for training
        reserved = []
        for participant in pd.unique(orig_participants):
            of_participant = orig_participants == participant
            for instrument in np.unique(orig_labels[of_participant]):
                candidates = orig_paths[of_participant & (orig_labels == instrument)]
                reserved.append(rng.choice(candidates))
        reserved = set(reserved)
        
        # Split the remaining originals. Paths and labels are filtered with the
        # same mask so the stratify labels line up with the items being split.
        is_remaining = np.array([p not in reserved for p in orig_paths], dtype=bool)
        remaining_paths = orig_paths[is_remaining].tolist()
        remaining_labels = orig_labels[is_remaining]
        additional_train, test_segments = train_test_split(
            remaining_paths,
            test_size=test_size,
            random_state=random_state,
            stratify=remaining_labels
        )
        train_segments = reserved.union(additional_train)
        
        # Train: the chosen originals plus every augmented version of them
        in_train = paths.isin(train_segments) | original_name.isin(train_segments)
        train_indices = np.flatnonzero(in_train.to_numpy())
        
        # Test: original segments only; their augmented versions are left out of
        # both sets so nothing derived from a test sample reaches training
        test_indices = np.flatnonzero(paths.isin(set(test_segments)).to_numpy())
        
    else:
        # Reserve one sample per (participant, instrument) for training
        reserved = []
        for participant in pd.unique(participants):
            of_participant = participants == participant
            for instrument in np.unique(y[of_participant]):
                candidates = np.flatnonzero(of_participant & (y == instrument))
                reserved.append(int(rng.choice(candidates)))
        
        # Split remaining indices
        remaining_indices = np.setdiff1d(
            np.arange(len(y)), np.asarray(reserved, dtype=int)
        )
        additional_train, test_indices = train_test_split(
            remaining_indices,
            test_size=test_size,
            random_state=random_state,
            stratify=y[remaining_indices]
        )
        train_indices = reserved + list(additional_train)
    
    # Convert to integer arrays (explicit dtype keeps an empty result indexable)
    train_indices = np.asarray(train_indices, dtype=int)
    test_indices = np.asarray(test_indices, dtype=int)
    
    # Verify no overlap between train and test
    assert np.intersect1d(train_indices, test_indices).size == 0, \
        "Overlap found between train and test sets"
    
    # Print split statistics
    print(f"\nSplit Statistics:")
    print(f"Training set size: {len(train_indices)}")
    print(f"Test set size: {len(test_indices)}")
    print("\nLabel distribution in training set:")
    print(pd.Series(y[train_indices]).value_counts())
    print("\nLabel distribution in test set:")
    print(pd.Series(y[test_indices]).value_counts())
    print("\nParticipant distribution in training set:")
    print(info.iloc[train_indices]['participant_id'].value_counts().head())
    print("\nParticipant distribution in test set:")
    print(info.iloc[test_indices]['participant_id'].value_counts().head())
    
    return (X[train_indices], X[test_indices], 
            y[train_indices], y[test_indices],
            train_indices, test_indices)

In [16]:
def train_evaluate_knn(X_train, X_test, y_train, y_test, cv=5):
    """
    Train KNN model with hyperparameter tuning and evaluate it.
    
    Features are standardised with a scaler fitted on the training data, labels
    are integer-encoded, and a grid search over neighbour count, weighting and
    distance metric picks the configuration with the best cross-validated
    accuracy. The label mapping, best parameters and classification report are
    printed, and the confusion matrix is drawn on a new matplotlib figure that is
    left open so the caller can save, show or close it.
    
    Note: the scaler is fitted on the whole training set before cross-validation,
    so every validation fold has contributed to the scaling statistics; the CV
    scores may therefore be slightly optimistic.
    
    Args:
        X_train, X_test, y_train, y_test: Training and test data
        cv (int): Number of cross-validation folds
    
    Returns:
        best_model: Trained model with best parameters. It expects *scaled*
            features and predicts *encoded* labels; the fitted preprocessing
            objects are attached as ``best_model.scaler_`` and
            ``best_model.label_encoder_`` so it can be applied to new data.
        best_params: Best hyperparameters
        cv_results: Cross-validation results
    
    Raises:
        ValueError: If y_test contains a label that does not occur in y_train
            (raised by the label encoder).
    """
    from sklearn.preprocessing import LabelEncoder
    from sklearn.model_selection import StratifiedKFold
    
    # Scale the features (statistics come from the training data only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Encode labels
    label_encoder = LabelEncoder()
    y_train_encoded = label_encoder.fit_transform(y_train)
    # Fail fast if the test set holds a class the model was never trained on
    label_encoder.transform(y_test)
    
    # Report the label mapping
    label_mapping = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))
    print("\nLabel mapping:", label_mapping)
    
    # Define parameter grid
    # NOTE: with KNeighborsClassifier's default p=2, 'minkowski' is the same
    # distance as 'euclidean', so those candidates duplicate each other; the
    # entry is kept so cv_results keeps its existing layout.
    param_grid = {
        'n_neighbors': [3, 5, 7, 9, 11, 13, 15],
        'weights': ['uniform', 'distance'],
        'metric': ['euclidean', 'manhattan', 'minkowski']
    }
    
    # Stratified, shuffled folds keep the class proportions similar in each fold
    cv_splitter = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    
    grid_search = GridSearchCV(
        KNeighborsClassifier(), 
        param_grid, 
        cv=cv_splitter,
        scoring='accuracy', 
        n_jobs=-1,
        error_score=0.0  # Return score of 0 for failed fits
    )
    grid_search.fit(X_train_scaled, y_train_encoded)
    
    # Get best model and parameters
    best_model = grid_search.best_estimator_
    best_params = grid_search.best_params_
    
    # Evaluate on test set, converting predictions back to the original labels
    y_pred = label_encoder.inverse_transform(best_model.predict(X_test_scaled))
    
    # Print results
    print("\nBest parameters:", best_params)
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))
    
    # Plot confusion matrix. `labels=` pins rows/columns to the encoder's class
    # order, so the tick labels stay correct even when a class is missing from
    # both the test labels and the predictions.
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(y_test, y_pred, labels=label_encoder.classes_)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=label_encoder.classes_,
                yticklabels=label_encoder.classes_)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    
    # Attach the fitted preprocessing so the model is usable on new raw data
    best_model.label_encoder_ = label_encoder
    best_model.scaler_ = scaler
    
    return best_model, best_params, grid_search.cv_results_

In [17]:

# Feature sets to evaluate: each entry names the feature/label arrays and the
# segment-info CSV whose rows describe them.
feature_sets = [
    {
        'name': 'mfcc',
        'features_path': '../../extracted_features/features/mfcc_features.npy',
        'labels_path': '../../extracted_features/labels/mfcc_labels.npy',
        'segment_info_path': '../../segment_info/segment_info.csv'
    },
    {
        'name': 'mfcc_env',
        'features_path': '../../extracted_features/features/mfcc_env_features.npy',
        'labels_path': '../../extracted_features/labels/mfcc_env_labels.npy',
        'segment_info_path': '../../segment_info/segment_info.csv'
    },
    {
        'name': 'mfcc_extracted',
        'features_path': '../../extracted_features/features/mfcc_extracted_features.npy',
        'labels_path': '../../extracted_features/labels/mfcc_extracted_labels.npy',
        'segment_info_path': '../../segment_info/segment_info.csv'
    },
    {
        'name': 'mfcc_aug',
        'features_path': '../../extracted_features/features/mfcc_features_aug.npy',
        'labels_path': '../../extracted_features/labels/mfcc_labels_aug.npy',
        'segment_info_path': '../../segment_info/augmented_segment_info.csv'
    },
    {
        'name': 'mfcc_env_aug',
        'features_path': '../../extracted_features/features/mfcc_env_aug_features.npy',
        'labels_path': '../../extracted_features/labels/mfcc_env_aug_labels.npy',
        'segment_info_path': '../../segment_info/augmented_segment_info.csv'
    },
    {
        'name': 'mfcc_extracted_aug',
        'features_path': '../../extracted_features/features/mfcc_extracted_aug_features.npy',
        'labels_path': '../../extracted_features/labels/mfcc_extracted_aug_labels.npy',
        'segment_info_path': '../../segment_info/augmented_segment_info.csv'
    }
]

# Per-feature-set outcome, keyed by feature-set name
results = {}

# Create the figure directory up front so a missing folder cannot abort the run
# after the (slow) grid search has already finished.
visualization_dir = Path("../../visualization")
visualization_dir.mkdir(parents=True, exist_ok=True)

for feature_set in feature_sets:
    print(f"\nProcessing {feature_set['name']} features...")

    # Load data
    X, y, segment_info = load_features_and_labels(
        feature_set['features_path'],
        feature_set['labels_path'],
        feature_set['segment_info_path']
    )

    # Split data
    X_train, X_test, y_train, y_test, train_idx, test_idx = stratified_participant_split(X, y, segment_info)

    # Train and evaluate (leaves the confusion-matrix figure open as the current figure)
    best_model, best_params, cv_results = train_evaluate_knn(
        X_train, X_test, y_train, y_test
    )

    # Store results
    results[feature_set['name']] = {
        'best_model': best_model,
        'best_params': best_params,
        'cv_results': cv_results,
        'test_indices': test_idx
    }

    # Save confusion matrix plot, then close it so figures do not accumulate
    plt.savefig(visualization_dir / f"confusion_matrix_{feature_set['name']}.png")
    plt.close()


Processing mfcc features...

Split Statistics:
Training set size: 4617
Test set size: 1097

Label distribution in training set:
kd     1434
hhc    1212
sd     1158
hho     813
Name: count, dtype: int64

Label distribution in test set:
kd     342
hhc    286
sd     273
hho    196
Name: count, dtype: int64

Participant distribution in training set:
participant_id
P18    239
P22    237
P25    214
P21    196
P28    176
Name: count, dtype: int64

Participant distribution in test set:
participant_id
P18    68
P22    56
P25    52
P28    46
P5     36
Name: count, dtype: int64

Label mapping: {'hhc': 0, 'hho': 1, 'kd': 2, 'sd': 3}

Best parameters: {'metric': 'manhattan', 'n_neighbors': 5, 'weights': 'distance'}

Classification Report:
              precision    recall  f1-score   support

         hhc       0.82      0.81      0.82       286
         hho       0.92      0.91      0.92       196
          kd       0.80      0.84      0.82       342
          sd       0.86      0.82      0.84   

In [24]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from pathlib import Path

def load_features_and_labels(features_path, labels_path, segment_info_path):
    """
    Load features and labels from specified paths.
    
    Args:
        features_path (str): Path to the .npy file containing features
        labels_path (str): Path to the .npy file containing labels
        segment_info_path (str): Path to the CSV file containing segment information
    
    Returns:
        X (np.array): Feature matrix
        y (np.array): Labels
        segment_info (pd.DataFrame): DataFrame containing segment information
    
    Raises:
        ValueError: If features, labels and segment info do not all have the
            same number of rows.
    """
    # Load features and labels
    X = np.load(features_path)
    y = np.load(labels_path)
    
    # Load segment info
    segment_info = pd.read_csv(segment_info_path)
    
    # Row i of X, y and segment_info must describe the same segment, so a length
    # mismatch means the files do not belong together. This is checked with an
    # explicit exception (not `assert`) so it still runs under `python -O`.
    n_features, n_labels, n_segments = len(X), len(y), len(segment_info)
    if not (n_features == n_labels == n_segments):
        raise ValueError(
            "Mismatch in number of samples between features, labels, and segment info "
            f"(features={n_features}, labels={n_labels}, segment_info={n_segments})"
        )
    
    return X, y, segment_info

def participant_independent_split(X, y, segment_info, test_size=0.2, random_state=42):
    """
    Split data ensuring complete participant independence between train and test sets.
    
    A random subset of participants (at least one) is held out; every row of a
    held-out participant goes to the test set and every row of the others to the
    training set. Augmented segments (paths matching ``_augN.wav``) stay in the
    training set but are dropped from the test set, so testing uses original
    recordings only. Split statistics are printed.
    
    Args:
        X (np.array): Feature matrix
        y (np.array): Labels
        segment_info (pd.DataFrame): DataFrame containing segment information;
            must provide 'participant_id' and 'segment_path' columns, with row i
            describing X[i] / y[i]
        test_size (float): Proportion of participants to use for testing
        random_state (int): Random seed
    
    Returns:
        X_train, X_test, y_train, y_test, train_indices, test_indices
        (the indices are row positions into X / y / segment_info)
    """
    # Get unique participants (in order of first appearance)
    participants = np.asarray(segment_info['participant_id'].unique())
    
    # Draw the held-out participants from a local generator. Seeding a private
    # RandomState gives the same draw as seeding the global NumPy RNG, without
    # resetting random state that other code may depend on.
    n_test_participants = max(1, int(len(participants) * test_size))
    if isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        rng = np.random.RandomState(random_state)
    test_participants = rng.choice(participants, n_test_participants, replace=False)
    train_participants = participants[~np.isin(participants, test_participants)]
    
    # Row masks are converted to positions, so the result does not depend on
    # what index the DataFrame happens to carry.
    participant_ids = segment_info['participant_id']
    in_train = participant_ids.isin(train_participants).to_numpy()
    in_test = participant_ids.isin(test_participants).to_numpy()
    
    # Augmented rows are excluded from the test set only. When the data holds no
    # augmented segments this mask is all False and the filter is a no-op, so a
    # single code path covers both augmented and non-augmented inputs.
    is_augmented_row = segment_info['segment_path'].str.contains(
        '_aug\\d+\\.wav', na=False
    ).to_numpy()
    
    train_indices = np.flatnonzero(in_train)
    test_indices = np.flatnonzero(in_test & ~is_augmented_row)
    
    # Print split statistics
    print(f"\nParticipant-Independent Split Statistics:")
    print(f"Training participants: {sorted(train_participants)}")
    print(f"Test participants: {sorted(test_participants)}")
    print(f"\nTraining set size: {len(train_indices)}")
    print(f"Test set size: {len(test_indices)}")
    print("\nLabel distribution in training set:")
    print(pd.Series(y[train_indices]).value_counts())
    print("\nLabel distribution in test set:")
    print(pd.Series(y[test_indices]).value_counts())
    
    return (X[train_indices], X[test_indices], 
            y[train_indices], y[test_indices],
            train_indices, test_indices)

def train_evaluate_knn_participant_independent(X_train, X_test, y_train, y_test, cv=5):
    """
    Train and evaluate KNN model with participant-independent validation.
    
    Features are standardised with a scaler fitted on the training data, labels
    are integer-encoded, and a grid search over neighbour count, weighting and
    distance metric picks the configuration with the best cross-validated
    accuracy. The label mapping, best parameters and classification report are
    printed, and the confusion matrix is drawn on a new matplotlib figure that is
    left open so the caller can save, show or close it.
    
    Note: participant independence comes from the train/test split supplied by
    the caller. The internal cross-validation uses plain stratified folds, and
    the scaler is fitted on the whole training set before cross-validation, so
    the CV scores may be optimistic compared with the held-out test result.
    
    Args:
        X_train, X_test, y_train, y_test: Training and test data
        cv (int): Number of cross-validation folds
    
    Returns:
        best_model: Trained model with best parameters. It expects *scaled*
            features and predicts *encoded* labels; the fitted preprocessing
            objects are attached as ``best_model.scaler_`` and
            ``best_model.label_encoder_`` so it can be applied to new data.
        best_params: Best hyperparameters
        cv_results: Cross-validation results
    
    Raises:
        ValueError: If y_test contains a label that does not occur in y_train
            (raised by the label encoder).
    """
    from sklearn.preprocessing import LabelEncoder
    from sklearn.model_selection import StratifiedKFold
    
    # Scale the features (statistics come from the training data only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Encode labels
    label_encoder = LabelEncoder()
    y_train_encoded = label_encoder.fit_transform(y_train)
    # Fail fast if the held-out participants produced a class never seen in training
    label_encoder.transform(y_test)
    
    # Report the label mapping
    label_mapping = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))
    print("\nLabel mapping:", label_mapping)
    
    # Define parameter grid
    # NOTE: with KNeighborsClassifier's default p=2, 'minkowski' is the same
    # distance as 'euclidean', so those candidates duplicate each other; the
    # entry is kept so cv_results keeps its existing layout.
    param_grid = {
        'n_neighbors': [3, 5, 7, 9, 11, 13, 15],
        'weights': ['uniform', 'distance'],
        'metric': ['euclidean', 'manhattan', 'minkowski']
    }
    
    # Use StratifiedKFold for cross-validation
    cv_splitter = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    
    grid_search = GridSearchCV(
        KNeighborsClassifier(), 
        param_grid, 
        cv=cv_splitter,
        scoring='accuracy', 
        n_jobs=-1,
        error_score=0.0  # failed fits score 0 instead of aborting the search
    )
    grid_search.fit(X_train_scaled, y_train_encoded)
    
    # Get best model and parameters
    best_model = grid_search.best_estimator_
    best_params = grid_search.best_params_
    
    # Evaluate on test set, converting predictions back to the original labels
    y_pred = label_encoder.inverse_transform(best_model.predict(X_test_scaled))
    
    # Print results
    print("\nBest parameters:", best_params)
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))
    
    # Plot confusion matrix. `labels=` pins rows/columns to the encoder's class
    # order, so the tick labels stay correct even when the held-out participants
    # never produced (and the model never predicted) one of the classes.
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(y_test, y_pred, labels=label_encoder.classes_)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=label_encoder.classes_,
                yticklabels=label_encoder.classes_)
    plt.title('Confusion Matrix (Participant-Independent)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    
    # Attach the fitted preprocessing so the model is usable on new raw data
    best_model.label_encoder_ = label_encoder
    best_model.scaler_ = scaler
    
    return best_model, best_params, grid_search.cv_results_

def main_participant_independent():
    """
    Main function to run participant-independent evaluation on all feature sets.
    
    For every feature set the data is loaded, split by participant, a KNN model
    is tuned and evaluated, and the confusion matrix figure is written to
    ``../../visualization`` (created if it does not exist).
    
    Returns:
        dict: Maps each feature-set name to a dict with the keys 'best_model',
        'best_params', 'cv_results' and 'test_indices'.
    """
    # Define paths for different feature sets
    feature_sets = [
        {
            'name': 'mfcc',
            'features_path': '../../extracted_features/features/mfcc_features.npy',
            'labels_path': '../../extracted_features/labels/mfcc_labels.npy',
            'segment_info_path': '../../segment_info/segment_info.csv'
        },
        {
            'name': 'mfcc_env',
            'features_path': '../../extracted_features/features/mfcc_env_features.npy',
            'labels_path': '../../extracted_features/labels/mfcc_env_labels.npy',
            'segment_info_path': '../../segment_info/segment_info.csv'
        },
        {
            'name': 'mfcc_extracted',
            'features_path': '../../extracted_features/features/mfcc_extracted_features.npy',
            'labels_path': '../../extracted_features/labels/mfcc_extracted_labels.npy',
            'segment_info_path': '../../segment_info/segment_info.csv'
        },
        {
            'name': 'mfcc_aug',
            'features_path': '../../extracted_features/features/mfcc_features_aug.npy',
            'labels_path': '../../extracted_features/labels/mfcc_labels_aug.npy',
            'segment_info_path': '../../segment_info/augmented_segment_info.csv'
        },
        {
            'name': 'mfcc_env_aug',
            'features_path': '../../extracted_features/features/mfcc_env_aug_features.npy',
            'labels_path': '../../extracted_features/labels/mfcc_env_aug_labels.npy',
            'segment_info_path': '../../segment_info/augmented_segment_info.csv'
        },
        {
            'name': 'mfcc_extracted_aug',
            'features_path': '../../extracted_features/features/mfcc_extracted_aug_features.npy',
            'labels_path': '../../extracted_features/labels/mfcc_extracted_aug_labels.npy',
            'segment_info_path': '../../segment_info/augmented_segment_info.csv'
        }
    ]
    
    # Create the figure directory up front so a missing folder cannot abort the
    # run after the (slow) grid search has already finished.
    visualization_dir = Path("../../visualization")
    visualization_dir.mkdir(parents=True, exist_ok=True)
    
    results = {}
    
    for feature_set in feature_sets:
        print(f"\nProcessing {feature_set['name']} features...")
        
        # Load data
        X, y, segment_info = load_features_and_labels(
            feature_set['features_path'],
            feature_set['labels_path'],
            feature_set['segment_info_path']
        )
        
        # Split data
        X_train, X_test, y_train, y_test, train_idx, test_idx = \
            participant_independent_split(X, y, segment_info)
        
        # Train and evaluate (leaves the confusion-matrix figure open as the current figure)
        best_model, best_params, cv_results = train_evaluate_knn_participant_independent(
            X_train, X_test, y_train, y_test
        )
        
        # Store results
        results[feature_set['name']] = {
            'best_model': best_model,
            'best_params': best_params,
            'cv_results': cv_results,
            'test_indices': test_idx
        }
        
        # Save confusion matrix plot, then close it so figures do not accumulate
        plt.savefig(
            visualization_dir / f"confusion_matrix_participant_independent_{feature_set['name']}.png"
        )
        plt.close()
    
    return results

# Entry point: run the participant-independent evaluation for every feature set
# and keep the per-feature-set results (best model, best parameters, CV results
# and test indices) in `results` for later inspection.
if __name__ == "__main__":
    results = main_participant_independent()


Processing mfcc features...

Participant-Independent Split Statistics:
Training participants: ['AFRI', 'AZiI', 'AZiP', 'BeaI', 'BeaP', 'BicI', 'BicP', 'CatI', 'CatP', 'CavI', 'CavP', 'CraI', 'CraP', 'IsaI', 'IsaP', 'JOlI', 'JOlP', 'JSiI', 'JSiP', 'JoSP', 'MCoI', 'MCoP', 'MafI', 'MafP', 'NorI', 'NorP', 'P1', 'P10', 'P11', 'P12', 'P15', 'P16', 'P17', 'P19', 'P2', 'P21', 'P22', 'P23', 'P25', 'P26', 'P27', 'P28', 'P3', 'P4', 'P5', 'P6', 'P8', 'P9', 'RicI', 'RicP', 'RobI', 'RobP', 'SofP', 'ZgaI', 'ZizI']
Test participants: ['AFRP', 'JSoI', 'MarI', 'MarP', 'P13', 'P14', 'P18', 'P20', 'P24', 'P7', 'SofI', 'ZgaP', 'ZizP']

Training set size: 4557
Test set size: 1157

Label distribution in training set:
kd     1439
hhc    1169
sd     1146
hho     803
Name: count, dtype: int64

Label distribution in test set:
kd     337
hhc    329
sd     285
hho    206
Name: count, dtype: int64

Label mapping: {'hhc': 0, 'hho': 1, 'kd': 2, 'sd': 3}

Best parameters: {'metric': 'manhattan', 'n_neighbors': 3, 'we