# 03_feature_selection_and_classification

In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import mutual_info_classif
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import LabelEncoder
from catboost import CatBoostClassifier

# Location of the symbolized input table read by main().
DATA_PATH = r"c:/Users/asus/OneDrive/Desktop/ITML Project/processed_data/symbolized_data.csv"

# Output folders for result tables, figures and models.
OUTPUT_DIR = r"c:/Users/asus/OneDrive/Desktop/ITML Project/results"
PLOTS_DIR = r"c:/Users/asus/OneDrive/Desktop/ITML Project/plots"
MODELS_DIR = r"c:/Users/asus/OneDrive/Desktop/ITML Project/models"

# Create every output folder up front; exist_ok makes re-running harmless.
for _output_folder in (OUTPUT_DIR, PLOTS_DIR, MODELS_DIR):
    os.makedirs(_output_folder, exist_ok=True)

def compute_transition_matrix(sequence, n_states=8):
    """Estimate a row-stochastic first-order transition matrix.

    Counts how often state ``i`` is immediately followed by state ``j`` in
    ``sequence``, adds a small constant to every cell so no probability is
    exactly zero, and normalizes each row to sum to 1.

    Args:
        sequence: 1-D sequence of integer state indices in ``[0, n_states)``.
            Sequences with fewer than two elements contain no transitions
            and yield a uniform matrix.
        n_states: Number of distinct states (matrix is n_states x n_states).

    Returns:
        ``numpy.ndarray`` of shape ``(n_states, n_states)`` whose rows sum
        to 1.

    Raises:
        IndexError: If the sequence holds non-integer values or any state
            index lies outside ``[0, n_states)``. Negative indices are
            rejected explicitly because NumPy would otherwise wrap them
            around and silently corrupt the counts.
    """
    states = np.asarray(sequence)
    if states.dtype == object:
        # Re-infer the dtype so boxed Python ints behave like plain ints.
        states = np.array(states.tolist())

    transitions = np.zeros((n_states, n_states))
    if states.size >= 2:
        if states.dtype.kind not in "iu":
            raise IndexError("sequence must contain integer state indices")
        lowest, highest = states.min(), states.max()
        if lowest < 0 or highest >= n_states:
            raise IndexError(
                f"state indices must lie in [0, {n_states}); "
                f"got values in [{lowest}, {highest}]"
            )
        states = states.astype(np.intp)
        # np.add.at accumulates repeated (i, j) pairs, unlike fancy-index +=.
        np.add.at(transitions, (states[:-1], states[1:]), 1)

    # Additive smoothing keeps every row normalizable, even for unseen states.
    transitions += 1e-5
    return transitions / transitions.sum(axis=1, keepdims=True)

def compute_stationary_distribution(P):
    """Return the stationary distribution of transition matrix ``P``.

    Takes the eigenvector of ``P.T`` whose eigenvalue is closest to 1,
    keeps its real part, and rescales it so its entries sum to 1.

    Args:
        P: Square transition matrix as a NumPy array.

    Returns:
        1-D ``numpy.ndarray`` with one entry per state, summing to 1.
    """
    eigenvalues, eigenvectors = np.linalg.eig(P.T)
    # The stationary vector is the left eigenvector for eigenvalue 1.
    closest_to_one = np.abs(eigenvalues - 1).argmin()
    stationary = eigenvectors[:, closest_to_one].real
    # Dividing by the sum fixes both the scale and the sign.
    return stationary / stationary.sum()

def _fit_class_markov_models(train_df, classes, n_states):
    """Fit a transition matrix and stationary distribution for each class."""
    class_models = {}
    for cls in classes:
        cls_seq = train_df.loc[train_df['device_class'] == cls, 'symbol'].values
        P = compute_transition_matrix(cls_seq, n_states)
        class_models[cls] = {'P': P, 'pi': compute_stationary_distribution(P)}
    return class_models


def _save_mi_barplot(mi_df, top_n=20):
    """Save a bar chart of the ``top_n`` highest mutual-information features."""
    plt.figure(figsize=(10, 8))
    sns.barplot(data=mi_df.head(top_n), x='MI_Score', y='Feature', palette='viridis')
    plt.title('Top 20 Features by Mutual Information')
    plt.tight_layout()
    plt.savefig(os.path.join(PLOTS_DIR, 'mi_feature_importance.png'))
    plt.close()


def _save_confusion_matrix(name, y_true, y_pred, class_names):
    """Save a confusion-matrix heatmap for the model called ``name``."""
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Confusion Matrix - {name}')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.savefig(os.path.join(PLOTS_DIR, f'confusion_matrix_{name}.png'))
    plt.close()


def _save_model_comparison(results_df):
    """Save a bar chart comparing the accuracy of every trained model."""
    plt.figure(figsize=(8, 6))
    sns.barplot(data=results_df, x='Model', y='Accuracy', palette='magma')
    plt.title('Classification Accuracy Comparison')
    plt.ylim(0, 1.0)
    plt.tight_layout()
    plt.savefig(os.path.join(PLOTS_DIR, 'model_comparison.png'))
    plt.close()


def main():
    """Run feature engineering, feature selection and classification.

    Steps:
      1. Load the table at ``DATA_PATH`` and make a stratified 80/20
         train/test split of its rows.
      2. Fit one Markov model per ``device_class`` on the training rows and
         add a ``log_lik_<class>`` column holding the log stationary
         probability of each row's symbol under that class.
      3. Rank features by mutual information on the training rows and keep
         the top 20.
      4. Train RandomForest, CatBoost and a stacking ensemble, then score
         each on the held-out rows.

    The label-dependent steps (2 and 3) are fitted on training rows only so
    the held-out rows cannot influence the features or their selection;
    fitting them on the full table would leak test labels and make the
    reported accuracy optimistic.

    Writes ``feature_importance_mi.csv`` and ``classification_results.csv``
    to ``OUTPUT_DIR`` and the MI, confusion-matrix and model-comparison
    plots to ``PLOTS_DIR``.
    """
    print("Loading data...")
    df = pd.read_csv(DATA_PATH)

    le = LabelEncoder()
    y_encoded = le.fit_transform(df['device_class'])

    # Split row positions before any label-dependent fitting. The partition
    # depends only on the row count, the labels and the seed.
    row_positions = np.arange(len(df))
    train_pos, test_pos = train_test_split(
        row_positions, test_size=0.2, random_state=42, stratify=y_encoded
    )
    in_train = np.zeros(len(df), dtype=bool)
    in_train[train_pos] = True

    # Feature Engineering: Markov-based Likelihoods
    print("Engineering Markov features...")
    classes = df['device_class'].unique()
    n_states = 8

    # The boolean mask keeps the original row order. Held-out rows are
    # dropped from each class sequence, so the symbols on either side of a
    # dropped row are counted as adjacent.
    class_models = _fit_class_markov_models(df[in_train], classes, n_states)

    # Feature: log(P(symbol | class)) = log(pi_class[symbol])
    symbols = df['symbol'].to_numpy()
    for cls in classes:
        log_pi = np.log(class_models[cls]['pi'] + 1e-10)
        df[f'log_lik_{cls}'] = log_pi[symbols]

    # Prepare Data
    drop_cols = ['device_class', 'filename', 'device', 'symbol']
    X = df.drop(columns=drop_cols).fillna(0)
    X_train_all = X.iloc[train_pos]
    y_train = y_encoded[train_pos]
    y_test = y_encoded[test_pos]

    # Feature Selection (Mutual Information), scored on training rows only
    print("Performing Feature Selection...")
    mi_scores = mutual_info_classif(X_train_all, y_train, random_state=42)
    mi_df = pd.DataFrame(
        {'Feature': X.columns, 'MI_Score': mi_scores}
    ).sort_values(by='MI_Score', ascending=False)

    print("\nTop 10 Features:\n", mi_df.head(10))
    mi_df.to_csv(os.path.join(OUTPUT_DIR, 'feature_importance_mi.csv'), index=False)
    _save_mi_barplot(mi_df)

    # Select Top K Features
    top_k = 20
    top_features = mi_df['Feature'].head(top_k).tolist()
    X_train = X_train_all[top_features]
    X_test = X.iloc[test_pos][top_features]
    print(f"Selected top {top_k} features.")

    # Classification
    models = {
        'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
        'CatBoost': CatBoostClassifier(verbose=0, random_state=42)
    }

    results = {}

    for name, model in models.items():
        print(f"Training {name}...")
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        results[name] = acc
        print(f"{name} Accuracy: {acc:.4f}")
        _save_confusion_matrix(name, y_test, y_pred, le.classes_)

    # Stacking
    print("Training Stacking Classifier...")
    estimators = [
        ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('cat', CatBoostClassifier(verbose=0, random_state=42))
    ]
    stacking_clf = StackingClassifier(estimators=estimators, final_estimator=LogisticRegression(), cv=5)
    stacking_clf.fit(X_train, y_train)
    y_pred_stack = stacking_clf.predict(X_test)
    acc_stack = accuracy_score(y_test, y_pred_stack)
    results['Stacking'] = acc_stack
    print(f"Stacking Accuracy: {acc_stack:.4f}")

    results_df = pd.DataFrame(list(results.items()), columns=['Model', 'Accuracy'])
    results_df.to_csv(os.path.join(OUTPUT_DIR, 'classification_results.csv'), index=False)
    _save_model_comparison(results_df)

    print("Classification complete.")

# Run the pipeline only when executed as a script or notebook cell, so that
# importing this module does not trigger a full training run.
if __name__ == "__main__":
    main()