<a href="https://colab.research.google.com/github/hwangho-kim/LLM-AD/blob/master/%EC%98%A4%ED%86%A0%EC%9D%B8%EC%BD%94%EB%8D%94%20%EB%B0%8F%20SHAP%20%EA%B8%B0%EB%B0%98%20FDC_%EB%B6%84%EC%84%9D_%EB%AA%A8%EB%8D%B8_(%EB%8D%B0%EC%9D%B4%ED%84%B0%ED%94%84%EB%A0%88%EC%9E%84_%EB%B2%84%EC%A0%84).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================================
# 패키지 설치 (Package Installation)
# ==============================================================================
# 아래 명령어를 사용하여 이 스크립트 실행에 필요한 주요 라이브러리들을 설치합니다.
# In a command line or terminal, run the following command:
# pip install pandas numpy tensorflow scikit-learn shap matplotlib seaborn koreanize-matplotlib

# ==============================================================================
# 라이브러리 임포트 (Library Imports)
# ==============================================================================
# 데이터 처리, 모델링, 시각화, 설명가능 AI에 필요한 라이브러리들을 임포트합니다.
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import confusion_matrix, classification_report
import shap
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# Matplotlib에서 한글 폰트가 깨지지 않도록 설정
import koreanize_matplotlib

# 경고 메시지 무시
warnings.filterwarnings('ignore')

# ==============================================================================
# 1단계: 샘플 데이터 생성 (Sample Data Generation)
# ==============================================================================
# FDC 데이터를 딕셔너리가 아닌 데이터프레임 형식으로 생성하도록 수정합니다.
def generate_wafer_data_df(num_wafers=500, timesteps=100):
    """
    하나의 공정 스텝에 대한 웨이퍼 데이터를 생성합니다.
    - FDC: 4개 센서 데이터를 포함하는 단일 데이터프레임 (Long Format)
    - MES: 각 웨이퍼의 공정 조건 및 최종 수율 정보
    """
    print(f"{num_wafers}개 웨이퍼에 대한 샘플 데이터 생성을 시작합니다...")

    # --- MES 데이터 생성 (정적 특징) ---
    wafers = []
    for i in range(num_wafers):
        wafer_id = f'WAF_{i+1:04d}'
        equipment_id = np.random.choice(['Etcher_A', 'Etcher_B', 'Etcher_C'])
        recipe_id = np.random.choice(['Recipe_1', 'Recipe_2'])
        wafers.append([wafer_id, equipment_id, recipe_id])
    mes_df = pd.DataFrame(wafers, columns=['wafer_id', 'equipment_id', 'recipe_id'])

    # --- FDC 데이터프레임 생성 ---
    fdc_records = []
    anomaly_types = []
    final_yields = []

    for i, row in mes_df.iterrows():
        wafer_id = row['wafer_id']

        # 기본 센서 값 설정
        pressure_mean, rf_power_mean, gas_flow_mean, temp_mean = 100, 500, 200, 80
        pressure = np.random.normal(pressure_mean, 1, timesteps)
        rf_power = np.random.normal(rf_power_mean, 5, timesteps)
        gas_flow = np.random.normal(gas_flow_mean, 2, timesteps)
        temperature = np.random.normal(temp_mean, 0.5, timesteps)
        anomaly_type, final_yield = 'Normal', 1

        if np.random.rand() < 0.2:
            final_yield = 0
            anomaly_choice = np.random.choice(['Spike', 'Variance', 'Drift'])
            anomaly_type = anomaly_choice
            if anomaly_choice == 'Spike':
                spike_sensor = np.random.choice(['pressure', 'rf_power'])
                spike_idx = np.random.randint(20, 80)
                if spike_sensor == 'pressure': pressure[spike_idx] += pressure_mean * 0.1 * np.sign(np.random.randn())
                else: rf_power[spike_idx] += rf_power_mean * 0.05 * np.sign(np.random.randn())
            elif anomaly_choice == 'Variance':
                var_sensor = np.random.choice(['gas_flow', 'temperature'])
                if var_sensor == 'gas_flow': gas_flow = np.random.normal(gas_flow_mean, 6, timesteps)
                else: temperature = np.random.normal(temp_mean, 1.5, timesteps)
            elif anomaly_choice == 'Drift':
                drift_sensor = np.random.choice(['pressure', 'rf_power'])
                drift_amount = np.linspace(0, (np.random.rand() - 0.5) * 10, timesteps)
                if drift_sensor == 'pressure': pressure += drift_amount
                else: rf_power += drift_amount * 5

        # 해당 웨이퍼의 FDC 데이터를 레코드 리스트에 추가
        for t in range(timesteps):
            fdc_records.append({
                'wafer_id': wafer_id,
                'timestep': t,
                'pressure': pressure[t],
                'rf_power': rf_power[t],
                'gas_flow': gas_flow[t],
                'temperature': temperature[t]
            })

        anomaly_types.append(anomaly_type)
        final_yields.append(final_yield)

    # 최종 데이터프레임 생성
    fdc_df = pd.DataFrame(fdc_records)
    mes_df['anomaly_type'] = anomaly_types
    mes_df['final_yield'] = final_yields

    print("샘플 데이터 생성이 완료되었습니다.")
    return mes_df, fdc_df

# 데이터 생성 실행
mes_df, fdc_df = generate_wafer_data_df(num_wafers=500, timesteps=100)
print("\n[생성된 FDC 데이터프레임 샘플 (상위 5개)]")
print(fdc_df.head())

# ==============================================================================
# 2단계: 시계열 특징 공학 (피처 강화)
# ==============================================================================
# 데이터프레임을 입력으로 받도록 특징 공학 함수를 수정합니다.
def feature_engineering_enhanced_df(mes_df, fdc_df, timesteps=100):
    """ 데이터프레임 형식의 시계열 데이터로부터 특징을 추출합니다. """
    print("\n강화된 시계열 특징 공학을 시작합니다 (데이터프레임 입력)...")

    features_list = []
    stable_start, stable_end = int(timesteps * 0.2), int(timesteps * 0.8)

    # wafer_id로 그룹화하여 웨이퍼별로 특징 추출
    grouped = fdc_df.groupby('wafer_id')

    for wafer_id, group_df in grouped:
        wafer_features = {'wafer_id': wafer_id}
        for sensor in ['pressure', 'rf_power', 'gas_flow', 'temperature']:
            series = group_df[sensor].values

            # 기본 통계 피처
            wafer_features[f'{sensor}_mean'] = np.mean(series)
            wafer_features[f'{sensor}_std'] = np.std(series)
            wafer_features[f'{sensor}_max'] = np.max(series)
            wafer_features[f'{sensor}_min'] = np.min(series)
            wafer_features[f'{sensor}_skew'] = pd.Series(series).skew()
            wafer_features[f'{sensor}_kurt'] = pd.Series(series).kurtosis()

            # 도메인 특화 피처
            wafer_features[f'{sensor}_stable_std'] = np.std(series[stable_start:stable_end])
            wafer_features[f'{sensor}_auc'] = np.trapz(series)
            wafer_features[f'{sensor}_slope'] = np.polyfit(range(timesteps), series, 1)[0]
            peak_threshold = np.mean(series) + 3 * np.std(series)
            wafer_features[f'{sensor}_peak_count'] = np.sum(series > peak_threshold)

        features_list.append(wafer_features)

    features_df = pd.DataFrame(features_list)
    print("특징 추출이 완료되었습니다.")

    final_df = pd.merge(mes_df, features_df, on='wafer_id')
    print("동적/정적 데이터 통합이 완료되었습니다.")
    return final_df

# 강화된 특징 공학 실행
final_df = feature_engineering_enhanced_df(mes_df, fdc_df)
print("\n[최종 통합 데이터셋 샘플 (상위 5개)]")
print(final_df.head())


# ==============================================================================
# 3단계: 비선형 이상 탐지 (오토인코더 모델)
# ==============================================================================
# 이 단계는 2단계의 결과물인 final_df를 사용하므로 이전과 동일하게 작동합니다.

# --- 데이터 전처리 ---
features_for_model = [col for col in final_df.columns if col not in ['wafer_id', 'anomaly_type', 'final_yield']]
categorical_features = ['equipment_id', 'recipe_id']
numerical_features = [col for col in features_for_model if col not in categorical_features]
preprocessor = ColumnTransformer(
    transformers=[
        ('num', MinMaxScaler(), numerical_features),
        ('cat', OneHotEncoder(), categorical_features)
    ], remainder='passthrough')

normal_df = final_df[final_df['final_yield'] == 1]
X_train, X_val = train_test_split(normal_df[features_for_model], test_size=0.2, random_state=42)
preprocessor.fit(final_df[features_for_model])
X_train_scaled = preprocessor.transform(X_train)
X_val_scaled = preprocessor.transform(X_val)
X_all_scaled = preprocessor.transform(final_df[features_for_model])

# --- 오토인코더 모델 정의 및 학습 ---
input_dim = X_train_scaled.shape[1]
encoding_dim = int(input_dim / 2)
input_layer = Input(shape=(input_dim,)); encoder = Dense(encoding_dim, activation="relu")(input_layer)
encoder = Dense(int(encoding_dim/2), activation="relu")(encoder)
decoder = Dense(int(encoding_dim/2), activation='relu')(encoder); decoder = Dense(encoding_dim, activation='relu')(decoder)
output_layer = Dense(input_dim, activation='sigmoid')(decoder)
autoencoder = Model(inputs=input_layer, outputs=output_layer)
autoencoder.compile(optimizer='adam', loss='mae')
print("\n오토인코더 모델 학습을 시작합니다...")
history = autoencoder.fit(X_train_scaled, X_train_scaled, epochs=50, batch_size=16, shuffle=True, validation_data=(X_val_scaled, X_val_scaled),
                          callbacks=[tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min")], verbose=1).history

# --- 이상 점수 계산 및 평가 ---
X_pred_scaled = autoencoder.predict(X_all_scaled)
mae = np.mean(np.abs(X_all_scaled - X_pred_scaled), axis=1)
final_df['reconstruction_error'] = mae
train_mae = np.mean(np.abs(X_train_scaled - autoencoder.predict(X_train_scaled)), axis=1)
threshold = np.quantile(train_mae, 0.99)
print(f"\n계산된 이상 탐지 임계치: {threshold:.4f}")

y_true = final_df['final_yield'] == 0
y_pred = final_df['reconstruction_error'] > threshold
print("\n[이상 탐지 모델 평가 결과 (오토인코더)]")
print(classification_report(y_true, y_pred, target_names=['정상 (Normal)', '이상 (Anomaly)']))
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6)); sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['정상 (Normal)', '이상 (Anomaly)'], yticklabels=['정상 (Normal)', '이상 (Anomaly)'])
plt.title('혼동 행렬 (Confusion Matrix) - 오토인코더', fontsize=16); plt.ylabel('실제 값 (Actual)', fontsize=12); plt.xlabel('예측 값 (Predicted)', fontsize=12); plt.show()


# ==============================================================================
# 4단계: 원인 변수 설명 (Explainable AI with SHAP)
# ==============================================================================
# 이 단계 역시 final_df를 사용하므로 이전과 동일하게 작동합니다.
print("\nSHAP 분석을 시작합니다 (상위 5개 이상 샘플 대상)...")
ohe_feature_names = list(preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features))
all_feature_names = numerical_features + ohe_feature_names
def model_predict_loss(data): return np.mean(np.abs(data - autoencoder.predict(data)), axis=1)
background_data = shap.sample(X_train_scaled, 100)
explainer = shap.KernelExplainer(model_predict_loss, background_data)
anomaly_samples_df = final_df[y_pred].head(5)
if not anomaly_samples_df.empty:
    anomaly_samples_scaled = preprocessor.transform(anomaly_samples_df[features_for_model])
    shap_values = explainer.shap_values(anomaly_samples_scaled)
    X_anomaly_df = pd.DataFrame(anomaly_samples_scaled, columns=all_feature_names)
    print("\n각 이상 샘플에 대한 SHAP Force Plot:")
    for i in range(len(shap_values)):
        shap.force_plot(explainer.expected_value, shap_values[i, :], X_anomaly_df.iloc[i, :], matplotlib=True, show=True)
    print("\n전체 이상 샘플에 대한 SHAP Summary Plot:")
    shap.summary_plot(shap_values, X_anomaly_df)
else:
    print("\n탐지된 이상 샘플이 없어 SHAP 분석을 건너뜁니다.")