<a href="https://colab.research.google.com/github/hwangho-kim/pure-LAD/blob/master/EG_LAD_v4_SECOM_%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%85%8B_%EB%B0%8F_%EB%B2%A0%EC%9D%B4%EC%A6%88_%EC%B5%9C%EC%A0%81%ED%99%94_%EA%B8%B0%EB%B0%98_%EA%B2%80%EC%A6%9D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
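# --- Library installation ---
# The `!pip install` line below is notebook shell magic: it installs the required libraries when this cell runs in Jupyter/Colab.
# When running this file as a plain Python script, remove that line and install the packages from a terminal first.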
!pip install scikit-learn pandas numpy matplotlib seaborn lightgbm bayesian-optimization

import numpy as np
import pandas as pd
import random
import time
import warnings
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, roc_auc_score
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns
import lightgbm as lgb
from bayes_opt import BayesianOptimization

# Silence library warnings so the notebook output stays readable.
warnings.filterwarnings('ignore')

# Korean font setup for matplotlib figures (NanumGothic).
try:
    import matplotlib.font_manager as fm
    font_path = '/usr/share/fonts/truetype/nanum/NanumGothic.ttf'
    font_prop = fm.FontProperties(fname=font_path)
    # Resolve the family name first (this is the call that fails when the
    # font file is missing), then apply both settings in one update.
    plt.rcParams.update({
        'font.family': font_prop.get_name(),
        'axes.unicode_minus': False,
    })
except FileNotFoundError:
    for message in (
        "나눔고딕 폰트가 설치되어 있지 않아, 그래프의 한글이 깨질 수 있습니다.",
        "설치 방법: sudo apt-get update -qq && sudo apt-get install fonts-nanum* -qq",
    ):
        print(message)


class EnsembleGuidedLAD_v4:
    """Ensemble-guided Logical Analysis of Data (EG-LAD), version 4.

    The pipeline executed by :meth:`fit` has four phases:

    1. binarize each feature with a depth-1 decision tree (one cut point),
    2. keep the binary literals a random forest ranks as most important,
    3. mine candidate patterns (conjunctions of literal values) from the
       trees of a LightGBM model and shorten them,
    4. "theory formation": weight every pattern by the share of its own
       class that it covers.

    Prediction sums the weights of all patterns a sample satisfies; a
    strictly positive total is labelled class 1, anything else class 0.

    Labels are expected to be 0/1 (purity is computed as the mean of ``y``).
    """

    def __init__(self, purity_threshold=0.9, top_features_ratio=0.5):
        """Store hyper-parameters and initialise empty model state.

        Args:
            purity_threshold: minimum class purity a shortened pattern must
                keep while literals are being dropped in phase 3.
            top_features_ratio: fraction of binary literals kept in phase 2.
        """
        self.purity_threshold = purity_threshold
        self.top_features_ratio = top_features_ratio
        self.literals = []
        self.selected_b_feature_names_ = None
        self.final_model_patterns = []

    # --- Phase 1: optimal binarization ---
    def phase1_optimal_binarization(self, X: pd.DataFrame, y: pd.Series):
        """Turn every usable column into a ``<=`` / ``>`` pair of binary literals.

        Columns containing nulls are skipped, as are columns for which the
        stump could not find a split. The learned literals are stored in
        ``self.literals`` (replacing any literals from a previous fit).

        Returns:
            DataFrame of 0/1 literal columns named ``l_0, l_1, ...`` that
            shares the index of ``X``.
        """
        # Reset so that fitting the same instance twice does not accumulate
        # literals with duplicated names.
        self.literals = []
        literal_columns = {}

        for col in X.columns:
            if X[col].isnull().any():
                continue
            stump = DecisionTreeClassifier(max_depth=1, criterion='entropy')
            stump.fit(X[[col]], y)
            if stump.tree_.node_count <= 1:
                continue  # no split found: the column carries no cut point

            threshold = stump.tree_.threshold[0]
            for op, hits in (('<=', X[col] <= threshold), ('>', X[col] > threshold)):
                literal_name = f"l_{len(self.literals)}"
                self.literals.append(
                    {'name': literal_name, 'feature': col, 'op': op, 'val': threshold}
                )
                literal_columns[literal_name] = hits.astype(int)

        # Build the frame once instead of inserting columns one by one.
        return pd.DataFrame(literal_columns, index=X.index)

    # --- Phase 2: ensemble-based feature selection ---
    def phase2_ensemble_feature_selection(self, X_b: pd.DataFrame, y: pd.Series):
        """Keep the top ``top_features_ratio`` literals by random-forest importance.

        The selected column names are stored in ``selected_b_feature_names_``.

        Returns:
            ``X_b`` restricted to the selected columns (most important first).
        """
        rf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
        rf.fit(X_b, y)

        importances = rf.feature_importances_
        n_top_features = int(len(importances) * self.top_features_ratio)
        if n_top_features == 0 and len(importances) > 0:
            n_top_features = 1  # always keep at least one literal

        selected_indices = np.argsort(importances)[::-1][:n_top_features]
        self.selected_b_feature_names_ = X_b.columns[selected_indices]
        return X_b[self.selected_b_feature_names_]

    # --- Phase 3: hierarchical pattern generation (HGS) ---
    def phase3_hgs_pattern_generation(self, X_b_min: pd.DataFrame, y: pd.Series):
        """Generate candidate patterns for class 1 and class 0.

        For each class a first ("core") pass mines patterns against all
        samples of that class; a second ("residual") pass repeats the mining
        on the class samples no core pattern covers. The union is then pruned
        by :meth:`_prune_by_minimal_degree`.

        Returns:
            dict ``{0: [pattern, ...], 1: [pattern, ...]}`` where a pattern
            is a dict mapping literal column name to required value (0/1).
        """
        candidate_patterns = {0: [], 1: []}
        for target_class in [1, 0]:
            class_mask = (y.values == target_class)
            found = candidate_patterns[target_class]

            found.extend(
                self._discover_patterns_for_subset(X_b_min, y, class_mask, target_class)
            )

            covered_mask = self._get_any_pattern_coverage(X_b_min, found)
            residual_mask = class_mask & ~covered_mask
            if np.any(residual_mask):
                found.extend(
                    self._discover_patterns_for_subset(X_b_min, y, residual_mask, target_class)
                )

            candidate_patterns[target_class] = self._prune_by_minimal_degree(found)
        return candidate_patterns

    def _discover_patterns_for_subset(self, X_b_all, y_all, subset_mask, target_class):
        """Mine patterns separating ``subset_mask`` samples from the other class.

        A LightGBM classifier is trained on the selected target samples plus
        every sample of the other class, with label ``y == target_class``.
        Its tree paths become seeds, which are shortened on the full data.

        Returns:
            list of distinct, non-empty pattern dicts; empty when either side
            has fewer than 5 samples.
        """
        positive_indices = np.where(subset_mask)[0]
        negative_indices = np.where(y_all.values != target_class)[0]
        if len(positive_indices) < 5 or len(negative_indices) < 5:
            return []

        train_indices = np.concatenate([positive_indices, negative_indices])
        X_train = X_b_all.iloc[train_indices]
        y_train = y_all.iloc[train_indices]

        lgbm = lgb.LGBMClassifier(objective='binary', verbose=-1, n_estimators=50)
        # The positive label is "belongs to target_class", for either class.
        lgbm.fit(X_train, (y_train == target_class))

        trees_df = lgbm.booster_.trees_to_dataframe()
        seeds = self._extract_seeds(trees_df, target_class)

        refined_patterns = []
        seen = set()
        for seed in seeds:
            prime_pattern = self._refine_to_prime(X_b_all, y_all, seed, target_class)
            signature = tuple(sorted(prime_pattern.items()))
            if prime_pattern and signature not in seen:
                refined_patterns.append(prime_pattern)
                seen.add(signature)
        return refined_patterns

    def _extract_seeds(self, trees_df, target_class):
        """Collect root-to-leaf paths whose leaf votes for the target class.

        ``trees_df`` is expected to have the columns ``tree_index``,
        ``node_index``, ``left_child``, ``right_child``, ``split_feature``
        and ``value`` (or ``leaf_value``). A left branch is recorded as
        literal value 0 and a right branch as 1.

        The booster is trained on the label ``y == target_class``, so a
        positive leaf value always votes *for* the target class - also when
        ``target_class`` is 0. ``target_class`` is therefore kept only for
        interface compatibility and does not change which leaves are chosen.

        Returns:
            list of pattern dicts in depth-first, left-before-right order.
        """
        # NOTE(review): the first tree's leaf values may include the model's
        # base score, which would bias their sign on imbalanced data - confirm.
        seeds = []
        for _, tree in trees_df.groupby('tree_index', sort=False):
            records = tree.to_dict('records')
            if not records:
                continue
            nodes = {record['node_index']: record for record in records}

            # Explicit stack; the right child is pushed first so that the
            # left subtree is fully explored before the right one.
            stack = [(records[0]['node_index'], {})]
            while stack:
                node_index, path_rules = stack.pop()
                node = nodes.get(node_index)
                if node is None:
                    continue

                left_child = node.get('left_child')
                right_child = node.get('right_child')
                if pd.isna(left_child) and pd.isna(right_child):
                    leaf_value = node.get('leaf_value', node.get('value'))
                    if leaf_value is not None and leaf_value > 0 and path_rules:
                        seeds.append(path_rules)
                    continue

                feature = node.get('split_feature')
                if feature is None:
                    continue
                stack.append((right_child, {**path_rules, feature: 1}))
                stack.append((left_child, {**path_rules, feature: 0}))
        return seeds

    def _refine_to_prime(self, X_b, y, pattern, target_class):
        """Greedily drop literals from ``pattern`` while it stays pure enough.

        A literal is dropped when the shortened pattern still covers at least
        3 rows and its purity (mean of ``y`` over covered rows) is
        ``>= purity_threshold`` for class 1 or ``<= 1 - purity_threshold``
        for class 0. At least one literal is always kept. The input pattern
        itself is not checked for purity.

        Returns:
            the shortened pattern (a new dict).
        """
        current_pattern = dict(pattern)
        while len(current_pattern) > 1:
            for literal_col in list(current_pattern):
                shortened = {k: v for k, v in current_pattern.items() if k != literal_col}
                mask = self._get_pattern_mask(X_b, shortened)
                if mask.sum() < 3:
                    continue
                purity = y.loc[mask].mean()
                if target_class == 1:
                    is_pure_enough = purity >= self.purity_threshold
                else:
                    is_pure_enough = purity <= (1 - self.purity_threshold)
                if is_pure_enough:
                    current_pattern = shortened
                    break  # restart the scan on the shorter pattern
            else:
                break  # no literal could be removed
        return current_pattern

    def _prune_by_minimal_degree(self, patterns):
        """For every (literal, value) keep only the shortest pattern using it.

        Ties keep the pattern seen first. The surviving patterns are
        de-duplicated and returned as a list.
        """
        if not patterns:
            return []
        best_for_literal = {}
        for pattern in patterns:
            degree = len(pattern)
            for key in pattern.items():
                incumbent = best_for_literal.get(key)
                if incumbent is None or degree < len(incumbent):
                    best_for_literal[key] = pattern
        unique_patterns = {tuple(sorted(p.items())): p for p in best_for_literal.values()}
        return list(unique_patterns.values())

    def _get_pattern_mask(self, X_b, pattern_dict):
        """Boolean Series marking rows of ``X_b`` that satisfy the pattern.

        Literals whose column is absent from ``X_b`` are ignored.
        """
        mask = pd.Series(True, index=X_b.index)
        for col, val in pattern_dict.items():
            if col in X_b.columns:
                mask &= (X_b[col] == val)
        return mask

    def _get_any_pattern_coverage(self, X_b, candidate_patterns):
        """Boolean Series marking rows covered by at least one pattern."""
        final_mask = pd.Series(False, index=X_b.index)
        for pattern in candidate_patterns:
            final_mask |= self._get_pattern_mask(X_b, pattern)
        return final_mask

    # --- Phase 4: theory formation ---
    def phase4_theory_formation(self, candidate_patterns, X_b_min, y):
        """Assign a coverage-based weight to every candidate pattern.

        Coverage is the fraction of the pattern's own class that it covers.
        Class-1 patterns get ``+coverage`` and class-0 patterns ``-coverage``.
        The result is stored in ``final_model_patterns`` sorted by absolute
        weight, largest first.
        """
        self.final_model_patterns = []
        for target_class, patterns in candidate_patterns.items():
            is_target = (y == target_class)
            total_class_samples = is_target.sum()
            if total_class_samples == 0:
                continue
            for pattern in patterns:
                mask = self._get_pattern_mask(X_b_min, pattern)
                coverage = (is_target & mask).sum() / total_class_samples
                weight = coverage if target_class == 1 else -coverage
                self.final_model_patterns.append(
                    {'pattern': pattern, 'weight': weight, 'class': target_class}
                )
        self.final_model_patterns.sort(key=lambda x: abs(x['weight']), reverse=True)

    def fit(self, X_train, y_train, verbose=False):
        """Run phases 1-4 on the training data.

        Args:
            X_train: feature DataFrame.
            y_train: 0/1 label Series sharing the index of ``X_train``.
            verbose: print a one-line summary after each phase.

        Returns:
            ``self``, so calls can be chained.
        """
        if verbose: print("\n--- 훈련 데이터로 모델 fitting 시작 ---")
        X_train_b = self.phase1_optimal_binarization(X_train, y_train)
        if verbose: print(f"> 1단계 산출물: 이진화된 훈련 데이터 (shape: {X_train_b.shape})")

        X_train_b_min = self.phase2_ensemble_feature_selection(X_train_b, y_train)
        if verbose: print(f"> 2단계 산출물: 특징 선택된 훈련 데이터 (shape: {X_train_b_min.shape})")

        candidate_patterns = self.phase3_hgs_pattern_generation(X_train_b_min, y_train)
        if verbose: print(f"> 3단계 산출물: 후보 패턴 {sum(len(p) for p in candidate_patterns.values())}개 생성")

        self.phase4_theory_formation(candidate_patterns, X_train_b_min, y_train)
        if verbose: print(f"> 4단계 산출물: 최종 모델 패턴 {len(self.final_model_patterns)}개 구성")
        return self

    def _binarize_with_literals(self, X):
        """Apply the literals learned in phase 1 to ``X``.

        A literal whose source feature is missing from ``X`` yields an
        all-zero column.
        """
        n_rows = len(X)
        columns = {}
        for literal in self.literals:
            feature = literal['feature']
            if feature not in X.columns:
                columns[literal['name']] = np.zeros(n_rows, dtype=int)
            elif literal['op'] == '<=':
                columns[literal['name']] = (X[feature] <= literal['val']).to_numpy().astype(int)
            else:
                columns[literal['name']] = (X[feature] > literal['val']).to_numpy().astype(int)
        return pd.DataFrame(columns, index=X.index)

    def decision_function(self, X_test):
        """Return the raw pattern score of every row of ``X_test``.

        The score is the sum of the weights of all patterns the row
        satisfies. A pattern that references a literal outside the selected
        feature set never matches.
        """
        X_test_b_min = self._binarize_with_literals(X_test)[self.selected_b_feature_names_]
        n_rows = len(X_test_b_min)
        available = set(X_test_b_min.columns)

        scores = np.zeros(n_rows, dtype=float)
        for p_info in self.final_model_patterns:
            pattern = p_info['pattern']
            if not available.issuperset(pattern):
                continue
            matched = np.ones(n_rows, dtype=bool)
            for col, val in pattern.items():
                matched &= (X_test_b_min[col].to_numpy() == val)
            scores[matched] += p_info['weight']
        return scores

    def predict(self, X_test):
        """Predict 0/1 labels: 1 where the pattern score is strictly positive."""
        return (self.decision_function(X_test) > 0).astype(int)

    def print_final_patterns(self):
        """Print the five highest-weighted patterns in terms of original features.

        A literal required to be 0 is shown with its comparison flipped.
        """
        print("\n" + "="*50)
        print("최종 모델 패턴 및 가중치 (상위 5개)")
        print("="*50)
        b_name_to_literal = {l['name']: l for l in self.literals}
        for rank, p_info in enumerate(self.final_model_patterns[:5], start=1):
            desc = []
            for b_feature_name, value in p_info['pattern'].items():
                literal_info = b_name_to_literal.get(b_feature_name)
                if not literal_info:
                    continue
                op = literal_info['op']
                if value == 0:
                    op = '>' if op == '<=' else '<='
                desc.append(f"({literal_info['feature']} {op} {literal_info['val']:.2f})")
            print(f"  - 클래스 {p_info['class']} 패턴 #{rank} (가중치: {p_info['weight']:.3f}): {' AND '.join(desc)}")

# --- Bayesian-optimization setup ---
# Train/validation splits read by black_box_function; filled in by the main
# script before the optimizer runs.
X_train_opt, y_train_opt, X_val_opt, y_val_opt = [None] * 4

def black_box_function(purity_threshold, top_features_ratio):
    """Objective evaluated by the Bayesian optimizer.

    Fits an ``EnsembleGuidedLAD_v4`` on the module-level training split and
    scores its 0/1 predictions on the validation split with ROC AUC (chosen
    because the dataset is imbalanced).

    Args:
        purity_threshold: clamped to [0.7, 1.0] before use.
        top_features_ratio: clamped to [0.1, 1.0] before use.

    Returns:
        ROC AUC on the validation split, or 0.5 when the validation labels
        contain fewer than two classes (AUC is undefined there).
    """
    # AUC needs both classes in the validation labels; check this first so
    # no model is trained when the score cannot be computed anyway.
    if len(np.unique(y_val_opt)) < 2:
        return 0.5

    top_features_ratio = max(0.1, min(1.0, top_features_ratio))
    purity_threshold = max(0.7, min(1.0, purity_threshold))

    model = EnsembleGuidedLAD_v4(
        purity_threshold=purity_threshold,
        top_features_ratio=top_features_ratio
    )
    model.fit(X_train_opt, y_train_opt, verbose=False)
    y_pred = model.predict(X_val_opt)

    return roc_auc_score(y_val_opt, y_pred)

# --- 메인 실행 ---
if __name__ == '__main__':
    # 1. Load the SECOM dataset (falls back to Breast Cancer when the
    #    download or parsing fails).
    print("#"*70)
    print("# 데이터셋: SECOM 처리 시작")
    print("#"*70)

    try:
        features_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/secom/secom.data"
        labels_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/secom/secom_labels.data"

        # sep=r'\s+' is the documented replacement for the deprecated
        # delim_whitespace=True.
        X = pd.read_csv(features_url, sep=r'\s+', header=None)
        X.columns = [f'feature_{i}' for i in range(X.shape[1])]

        y_df = pd.read_csv(labels_url, sep=r'\s+', header=None)
        # SECOM labels are -1 / 1; the model expects 0 / 1.
        y = y_df[0].map({-1: 0, 1: 1})

    except Exception as e:
        # Deliberate best-effort fallback: any load failure switches to a
        # dataset bundled with scikit-learn so the rest of the script runs.
        print(f"SECOM 데이터셋을 로드하는 중 오류가 발생했습니다: {e}")
        print("대체 데이터셋으로 Breast Cancer를 사용합니다.")
        from sklearn.datasets import load_breast_cancer
        cancer = load_breast_cancer()
        X = pd.DataFrame(cancer.data, columns=cancer.feature_names)
        y = pd.Series(np.where(cancer.target == 0, 1, 0), name='target')

    # 2. 70/30 train/test split (stratified)
    X_train_full, X_test, y_train_full, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )

    # Impute missing values with the per-column median of the TRAINING split
    # only, so no statistic of the test split leaks into preprocessing.
    train_medians = X_train_full.median()
    X_train_full = X_train_full.fillna(train_medians)
    X_test = X_test.fillna(train_medians)

    # Drop features that have at most one distinct value in the training data
    constant_features = X_train_full.columns[X_train_full.nunique() <= 1]
    if not constant_features.empty:
        print(f"  > 훈련 데이터에서 분산이 0이거나 단일 값만 갖는 특징 {len(constant_features)}개를 제거합니다.")
        X_train_full = X_train_full.drop(columns=constant_features)
        X_test = X_test.drop(columns=constant_features, errors='ignore')

    # 3. Inner split used by the Bayesian optimizer (assigns the module-level
    #    names read by black_box_function)
    X_train_opt, X_val_opt, y_train_opt, y_val_opt = train_test_split(
        X_train_full, y_train_full, test_size=0.3, random_state=42, stratify=y_train_full
    )

    # Give every split a fresh 0..n-1 index so features and labels stay
    # aligned by label inside the model.
    for part in [X_train_full, X_test, y_train_full, y_test, X_train_opt, X_val_opt, y_train_opt, y_val_opt]:
        part.reset_index(drop=True, inplace=True)

    # 4. Run the Bayesian optimization
    print("\n" + "#"*70)
    print("# 베이즈 최적화를 통한 하이퍼파라미터 탐색 시작")
    print("#"*70)
    pbounds = {'purity_threshold': (0.8, 1.0), 'top_features_ratio': (0.2, 0.8)}
    optimizer = BayesianOptimization(f=black_box_function, pbounds=pbounds, random_state=42, verbose=2)
    optimizer.maximize(init_points=5, n_iter=10)

    best_params = optimizer.max['params']
    print("\n" + "*"*50)
    print("베이즈 최적화 결과, 최적의 파라미터:")
    print(f"  - purity_threshold: {best_params['purity_threshold']:.4f}")
    print(f"  - top_features_ratio: {best_params['top_features_ratio']:.4f}")
    print("*"*50)

    # 5. Train the final model with the best parameters and evaluate it on
    #    the held-out test split
    print("\n" + "#"*70)
    print("# 최적 파라미터로 최종 모델 훈련 및 평가")
    print("#"*70)

    start_time = time.time()

    final_lad_model = EnsembleGuidedLAD_v4(
        purity_threshold=best_params['purity_threshold'],
        top_features_ratio=best_params['top_features_ratio']
    )

    final_lad_model.fit(X_train_full, y_train_full, verbose=True)
    y_pred = final_lad_model.predict(X_test)

    final_lad_model.print_final_patterns()

    print("\n[Confusion Matrix]")
    print(confusion_matrix(y_test, y_pred))
    print("\n[Classification Report]")
    print(classification_report(y_test, y_pred, zero_division=0))

    end_time = time.time()
    print(f"\n최종 훈련 및 평가 시간: {end_time - start_time:.2f}초")

나눔고딕 폰트가 설치되어 있지 않아, 그래프의 한글이 깨질 수 있습니다.
설치 방법: sudo apt-get update -qq && sudo apt-get install fonts-nanum* -qq
######################################################################
# 데이터셋: SECOM 처리 시작
######################################################################
  > 훈련 데이터에서 분산이 0이거나 단일 값만 갖는 특징 116개를 제거합니다.

######################################################################
# 베이즈 최적화를 통한 하이퍼파라미터 탐색 시작
######################################################################
|   iter    |  target   | purity... | top_fe... |
-------------------------------------------------
| 2         | 0.5       | 0.8749080 | 0.7704285 |
| 3         | 0.5       | 0.9463987 | 0.5591950 |
| 4         | 0.5       | 0.8312037 | 0.2935967 |
| 5         | 0.5       | 0.8116167 | 0.7197056 |
| [39m6        [39m | [39m0.5      [39m | 