In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import make_column_transformer
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.utils import resample
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
import joblib
import warnings

In [None]:
# Keep cell output readable: only let TensorFlow's logger emit ERROR-level
# records and ignore every Python warning raised from here on.
tf.get_logger().setLevel(level='ERROR')
warnings.filterwarnings(action='ignore')

In [None]:
def build_full_model(data_path='Combined_Real_Movements.csv', seed=42,
                     test_size=0.25, epochs=30, batch_size=32):
    """Train, evaluate and save a Keras multi-class classifier from a CSV file.

    The pipeline runs these stages in order:
      1. Load the CSV, drop rows with missing values and downsample every
         class in the 'Classes' column to the size of the smallest class.
      2. Use every column except 'Classes' and 'Subject' as a feature.
      3. Label-encode the target, standardise numeric features and one-hot
         encode 'WaveType' when that column exists, then make a stratified
         train/test split.
      4. Build and compile a small dense softmax network.
      5. Fit the network.
      6. Print accuracy, a classification report and a confusion matrix.
      7. Save the trained model.

    Args:
        data_path: Path of the CSV file to read.
        seed: Seed used for downsampling, the train/test split and the
            Python/NumPy/TensorFlow RNGs. Pass None to leave everything
            unseeded.
        test_size: Fraction of the balanced data held out for evaluation.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used during training.

    Returns:
        None. When the data cannot be loaded, an error message is printed
        and the function returns early without training.

    Side effects:
        Writes 'label_encoder.joblib', 'data_preprocessor.joblib' and
        'keras_bci_model_5_class.h5' to the current working directory and
        prints progress information to stdout.
    """
    print("--- Starting Keras 5-CLASS Model Building Process ---")

    # --- Step 1: Load and Prepare Data (Scikit-learn) ---
    print("Step 1: Loading and Preparing Data...")
    try:
        df_balanced = _load_balanced_data(data_path, seed)
    except FileNotFoundError:
        print(f"Error: '{data_path}' not found.")
        return
    except Exception as e:
        # Deliberate best-effort: any other loading problem (e.g. a missing
        # 'Classes' column or an empty dataset) is reported, not raised.
        print(f"An error occurred during data loading: {e}")
        return

    # --- Step 2: Define Features (X) and Target (y) ---
    print("\nStep 2: Defining Features (X) and Target (y)...")
    features = [col for col in df_balanced.columns if col not in ['Classes', 'Subject']]
    target = 'Classes'

    X = df_balanced[features]
    y = df_balanced[target]

    # --- Step 3: Preprocessing (Scikit-learn) ---
    print("Step 3: Preprocessing Data...")

    # 3a: Encode Target Variable (y) for Keras
    # The sparse categorical loss used below expects integer class ids.
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    # Save the label encoder so predictions can be decoded later
    joblib.dump(le, 'label_encoder.joblib')
    print(f"Target classes encoded: {le.classes_} -> {np.unique(y_encoded)}")

    # 3b: Define Preprocessing for Features (X)
    # NOTE: columns that are neither numeric nor 'WaveType' are not listed in
    # any transformer and are therefore dropped by the column transformer.
    numeric_features = X.select_dtypes(include='number').columns.tolist()
    transformers = [(StandardScaler(), numeric_features)]

    if 'WaveType' in X.columns:
        print("Found 'WaveType' column. Will use it as a categorical feature.")
        # sparse_output=False ensures Keras receives a dense array
        transformers.append(
            (OneHotEncoder(sparse_output=False, handle_unknown='ignore'), ['WaveType'])
        )
    else:
        print("Warning: 'WaveType' column not found. Proceeding with numeric features only.")

    preprocessor = make_column_transformer(*transformers)

    # 3c: Split and Preprocess Data
    # Stratify keeps the class balance in both train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=test_size, random_state=seed, stratify=y_encoded
    )

    # Fit on the training split only so test statistics never leak into scaling
    X_train_processed = preprocessor.fit_transform(X_train)
    X_test_processed = preprocessor.transform(X_test)

    # Save the preprocessor
    joblib.dump(preprocessor, 'data_preprocessor.joblib')
    print("Data preprocessor (scaler/encoder) saved.")

    print(f"Training features shape: {X_train_processed.shape}")
    print(f"Testing features shape: {X_test_processed.shape}")

    # --- Step 4: Build Keras Model ---
    print("\nStep 4: Building Keras Neural Network...")

    # Seed Python, NumPy and TensorFlow so weight initialisation, dropout and
    # batch shuffling are repeatable between runs.
    if seed is not None:
        tf.keras.utils.set_random_seed(seed)

    n_features = X_train_processed.shape[1]
    n_classes = len(le.classes_)  # taken from the data, not hard-coded

    model = _build_classifier(n_features, n_classes)
    model.summary()

    # --- Step 5: Train Keras Model ---
    print("\nStep 5: Training Keras Model...")
    # The held-out split is only monitored per epoch here; no callback uses
    # it, so it does not influence the fitted weights.
    model.fit(
        X_train_processed,
        y_train,
        validation_data=(X_test_processed, y_test),
        epochs=epochs,
        batch_size=batch_size,
        verbose=1
    )

    # --- Step 6: Evaluate Keras Model (Scikit-learn) ---
    print("\nStep 6: Evaluating Keras Model...")

    # Softmax probabilities -> predicted class id via argmax
    y_pred_proba = model.predict(X_test_processed)
    y_pred = np.argmax(y_pred_proba, axis=1)

    # Decode back to the original class names for a readable report
    y_test_labels = le.inverse_transform(y_test)
    y_pred_labels = le.inverse_transform(y_pred)

    accuracy = accuracy_score(y_test_labels, y_pred_labels)
    print(f"\nModel Accuracy: {accuracy * 100:.2f}%")

    print("\nClassification Report:")
    print(classification_report(y_test_labels, y_pred_labels))

    print("\nConfusion Matrix:")
    # Pin the label order so the matrix always lines up with the
    # index/column names built from le.classes_ below.
    cm = confusion_matrix(y_test_labels, y_pred_labels, labels=le.classes_)
    cm_df = pd.DataFrame(cm,
                         index=[f'Actual: {c}' for c in le.classes_],
                         columns=[f'Pred: {c}' for c in le.classes_])
    print(cm_df)

    # --- Step 7: Save the Trained Model ---
    print("\nStep 7: Saving Trained Model...")
    model.save('keras_bci_model_5_class.h5')
    print("Keras model saved to 'keras_bci_model_5_class.h5'")
    print("--- Model Building Process Finished ---")


def _load_balanced_data(data_path, seed):
    """Read the CSV, drop incomplete rows and downsample each class to the smallest class size."""
    df = pd.read_csv(data_path).dropna()  # drop any rows with missing data
    if df.empty:
        raise ValueError("no rows remain after dropping rows with missing values")

    class_counts = df['Classes'].value_counts()
    min_class_size = class_counts.min()
    print(f"Original class counts:\n{class_counts}")
    print(f"Smallest class has {min_class_size} samples. Balancing all classes to this size.")

    # Sample each class without replacement, then concatenate once instead of
    # growing a DataFrame inside the loop.
    downsampled = [
        resample(df[df['Classes'] == class_name],
                 replace=False,
                 n_samples=min_class_size,
                 random_state=seed)
        for class_name in class_counts.index
    ]
    df_balanced = pd.concat(downsampled)

    print(f"Data balanced. Total samples: {len(df_balanced)}.")
    print(f"New class counts:\n{df_balanced['Classes'].value_counts()}")
    return df_balanced


def _build_classifier(n_features, n_classes):
    """Assemble and compile the dense softmax network trained by build_full_model."""
    model = Sequential([
        Input(shape=(n_features,)),
        Dense(64, activation='relu'),
        Dropout(0.3),
        Dense(32, activation='relu'),
        Dropout(0.3),
        # One output unit per class; softmax yields class probabilities
        Dense(n_classes, activation='softmax'),
    ])
    # Sparse categorical cross-entropy pairs with integer-encoded targets
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [None]:
# Launch the complete load -> train -> evaluate -> save pipeline, but only
# when this code is executing as the main module.
if "__main__" == __name__:
    build_full_model()