# Plots for report

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import precision_recall_fscore_support, roc_curve, auc

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dropout, Attention, Concatenate, Dense, Flatten
import tensorflow as tf

In [None]:
# Relative path of the input CSV (per its name: preprocessed labels and coordinates).
# It is read once for the classic-ML models and once more for the RNN model.
filepath = '../../data/dataframes/labels_and_coordinates_preprocessed.csv'

In [None]:
def preprocess_classic_data(filepath):
    """Load the CSV at ``filepath`` and integer-encode its categorical columns.

    The ``boulder``, ``camera``, ``participant`` and ``repetition`` columns are
    replaced by the integer codes of the lookup tables defined below. A value
    that has no entry in its table becomes NaN (behaviour of ``Series.map``).

    Args:
        filepath: Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        tuple: ``(data, participant_mapping)`` -- the encoded DataFrame and the
        athlete-name -> integer-id dictionary used for the ``participant``
        column.
    """
    # Athlete ids are simply the 1-based position in this alphabetical roster.
    athlete_roster = (
        'Ai Mori', 'Anastasia Sanders', 'Ayala Kerem', 'Brooke Raboutou',
        'Chaehyun Seo', 'Helene Janicot', 'Jain Kim', 'Janja Garnbret',
        'Jessica Pilz', 'Kyra Condie', 'Laura Rogora', 'Manon Hily',
        'Mia Krampl', 'Miho Nonaka', 'Molly Thompsonsmith',
        'Natalia Grossman', 'Oceania Mackenzie', 'Oriane Bertone',
        'Vita Lukan', 'Yejoo Seo', 'Zelia Avezou',
    )
    participant_mapping = {
        athlete: athlete_id
        for athlete_id, athlete in enumerate(athlete_roster, start=1)
    }

    # One lookup table per categorical column, applied in this order.
    column_codes = {
        'boulder': {'W1': 1, 'W2': 2, 'W3': 3, 'W4': 4},
        'camera': {'Cam21': 21, 'Cam22': 22, 'Cam24': 24},
        'participant': participant_mapping,
        'repetition': {
            'V1': 1, 'V2': 2, 'V3': 3, 'V4': 4, 'V5': 5,
            'V6': 6, 'V7': 7, 'V8': 8, 'V9': 9, 'V10': 10,
        },
    }

    frame = pd.read_csv(filepath)
    for column, codes in column_codes.items():
        frame[column] = frame[column].map(codes)

    return frame, participant_mapping


def split_data_boulder_specific(data, train_boulders=(1, 2, 4), test_boulders=(3,)):
    """Split ``data`` into train and test sets according to the boulder id.

    Rows whose ``boulder`` value is in ``train_boulders`` form the training
    set, rows whose value is in ``test_boulders`` form the test set. Rows
    matching neither (including NaN boulders) are left out of both. ``data``
    itself is not modified.

    Args:
        data: DataFrame with at least a ``boulder`` and a ``label`` column.
        train_boulders: Boulder ids used for training. Defaults to 1, 2 and 4.
        test_boulders: Boulder ids held out for testing. Defaults to 3.

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test)`` where the ``X`` frames
        contain every column except ``label`` and the ``y`` series hold the
        ``label`` column of the same rows.

    Raises:
        ValueError: If a boulder id appears in both lists, which would leak
            test rows into the training set.
        KeyError: If ``data`` has no ``boulder`` or ``label`` column.
    """
    shared = set(train_boulders) & set(test_boulders)
    if shared:
        raise ValueError(
            f"Boulders {sorted(shared)} are listed for both training and testing."
        )

    boulder_ids = data['boulder']
    train_rows = data[boulder_ids.isin(list(train_boulders))]
    test_rows = data[boulder_ids.isin(list(test_boulders))]

    # drop() returns new frames, so neither the slices nor ``data`` are mutated.
    X_train = train_rows.drop(columns='label')
    y_train = train_rows['label']
    X_test = test_rows.drop(columns='label')
    y_test = test_rows['label']

    return X_train, y_train, X_test, y_test


def split_data_athlete_specific(
    data,
    participant_mapping,
    test_athletes=('Ai Mori', 'Brooke Raboutou', 'Oceania Mackenzie', 'Mia Krampl'),
):
    """Split ``data`` into train and test sets by holding out whole athletes.

    Rows whose ``participant`` id belongs to one of ``test_athletes`` form the
    test set; every other row (including rows with a NaN participant) goes to
    the training set. ``data`` itself is not modified.

    Args:
        data: DataFrame with at least a ``participant`` column holding integer
            ids and a ``label`` column.
        participant_mapping: Dictionary translating athlete names to the ids
            stored in ``data['participant']``.
        test_athletes: Names of the athletes to hold out. Defaults to the four
            athletes used for the report.

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test)`` where the ``X`` frames
        contain every column except ``label`` and the ``y`` series hold the
        ``label`` column of the same rows.

    Raises:
        KeyError: If a name in ``test_athletes`` is missing from
            ``participant_mapping`` or a required column is missing.
    """
    test_athlete_ids = [participant_mapping[athlete] for athlete in test_athletes]
    is_test = data['participant'].isin(test_athlete_ids)

    # Separate features and target once, then select rows with the mask.
    features = data.drop(columns='label')
    labels = data['label']

    return features[~is_test], labels[~is_test], features[is_test], labels[is_test]


def evaluate_classic_models(X_train, y_train, X_test, y_test):
    """Fit four scikit-learn classifiers and score each one on the test set.

    Logistic Regression is trained and evaluated on standardized features
    (scaler fitted on ``X_train`` only); Decision Tree, KNN and Random Forest
    use the features as given. Progress and results are printed.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        X_test: Test feature matrix.
        y_test: Test labels.

    Returns:
        dict: ``{model name: {'Accuracy', 'Precision', 'Recall', 'F1 Score'}}``
        with precision, recall and F1 computed as weighted averages.
    """
    standardizer = StandardScaler()
    X_train_scaled = standardizer.fit_transform(X_train)
    X_test_scaled = standardizer.transform(X_test)

    # (display name, estimator, matrix to fit on, matrix to predict on)
    candidates = [
        ("Logistic Regression", LogisticRegression(max_iter=1000), X_train_scaled, X_test_scaled),
        ("Decision Tree", DecisionTreeClassifier(), X_train, X_test),
        ("KNN", KNeighborsClassifier(), X_train, X_test),
        ("Random Forest", RandomForestClassifier(), X_train, X_test),
    ]

    # Shared options for the averaged metrics; zero_division=0 scores classes
    # that were never predicted as 0 instead of warning.
    averaging = {'average': 'weighted', 'zero_division': 0}

    results = {}
    for name, estimator, fit_features, eval_features in candidates:
        print(f"Training {name} model...")
        estimator.fit(fit_features, y_train)
        y_pred = estimator.predict(eval_features)

        scores = {
            'Accuracy': accuracy_score(y_test, y_pred),
            'Precision': precision_score(y_test, y_pred, **averaging),
            'Recall': recall_score(y_test, y_pred, **averaging),
            'F1 Score': f1_score(y_test, y_pred, **averaging),
        }
        results[name] = scores
        print(f"{name} results: {results[name]}")

    return results


def preprocess_rnn_data(data):
    """Prepare the raw DataFrame for the RNN model.

    Steps, in order: drop rows containing missing values, standardize the
    keypoint-like columns, one-hot encode the categorical columns and
    integer-encode the ``label`` column.

    Args:
        data: Raw DataFrame containing the ``boulder``, ``camera``,
            ``participant`` and ``repetition`` columns. The caller's frame is
            not modified.

    Returns:
        tuple: ``(data, label_encoder)`` -- the processed DataFrame and the
        fitted ``LabelEncoder``, or ``None`` as the second element when
        ``data`` has no ``label`` column.
    """
    # Work on an independent copy so the column assignment below never writes
    # into a slice of the caller's frame.
    data = data.dropna().copy()

    # A column counts as keypoint data when its name contains any of these
    # substrings.
    keypoint_markers = ('_x', '_y', '_z', '_v', '_p', 'com', 'angle')
    keypoint_columns = [
        col for col in data.columns
        if any(marker in col for marker in keypoint_markers)
    ]
    if keypoint_columns:
        scaler = StandardScaler()
        data[keypoint_columns] = scaler.fit_transform(data[keypoint_columns])

    # One-hot encode categorical variables
    data = pd.get_dummies(data, columns=['boulder', 'camera', 'participant', 'repetition'])

    # Default to None so the return below is well-defined for unlabeled data
    # (previously this raised UnboundLocalError).
    label_encoder = None
    if 'label' in data.columns:
        label_encoder = LabelEncoder()
        data['label'] = label_encoder.fit_transform(data['label'])

    return data, label_encoder


def split_rnn_data(data, split_by_boulder=True, timesteps=2, batch_size=32):
    """Split the one-hot encoded frame and wrap both parts as ``tf.data`` datasets.

    With ``split_by_boulder=True`` rows with ``boulder_W3 == 1`` become the
    test set and rows with ``boulder_W3 == 0`` the training set. Otherwise the
    rows of four fixed athletes (identified through their one-hot
    ``participant_*`` columns) are held out for testing.

    Every row's feature vector (all columns except ``label``) is reshaped to
    ``(timesteps, total_features // timesteps)``.

    Args:
        data: DataFrame as returned by ``preprocess_rnn_data``; must contain
            ``label`` and the one-hot columns referenced above.
        split_by_boulder: Choose the boulder split (True) or athlete split.
        timesteps: Number of chunks each feature row is divided into.
        batch_size: Batch size of the returned datasets.

    Returns:
        tuple: ``(train_dataset, test_dataset, nr_classes)``. ``nr_classes``
        is the number of distinct labels in the whole of ``data`` so that the
        classifier head also covers classes absent from the training subset
        (assumes labels are encoded as 0..K-1, as ``LabelEncoder`` does).

    Raises:
        ValueError: If the number of feature columns is not divisible by
            ``timesteps``.
    """
    if split_by_boulder:
        train_data = data[data['boulder_W3'] == 0]
        test_data = data[data['boulder_W3'] == 1]
    else:
        test_athletes = ["participant_Ai Mori", "participant_Brooke Raboutou", "participant_Oceania Mackenzie", "participant_Mia Krampl"]
        is_test = data[test_athletes].any(axis=1)
        test_data = data[is_test]
        train_data = data[~is_test]

    total_features = data.drop('label', axis=1).shape[1]
    if total_features % timesteps != 0:
        raise ValueError(f"Number of total features ({total_features}) is not divisible by defined timesteps ({timesteps}).")
    features_per_timestep = total_features // timesteps

    def _to_dataset(subset):
        # Convert one split into a batched (features, labels) dataset.
        features = subset.drop('label', axis=1).values
        features = features.reshape(-1, timesteps, features_per_timestep).astype(np.float32)
        labels = subset['label'].values.astype(np.int32)
        return tf.data.Dataset.from_tensor_slices((features, labels)).batch(batch_size)

    train_dataset = _to_dataset(train_data)
    test_dataset = _to_dataset(test_data)

    # Count classes over the full label column, not just the training rows:
    # a class missing from training would otherwise shrink the softmax layer
    # below the largest label id.
    nr_classes = len(np.unique(data['label'].values))

    return train_dataset, test_dataset, nr_classes


def create_rnn_model(timesteps, features_per_timestep, nr_classes):
    """Build and compile the stacked bidirectional-LSTM classifier.

    Three bidirectional LSTM layers (64, 128, 64 units) are stacked, each
    followed by dropout. The outputs of the first two layers are additionally
    passed through an ``Attention`` layer (same tensor as query and value);
    both attention outputs are flattened and concatenated with the final LSTM
    output before the dense classification head.

    Args:
        timesteps: Length of the input sequence.
        features_per_timestep: Number of features at each timestep.
        nr_classes: Number of units of the softmax output layer.

    Returns:
        A compiled Keras ``Model`` (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    dropout_rate = 0.3
    sequence_in = Input(shape=(timesteps, features_per_timestep))

    # Stage 1: sequence output plus attention over it
    stage_one = Bidirectional(LSTM(64, return_sequences=True))(sequence_in)
    stage_one = Dropout(dropout_rate)(stage_one)
    attended_one = Attention()([stage_one, stage_one])

    # Stage 2: wider layer fed by stage 1, again with attention
    stage_two = Bidirectional(LSTM(128, return_sequences=True))(stage_one)
    stage_two = Dropout(dropout_rate)(stage_two)
    attended_two = Attention()([stage_two, stage_two])

    # Stage 3: collapses the sequence into a single vector
    stage_three = Bidirectional(LSTM(64, return_sequences=False))(stage_two)
    stage_three = Dropout(dropout_rate)(stage_three)

    merged = Concatenate()([Flatten()(attended_one), Flatten()(attended_two), stage_three])

    # Dense classification head
    head = Dense(128, activation='relu')(merged)
    head = Dropout(dropout_rate)(head)
    head = Dense(64, activation='relu')(head)
    class_probabilities = Dense(nr_classes, activation='softmax')(head)

    classifier = Model(inputs=sequence_in, outputs=class_probabilities)
    classifier.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return classifier


def train_evaluate_rnn_model(train_dataset, test_dataset, timesteps, features_per_timestep, nr_classes, epochs=10):
    """Train a fresh RNN model and score it on the test dataset.

    Args:
        train_dataset: Batched dataset of ``(features, labels)`` for training.
        test_dataset: Batched dataset of ``(features, labels)`` for evaluation.
            It is iterated twice (labels, then predictions), so it must yield
            its elements in the same order each time.
        timesteps: Sequence length expected by the model input.
        features_per_timestep: Feature count per timestep of the model input.
        nr_classes: Number of output classes.
        epochs: Number of training epochs. Defaults to 10.

    Returns:
        dict: ``'Accuracy'``, ``'Precision'``, ``'Recall'`` and ``'F1 Score'``
        (weighted averages) plus the ``'Predictions'`` and ``'True Labels'``
        arrays.
    """
    model = create_rnn_model(timesteps, features_per_timestep, nr_classes)
    print("Training RNN model...")
    model.fit(train_dataset, epochs=epochs, verbose=1)

    # Evaluate on test dataset
    y_true = np.concatenate([labels for _, labels in test_dataset], axis=0)
    y_pred_prob = model.predict(test_dataset)
    y_pred = np.argmax(y_pred_prob, axis=1)

    # Calculate evaluation metrics. The F1 value is bound to ``f1`` so that the
    # imported sklearn ``f1_score`` function is not shadowed, and
    # zero_division=0 matches the metric settings of the classic models.
    accuracy = accuracy_score(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0
    )

    print(f"RNN model results - Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1}")

    return {
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1,
        'Predictions': y_pred,
        'True Labels': y_true
    }

In [None]:
# ----------------------------------------- main ----------------------------------
# > > > > > > > > > > > > > > > > > > > > > CLASSIC ML < < < < < < < < < < < < < < < < < < < <
# Preprocess data for the classic ml models
# (categorical columns are integer-encoded; participant_mapping is kept to
# translate athlete names into ids for the athlete split)
data_classic, participant_mapping = preprocess_classic_data(filepath)

# Boulder-specific split: train and test sets are separated by boulder id
print("Evaluating models on Boulder-specific split...")
X_train_boulder, y_train_boulder, X_test_boulder, y_test_boulder = split_data_boulder_specific(data_classic)
classic_results_boulder = evaluate_classic_models(X_train_boulder, y_train_boulder, X_test_boulder, y_test_boulder)

# Athlete-specific split: selected athletes are held out entirely for testing
print("Evaluating models on Athlete-specific split...")
X_train_athlete, y_train_athlete, X_test_athlete, y_test_athlete = split_data_athlete_specific(data_classic, participant_mapping)
classic_results_athlete = evaluate_classic_models(X_train_athlete, y_train_athlete, X_test_athlete, y_test_athlete)

In [None]:
# > > > > > > > > > > > > > > > > > > > > > RNN < < < < < < < < < < < < < < < < < < < <
# Preprocess data for the rnn model
# (the CSV is re-read because the RNN pipeline starts from the raw columns,
# not from the integer-encoded frame of the classic models)
data_rnn = pd.read_csv(filepath)
data_rnn, label_encoder = preprocess_rnn_data(data_rnn)

# Boulder-specific split for RNN
# NOTE: the literal 2 and the "// 2" below are the timesteps value; they must
# match the timesteps (2) that split_rnn_data uses when reshaping the rows.
print("Evaluating RNN on Boulder-specific split...")
train_dataset_boulder, test_dataset_boulder, nr_classes_boulder = split_rnn_data(data_rnn, split_by_boulder=True)
rnn_results_boulder = train_evaluate_rnn_model(train_dataset_boulder, test_dataset_boulder, 2, data_rnn.drop('label', axis=1).shape[1] // 2, nr_classes_boulder)

# Athlete-specific split for RNN (same timesteps convention as above)
print("Evaluating RNN on Athlete-specific split...")
train_dataset_athlete, test_dataset_athlete, nr_classes_athlete = split_rnn_data(data_rnn, split_by_boulder=False)
rnn_results_athlete = train_evaluate_rnn_model(train_dataset_athlete, test_dataset_athlete, 2, data_rnn.drop('label', axis=1).shape[1] // 2, nr_classes_athlete)

In [None]:
# performance overview - plot
# Define colors
# One colour per metric column of the plotted results table; the classic-model
# results carry four metrics (Accuracy, Precision, Recall, F1 Score).
colors = ['#008080', '#20B2AA', '#FF8C00', '#FF6347']

def plot_results(ax, results, title):
    """Draw a grouped bar chart of the scores in ``results`` on ``ax``.

    ``results`` maps a model name to a dictionary of metric scores. Each model
    becomes one group on the x-axis and each metric one bar within the group,
    coloured with the module-level ``colors`` list.

    Args:
        ax: Matplotlib axes to draw on.
        results: ``{model name: {metric name: score}}`` dictionary.
        title: Title shown above the axes.
    """
    # Transpose so that models are rows (x-axis groups) and metrics columns.
    score_table = pd.DataFrame(results).T
    score_table.plot(kind='bar', color=colors, ax=ax)

    # Labelling
    ax.set_title(title)
    ax.set_xlabel('Model')
    ax.set_ylabel('Score')
    ax.set_xticklabels(score_table.index, rotation=45)

    # Scores are fractions, so fix the y-axis to the 0..1 range
    ax.set_ylim(0, 1)

    # Anchor the legend just outside the upper-right corner of the axes
    ax.legend(loc='upper right', bbox_to_anchor=(1.2, 1))

# Requires classic_results_boulder and classic_results_athlete (dictionaries
# of per-model metric scores) to be defined already.

# One row with two side-by-side panels
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 6))

# Classic ML results: boulder split on the left, athlete split on the right
panels = [
    (classic_results_boulder, 'Model Performance on Boulder Test Set'),
    (classic_results_athlete, 'Model Performance on Athlete Test Set'),
]
for panel_ax, (panel_results, panel_title) in zip(axes, panels):
    plot_results(panel_ax, panel_results, panel_title)

# Tidy up spacing, then render
plt.tight_layout()
plt.show()