# In [None]:  (Jupyter notebook cell marker left over from the export)
import pandas as pd
import numpy as np
from sklearn.feature_selection import mutual_info_classif, chi2, SelectKBest
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, cross_val_score, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, make_scorer
from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')

class MLProject:
    """Workflow helper: load a CSV, rank features, grid-search classifiers.

    Attributes:
        data: DataFrame set by ``load_data`` (``None`` until then).
        feature_ranking: ``[T0, T1]`` from the last ``get_feature_ranking`` call.
        models: mapping of model name -> unfitted base estimator.
        best_params: nested results of the last ``train_models`` call.
        trained_models: initialised empty; no method of this class writes it.
        trained_features: feature columns used by the last ``train_models``
            call, so that ``cross_validate_models`` evaluates on the same
            columns the models were fitted on.
    """

    def __init__(self):
        self.data = None
        self.feature_ranking = None
        self.models = {
            'RandomForest': RandomForestClassifier(random_state=42),
            'DecisionTree': DecisionTreeClassifier(random_state=42),
            'LightGBM': lgb.LGBMClassifier(random_state=42, verbose=-1),
            'LogisticRegression': LogisticRegression(random_state=42, max_iter=1000)
        }
        self.best_params = {}
        self.trained_models = {}
        self.trained_features = None

    def load_data(self, csv_path):
        """Read ``csv_path`` into ``self.data`` and return the DataFrame."""
        self.data = pd.read_csv(csv_path)
        print(f"資料載入成功！資料形狀: {self.data.shape}")
        print(f"欄位: {list(self.data.columns)}")
        return self.data

    def get_feature_ranking(self, target_column, exclude_columns=None, top_k=5):
        """Rank features with mutual information and chi-squared scores.

        Parameters:
            target_column: name of the target column.
            exclude_columns: optional columns that must not be ranked.
            top_k: how many top features to keep from each ranking.

        Returns:
            ``[T0, T1]`` where T0 holds the features present in both top-k
            lists and T1 those present in exactly one. Both lists follow the
            ranking order (mutual information first), so the result is
            reproducible from run to run.

        Raises:
            ValueError: if no data has been loaded.
        """
        if self.data is None:
            raise ValueError("請先載入資料!")

        feature_columns = [col for col in self.data.columns if col != target_column]
        if exclude_columns:
            feature_columns = [col for col in feature_columns if col not in exclude_columns]

        X = self.data[feature_columns]
        y = self.data[target_column]

        print(f"使用特徵: {feature_columns}")
        print(f"目標變數: {target_column}")

        # Mutual information ranking.
        mi_scores = mutual_info_classif(X, y, random_state=42)
        mi_ranking = sorted(zip(feature_columns, mi_scores), key=lambda x: x[1], reverse=True)
        mi_top_features = [feature for feature, _ in mi_ranking[:top_k]]

        # chi2 rejects negative inputs, so shift every column to be strictly
        # positive before scoring.
        X_chi2 = X - X.min() + 1e-8
        chi2_scores, _ = chi2(X_chi2, y)
        chi2_ranking = sorted(zip(feature_columns, chi2_scores), key=lambda x: x[1], reverse=True)
        chi2_top_features = [feature for feature, _ in chi2_ranking[:top_k]]

        # Build T0/T1 by walking the ranked lists instead of using set
        # arithmetic: set iteration order of strings differs between
        # interpreter runs, which made the feature order non-deterministic.
        mi_set = set(mi_top_features)
        chi2_set = set(chi2_top_features)
        T0 = [feature for feature in mi_top_features if feature in chi2_set]
        T1 = [feature for feature in mi_top_features if feature not in chi2_set]
        T1 += [feature for feature in chi2_top_features if feature not in mi_set]

        self.feature_ranking = [T0, T1]

        print(f"\n=== 特徵排名結果 ===")
        print(f"Mutual Information前{top_k}名: {mi_top_features}")
        print(f"Chi2前{top_k}名: {chi2_top_features}")
        print(f"T0特徵 (兩者都有): {T0}")
        print(f"T1特徵 (僅其中一個): {T1}")

        return self.feature_ranking

    def calculate_negative_metrics(self, y_true, y_pred, labels=None):
        """Compute specificity and NPV for the negative class.

        Parameters:
            y_true: true labels.
            y_pred: predicted labels.
            labels: optional ``[negative, positive]`` label pair forwarded to
                ``confusion_matrix``. Supplying it keeps the matrix 2x2 even
                when only one class occurs in ``y_true``/``y_pred``. When
                omitted the labels are inferred from the data and the first
                one in sorted order is treated as negative.

        Returns:
            ``{'specificity': ..., 'npv': ...}``; a metric whose denominator
            is zero is reported as 0.

        Raises:
            ValueError: if the confusion matrix is not 2x2.
        """
        matrix = confusion_matrix(y_true, y_pred, labels=labels)
        if matrix.shape != (2, 2):
            raise ValueError(
                f"負向指標需要二元分類的混淆矩陣 (2x2)，實際形狀: {matrix.shape}"
            )
        tn, fp, fn, _ = matrix.ravel()

        # Specificity (true negative rate).
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        # Negative predictive value.
        npv = tn / (tn + fn) if (tn + fn) > 0 else 0

        return {
            'specificity': specificity,
            'npv': npv
        }

    @staticmethod
    def _binary_labels(y):
        """Return the sorted two labels of ``y``, or ``None`` if not binary."""
        try:
            labels = sorted(pd.Series(y).dropna().unique())
        except TypeError:
            # Labels of mixed, non-comparable types: let sklearn infer them.
            return None
        return labels if len(labels) == 2 else None

    def _build_scorers(self, labels=None):
        """Build the metric name -> scorer mapping used for tuning and CV."""
        def specificity_scorer(y_true, y_pred):
            return self.calculate_negative_metrics(y_true, y_pred, labels=labels)['specificity']

        def npv_scorer(y_true, y_pred):
            return self.calculate_negative_metrics(y_true, y_pred, labels=labels)['npv']

        return {
            'specificity': make_scorer(specificity_scorer),
            'npv': make_scorer(npv_scorer)
        }

    def get_param_grids(self):
        """Return the hyper-parameter grid template for every model.

        Every candidate list is an empty placeholder to be filled in.
        ``train_models`` skips parameters whose list is empty, so an unfilled
        entry means "keep the estimator's default".
        """
        param_grids = {
            'RandomForest': {
                'n_estimators': [],
                'criterion': [],
                'max_depth':[],
                'min_samples_split': [],
                'min_samples_leaf': [],
                'max_features': [],
                'bootstrap': [],
                'class_weight': [],
                'n_jobs': []
            },
            'DecisionTree': {
                'criterion': [],
                'splitter': [],
                'max_depth': [],
                'min_samples_split': [],
                'min_samples_leaf': [],
                'min_weight_fraction_leaf': [],
                'max_features': [],
                'max_leaf_nodes': [],
                'min_impurity_decrease': [],
                'class_weight': []
            },
            'LightGBM': {
                'boosting_type': [],  
                'objective': [],
                'learning_rate': [],
                'n_estimators': [],
                'num_leaves': [],
                'max_depth': [],
                'min_data_in_leaf': [],
                'feature_fraction': [],
                'bagging_fraction': [],
                'bagging_freq': [], 
                'lambda_l1': [],
                'lambda_l2': [],
                'class_weight': []
            },
            'LogisticRegression': {
                'penalty': [],
                'C': [],
                'solver': [],
                'max_iter': [],
                'tol': [],
                'fit_intercept': [],
                'class_weight': [],
                'random_state': [],
                'multi_class': []
            }
        }
        return param_grids

    def train_models(self, target_column, selected_models=None, features_list=None, 
                    use_feature_selection=True):
        """Grid-search each selected model once per metric (specificity, NPV).

        Parameters:
            target_column: name of the target column.
            selected_models: model names (keys of ``self.models``) to train;
                ``None`` or an empty sequence trains every registered model.
            features_list: feature columns to use. When ``None`` the T0+T1
                features are used if feature selection is enabled and a
                ranking exists, otherwise every column except the target.
            use_feature_selection: whether to use the stored feature ranking.

        Returns:
            ``{model_name: {metric_name: {'best_params', 'best_score',
            'best_model'}}}``; also stored in ``self.best_params``.

        Raises:
            ValueError: if no data has been loaded.
            KeyError: if a selected model name is not in ``self.models``.
        """
        if self.data is None:
            raise ValueError("請先載入資料!")

        # Decide which feature columns to train on.
        if use_feature_selection and self.feature_ranking:
            if features_list is None:
                features_list = self.feature_ranking[0] + self.feature_ranking[1]
            print(f"使用特徵選擇結果: {features_list}")
        else:
            if features_list is None:
                features_list = [col for col in self.data.columns if col != target_column]
            print(f"使用所有特徵: {len(features_list)}個特徵")

        X = self.data[features_list]
        y = self.data[target_column]

        # An empty selection would train nothing and leave best_params empty,
        # so it is treated the same as "no selection given".
        if selected_models is None or len(selected_models) == 0:
            selected_models = list(self.models.keys())

        print(f"訓練模型: {selected_models}")

        param_grids = self.get_param_grids()

        # Fix the label pair up front so a fold that happens to contain a
        # single class still produces a 2x2 confusion matrix.
        scorers = self._build_scorers(self._binary_labels(y))

        results = {}

        for model_name in selected_models:
            print(f"\n=== 訓練 {model_name} ===")
            model = self.models[model_name]
            # GridSearchCV rejects empty candidate lists; drop unfilled
            # placeholders so those parameters keep their defaults.
            param_grid = {
                name: values
                for name, values in param_grids.get(model_name, {}).items()
                if len(values) > 0
            }

            model_results = {}

            # Tune separately for each metric.
            for metric_name, scorer in scorers.items():
                print(f"優化指標: {metric_name}")

                grid_search = GridSearchCV(
                    model, param_grid, cv=5, scoring=scorer,
                    n_jobs=-1, verbose=0
                )

                grid_search.fit(X, y)

                model_results[metric_name] = {
                    'best_params': grid_search.best_params_,
                    'best_score': grid_search.best_score_,
                    'best_model': grid_search.best_estimator_
                }

                print(f"  最佳參數: {grid_search.best_params_}")
                print(f"  最佳分數: {grid_search.best_score_:.4f}")

            results[model_name] = model_results

        self.best_params = results
        self.trained_features = list(features_list)
        return results

    def cross_validate_models(self, target_column, features_list=None, cv_folds=5):
        """Re-evaluate every tuned model with cross-validation.

        Each model is scored with the metric it was tuned for, so the value
        is comparable with the stored grid-search score.

        Parameters:
            target_column: name of the target column.
            features_list: feature columns to use. Defaults to the columns of
                the last ``train_models`` call, then to T0+T1 of the stored
                ranking, then to every column except the target.
            cv_folds: number of folds, or any cross-validation splitter
                accepted by ``cross_val_score``. An integer is turned into a
                shuffled ``StratifiedKFold`` (seed 42) so the folds differ
                from the unshuffled ones used during the grid search.

        Returns:
            ``{model_name: {metric_name: {'cv_scores', 'mean_score',
            'std_score', 'training_score'}}}`` where ``training_score`` is the
            grid-search best score.

        Raises:
            ValueError: if no model has been trained yet.
        """
        if not self.best_params:
            raise ValueError("請先訓練模型!")

        if features_list is None:
            if self.trained_features is not None:
                # Evaluate on exactly the columns the models were fitted on.
                features_list = list(self.trained_features)
            elif self.feature_ranking:
                features_list = self.feature_ranking[0] + self.feature_ranking[1]
            else:
                features_list = [col for col in self.data.columns if col != target_column]

        X = self.data[features_list]
        y = self.data[target_column]

        scorers = self._build_scorers(self._binary_labels(y))
        if isinstance(cv_folds, int):
            splitter = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)
        else:
            splitter = cv_folds

        cv_results = {}

        for model_name, metrics_results in self.best_params.items():
            print(f"\n=== {model_name} 交叉驗證結果 ===")
            model_cv_results = {}

            for metric_name, result in metrics_results.items():
                best_model = result['best_model']

                # Unknown metric names fall back to the estimator's default
                # scoring (scoring=None).
                cv_scores = cross_val_score(
                    best_model, X, y, cv=splitter,
                    scoring=scorers.get(metric_name)
                )

                model_cv_results[metric_name] = {
                    'cv_scores': cv_scores,
                    'mean_score': cv_scores.mean(),
                    'std_score': cv_scores.std(),
                    'training_score': result['best_score']
                }

                print(f"  {metric_name}優化模型:")
                print(f"    訓練分數: {result['best_score']:.4f}")
                print(f"    交叉驗證分數: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

            cv_results[model_name] = model_cv_results

        return cv_results

    def train_by_groups(self, target_column, group_column, selected_models=None, 
                       use_feature_selection=True):
        """Run feature ranking, training and cross-validation per group.

        The instance state (``data``, ``feature_ranking``, ``best_params``,
        ``trained_features``) is restored when the method returns or raises,
        so per-group results never leak onto the full-data project.

        Parameters:
            target_column: name of the target column.
            group_column: column whose distinct values define the groups.
            selected_models: forwarded to ``train_models``.
            use_feature_selection: rank features per group before training.
                When false, every column except the target and the group
                column is used.

        Returns:
            ``{'group_<value>': {...}}`` with the keys ``feature_ranking``,
            ``training_results`` and ``cv_results`` for the steps that ran,
            or ``error`` with the message if a step failed.

        Raises:
            ValueError: if no data has been loaded.
        """
        if self.data is None:
            raise ValueError("請先載入資料!")

        full_data = self.data
        unique_groups = full_data[group_column].unique()
        print(f"發現 {len(unique_groups)} 個群組: {unique_groups}")

        saved_state = (self.feature_ranking, self.best_params, self.trained_features)
        all_results = {}

        try:
            for group in unique_groups:
                print(f"\n{'='*50}")
                print(f"處理群組: {group}")
                print(f"{'='*50}")

                # Work on this group's rows only; the other methods read self.data.
                group_data = full_data[full_data[group_column] == group].copy()
                print(f"群組資料大小: {group_data.shape}")
                self.data = group_data

                group_results = {}

                try:
                    features_list = None
                    if use_feature_selection:
                        # Step 2: feature selection.
                        exclude_cols = [group_column] if group_column != target_column else []
                        feature_ranking = self.get_feature_ranking(
                            target_column, exclude_columns=exclude_cols
                        )
                        group_results['feature_ranking'] = feature_ranking
                    else:
                        # The group column is constant inside a group, so it
                        # carries no signal and is left out.
                        features_list = [
                            col for col in group_data.columns
                            if col not in (target_column, group_column)
                        ]

                    # Step 3: model training.
                    training_results = self.train_models(
                        target_column, selected_models=selected_models,
                        features_list=features_list,
                        use_feature_selection=use_feature_selection
                    )
                    group_results['training_results'] = training_results

                    # Step 4: cross-validation.
                    cv_results = self.cross_validate_models(target_column)
                    group_results['cv_results'] = cv_results

                except Exception as e:
                    # Best effort: record the failure and continue with the
                    # remaining groups.
                    print(f"群組 {group} 處理失敗: {str(e)}")
                    group_results['error'] = str(e)

                all_results[f'group_{group}'] = group_results
        finally:
            # Always hand the project back in its pre-call state.
            self.data = full_data
            self.feature_ranking, self.best_params, self.trained_features = saved_state

        return all_results

# 使用範例
def main(csv_path=r"C:\User\Mental Health Dataset (US).csv"):
    """Run the example workflow end to end and return its results.

    Parameters:
        csv_path: CSV file to load; defaults to the path the example was
            written against.

    Returns:
        A dict with the keys ``feature_ranking``, ``training_results``,
        ``cv_results`` and ``group_results``.
    """
    # Set up the project and load the data.
    ml_project = MLProject()
    ml_project.load_data(csv_path)

    # Feature selection.
    feature_ranking = ml_project.get_feature_ranking(
        target_column='treatment',
        exclude_columns=None,  # columns to keep out of the ranking
        top_k=5
    )

    # Model training. ``None`` selects every registered model; the previous
    # empty list selected none, so the cross-validation step below raised
    # because nothing had been trained.
    results = ml_project.train_models(
        target_column='treatment',
        selected_models=None,
        use_feature_selection=True
    )

    # Cross-validation.
    cv_results = ml_project.cross_validate_models('treatment')

    # Per-group training.
    group_results = ml_project.train_by_groups(
        target_column='treatment',
        group_column='Occupation',
        selected_models=None,
        use_feature_selection=True
    )

    return {
        'feature_ranking': feature_ranking,
        'training_results': results,
        'cv_results': cv_results,
        'group_results': group_results,
    }

# Run the example workflow only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()