In [1]:
import os
import random
import pandas as pd
import numpy as np
import joblib
from typing import Tuple, List, Dict, Optional
from datetime import datetime

from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split, RandomizedSearchCV, cross_val_score
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_regression
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

import xgboost as xgb 

In [2]:
def set_seed(seed: int = 42):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {DEVICE}')

def create_directories(directories: List[str]):
    for directory in directories:
        if not os.path.exists(directory):
            os.makedirs(directory)
            print(f"Directory '{directory}' created.")
        else:
            print(f"Directory '{directory}' already exists.")

MODEL_DIR = 'models'
RESULT_DIR = 'results'

create_directories([MODEL_DIR, RESULT_DIR])

def load_data(data_path: str,selected_features: Optional[List[str]] = None,target_column: str = 'accuracy'):
    data = pd.read_csv(data_path)
    print("Data loaded successfully.")

    if selected_features is None:
        selected_features = data.columns.drop(target_column).tolist()
        print("Using all available features except the target.")

    missing_features = set(selected_features + [target_column]) - set(data.columns)
    if missing_features:
        raise KeyError(f"Missing columns in data: {missing_features}")

    # Transformation target variable transformation if skewed
    if data[target_column].skew() > 1:
        print("Target variable is skewed. Applying log transformation.")
        data[target_column] = np.log1p(data[target_column])

    features = data[selected_features]
    target = data[target_column].values
    print(f"Selected features: {selected_features}")
    print(f"Target column: {target_column}")

    return features, target

DATA_PATH = 'data/fr_DID_mean.csv'

# Feature set
SELECTED_FEATURES = [
    'Shape_GG', 'AvgStress', 'cross', 
    'PathLength', 'pShape_KNN', 'pAvgStress', 'pCrossNo', 
    'pMinAng', 'pContinu', 'pGeode', 
    '|V|', '|E|',
    'Shape_RNG', 'Shape_EMST', 'Shape_AlphaShape', 'Shape_KNN',
    'RegularStress', 'AvgScaledStress', 'angleM', 'angleD', 'minAng',
    'edgeM', 'edgeD', 'minVertx', 'finVertx', 'minPtEdg',
    'pShape_RNG', 'pShape_EMST', 'pRegularStress', 'pAvgScaledStress',
    'pAngMean', 'pContinu', 'pGeode', 'YN', 'Effort', 'Time'
]

TARGET_COLUMN = 'accuracy'

features, target = load_data(DATA_PATH, selected_features=SELECTED_FEATURES, target_column=TARGET_COLUMN)

print(f"Loaded data with {features.shape[0]} samples and {features.shape[1]} features.")

def create_preprocessing_pipeline(selected_features: List[str], k_best: int = 30) -> Pipeline:
    # Preprocessing pipeline with imputation, polynomial features, scaling, and feature selection.
    preprocessing_steps = [
        ('imputer', SimpleImputer(strategy='mean')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest(score_func=f_regression, k=k_best))
    ]

    preprocessing_pipeline = Pipeline(preprocessing_steps)
    return preprocessing_pipeline

class GraphDataset(Dataset):
    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
        
    def __len__(self) -> int:
        return len(self.X)
    
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.X[idx], self.y[idx]

# Neural Network architecture
class ImprovedNN(nn.Module):
    def __init__(self, input_size: int, output_size: int = 1):
        super(ImprovedNN, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, output_size)
        )
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)

# Evaluation and plotting
def evaluate_and_plot(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    model_name: str,
    transform_back: bool = False
):

    if transform_back:
        y_true = np.expm1(y_true)
        y_pred = np.expm1(y_pred)
    
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    print(f"{model_name} - MSE: {mse:.4f}, MAE: {mae:.4f}, R²: {r2:.4f}")
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Plot actual vs Predicted
    plt.figure(figsize=(8,6))
    sns.scatterplot(x=y_true, y=y_pred, alpha=0.6)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
    plt.xlabel('Actual')
    plt.ylabel('Predicted')
    plt.title(f'{model_name}: Actual vs Predicted')
    plt.tight_layout()
    plot_filename = f'{model_name}_actual_vs_predicted_{timestamp}.png'
    plt.savefig(os.path.join(RESULT_DIR, plot_filename))
    plt.close()
    print(f"Saved plot: {plot_filename}")
    
    # Plot residuals vs Predicted
    residuals = y_true - y_pred
    plt.figure(figsize=(8,6))
    sns.scatterplot(x=y_pred, y=residuals, alpha=0.6)
    plt.axhline(0, color='r', linestyle='--')
    plt.xlabel('Predicted')
    plt.ylabel('Residuals')
    plt.title(f'{model_name}: Residuals vs Predicted')
    plt.tight_layout()
    plot_filename = f'{model_name}_residuals_{timestamp}.png'
    plt.savefig(os.path.join(RESULT_DIR, plot_filename))
    plt.close()
    print(f"Saved plot: {plot_filename}")

    # Feature Importance for tree based models
    if model_name in ['Gradient_Boosting', 'XGBoost']:
        try:
            if hasattr(best_model, 'feature_importances_'):
                importances = best_model.feature_importances_
                feature_names = preprocessing_pipeline.named_steps['feature_selection'].get_support(indices=True)
                selected_feature_names = [SELECTED_FEATURES[i] for i in feature_names]
                indices = np.argsort(importances)[::-1]
                plt.figure(figsize=(10, 8))
                sns.barplot(x=importances[indices][:20], y=np.array(selected_feature_names)[indices][:20])
                plt.title('Top Feature Importances')
                plt.tight_layout()
                plt.show()
            else:
                print("Model does not have feature_importances_ attribute.")
        except Exception as e:
            print(f"Error plotting feature importances: {e}")

# Train Neural Network
def train_nn_model(
    preprocessing_pipeline: Pipeline,
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_val: np.ndarray,
    y_val: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    epochs: int = 300,
    batch_size: int = 128,
    learning_rate: float = 0.001,
    patience: int = 20
):
    
    X_train_processed = preprocessing_pipeline.fit_transform(X_train, y_train)
    X_val_processed = preprocessing_pipeline.transform(X_val)
    X_test_processed = preprocessing_pipeline.transform(X_test)
    
    print(f"Training data processed shape: {X_train_processed.shape}")
    print(f"Validation data processed shape: {X_val_processed.shape}")
    print(f"Test data processed shape: {X_test_processed.shape}")
    
    train_dataset = GraphDataset(X_train_processed, y_train)
    val_dataset = GraphDataset(X_val_processed, y_val)
    test_dataset = GraphDataset(X_test_processed, y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    input_size = X_train_processed.shape[1]
    model = ImprovedNN(input_size=input_size).to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', 
                                                     factor=0.5, patience=5, 
                                                     verbose=True)
    
    best_model_state = None
    best_val_loss = float('inf')
    epochs_no_improve = 0
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        avg_train_loss = running_loss / len(train_loader)
        
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        
        avg_val_loss = val_loss / len(val_loader)
        
        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {avg_train_loss:.6f}, Val Loss: {avg_val_loss:.6f}")
        
        scheduler.step(avg_val_loss)

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            epochs_no_improve = 0
            best_model_state = model.state_dict().copy()
            print("Validation loss improved. Best model updated.")
        else:
            epochs_no_improve += 1
            print(f"No improvement in validation loss for {epochs_no_improve} epoch(s).")
            if epochs_no_improve >= patience:
                print("Early stopping triggered.")
                break

    model.load_state_dict(best_model_state)

    model_filename = f'best_nn_model_{timestamp}.pth'
    torch.save(model.state_dict(), os.path.join(MODEL_DIR, model_filename))
    print(f"Best Neural Network model saved to {os.path.join(MODEL_DIR, model_filename)}.")

    model.eval()
    y_pred = []
    y_true = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(DEVICE)
            outputs = model(inputs)
            y_pred.append(outputs.cpu().numpy())
            y_true.append(labels.numpy())
    y_pred = np.concatenate(y_pred, axis=0).flatten()
    y_true = np.concatenate(y_true, axis=0).flatten()

    evaluate_and_plot(y_true, y_pred, "Neural_Network", transform_back=True)

    return model

# Train Gradient Boosting
def train_gradient_boosting(X_train: np.ndarray,y_train: np.ndarray,param_distributions: Optional[Dict] = None,n_iter: int = 50,use_xgboost: bool = True):

    if use_xgboost:
        model = xgb.XGBRegressor(objective='reg:squarederror', random_state=42, n_jobs=-1)
    else:
        model = GradientBoostingRegressor(random_state=42)
    
    if param_distributions:
        print("Starting Randomized Search for Gradient Boosting...")
        randomized_search = RandomizedSearchCV(
            estimator=model,
            param_distributions=param_distributions,
            n_iter=n_iter,
            cv=5,
            scoring='r2',  
            verbose=2,
            random_state=42,
            n_jobs=-1
        )
        randomized_search.fit(X_train, y_train)
        best_model = randomized_search.best_estimator_
        print(f"Best Gradient Boosting Params: {randomized_search.best_params_}")
    else:
        model.fit(X_train, y_train)
        best_model = model
        print("Gradient Boosting model trained with default parameters.")
    
    return best_model

# Experiment runner
def run_experiment(
    features: pd.DataFrame,
    target: np.ndarray,
    model_type: str = 'nn',
    param_distributions: Optional[Dict] = None,
    n_iter: int = 50
):

    X_train_val, X_test, y_train_val, y_test = train_test_split(
        features, target, test_size=0.2, random_state=42
    )

    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=0.25, random_state=42  # 0.25 x 0.8 = 0.2
    )
    
    if model_type == 'nn':
        print(f"\nRunning Neural Network with features: {SELECTED_FEATURES}")
        preprocessing_pipeline = create_preprocessing_pipeline(SELECTED_FEATURES, k_best=30)
        model = train_nn_model(
            preprocessing_pipeline,
            X_train.values, y_train,
            X_val.values, y_val,
            X_test.values, y_test
        )
    elif model_type in ['boosting', 'xgboost']:
        print(f"\nRunning {'XGBoost' if model_type == 'xgboost' else 'Gradient Boosting'} with features: {SELECTED_FEATURES}")
        preprocessing_pipeline = create_preprocessing_pipeline(SELECTED_FEATURES, k_best=30)
        X_train_processed = preprocessing_pipeline.fit_transform(X_train, y_train)
        X_val_processed = preprocessing_pipeline.transform(X_val)
        X_test_processed = preprocessing_pipeline.transform(X_test)

        best_model = train_gradient_boosting(
            X_train_processed,
            y_train,
            param_distributions=param_distributions,
            n_iter=n_iter,
            use_xgboost=(model_type == 'xgboost')
        )

        y_pred = best_model.predict(X_test_processed)
        evaluate_and_plot(y_test, y_pred, "XGBoost" if model_type == 'xgboost' else "Gradient_Boosting", transform_back=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f'best_xgboost_model_{timestamp}.pkl' if model_type == 'xgboost' else f'best_gradient_boosting_model_{timestamp}.pkl'
        joblib.dump(best_model, os.path.join(MODEL_DIR, model_filename))
        print(f"{'XGBoost' if model_type == 'xgboost' else 'Gradient Boosting'} model saved to {os.path.join(MODEL_DIR, model_filename)}.")
    else:
        print(f"Unsupported model type: {model_type}")

Using device: cuda
Directory 'models' already exists.
Directory 'results' already exists.
Data loaded successfully.
Selected features: ['Shape_GG', 'AvgStress', 'cross', 'PathLength', 'pShape_KNN', 'pAvgStress', 'pCrossNo', 'pMinAng', 'pContinu', 'pGeode', '|V|', '|E|', 'Shape_RNG', 'Shape_EMST', 'Shape_AlphaShape', 'Shape_KNN', 'RegularStress', 'AvgScaledStress', 'angleM', 'angleD', 'minAng', 'edgeM', 'edgeD', 'minVertx', 'finVertx', 'minPtEdg', 'pShape_RNG', 'pShape_EMST', 'pRegularStress', 'pAvgScaledStress', 'pAngMean', 'pContinu', 'pGeode', 'YN', 'Effort', 'Time']
Target column: accuracy
Loaded data with 5542 samples and 36 features.


In [3]:
if __name__ == "__main__":
    print("Starting experiments...")

    # Hyperparameters for XGBoost
    PARAM_DISTRIBUTIONS_XGB = {
        'n_estimators': [100, 200, 300, 400, 500],
        'learning_rate': np.linspace(0.01, 0.3, 30),
        'max_depth': [3, 4, 5, 6, 7, 8, 9, 10],
        'min_child_weight': [1, 2, 3, 4, 5],
        'subsample': [0.6, 0.7, 0.8, 0.9, 1.0],
        'colsample_bytree': [0.6, 0.7, 0.8, 0.9, 1.0],
        'gamma': [0, 0.1, 0.2, 0.3, 0.4]
    }

    # Neural Network
    print("\nExperiment: Predicting Accuracy - Neural Network")
    run_experiment(features, target, model_type='nn')

    # XGBoost
    print("\nExperiment: Predicting Accuracy - XGBoost")
    run_experiment(features, target, model_type='xgboost', param_distributions=PARAM_DISTRIBUTIONS_XGB, n_iter=50)

    print("\nExperiments completed.")

Starting experiments...

Experiment: Predicting Accuracy - Neural Network

Running Neural Network with features: ['Shape_GG', 'AvgStress', 'cross', 'PathLength', 'pShape_KNN', 'pAvgStress', 'pCrossNo', 'pMinAng', 'pContinu', 'pGeode', '|V|', '|E|', 'Shape_RNG', 'Shape_EMST', 'Shape_AlphaShape', 'Shape_KNN', 'RegularStress', 'AvgScaledStress', 'angleM', 'angleD', 'minAng', 'edgeM', 'edgeD', 'minVertx', 'finVertx', 'minPtEdg', 'pShape_RNG', 'pShape_EMST', 'pRegularStress', 'pAvgScaledStress', 'pAngMean', 'pContinu', 'pGeode', 'YN', 'Effort', 'Time']
Training data processed shape: (3324, 30)
Validation data processed shape: (1109, 30)
Test data processed shape: (1109, 30)
Epoch [1/300], Train Loss: 0.193560, Val Loss: 0.023254
Validation loss improved. Best model updated.
Epoch [2/300], Train Loss: 0.049258, Val Loss: 0.020495
Validation loss improved. Best model updated.
Epoch [3/300], Train Loss: 0.044559, Val Loss: 0.015216
Validation loss improved. Best model updated.
Epoch [4/300], T