<a href="https://colab.research.google.com/github/hwangho-kim/Utility-OAC/blob/main/CSV_Input_Based_Automated_FDC_Analysis_(Multi_Step).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sklearn.neighbors import NearestNeighbors

# --- 그래프 한글 폰트 설정 ---
try:
    import koreanize_matplotlib
except ImportError:
    pass

def create_sample_long_format_csv(filepath='sample_fdc_long_data.csv'):
    """
    [개선됨] 분석을 시연하기 위한 대규모 샘플 'Long Format' CSV 파일을 생성합니다.
    - Wafer 수: 100개
    - Step 수: 10개
    - Sensor 수: 10개
    """
    print(f"'{filepath}' 이름으로 데모용 샘플 CSV 파일을 생성합니다.")
    num_wafers = 100
    num_steps = 10
    num_sensors = 10
    time_points = 10
    anomaly_start_wafer = 80
    np.random.seed(42)

    records = []
    for wafer_id in range(1, num_wafers + 1):
        for step_id in range(1, num_steps + 1):
            record_base = {'wafer_id': wafer_id, 'step_id': step_id}

            # 10개의 센서 데이터 생성
            for i in range(num_sensors):
                sensor_name = f'Sensor_{chr(65+i)}'
                record_base[sensor_name] = np.random.randn(time_points) * (i * 0.1 + 0.5) + (i * 5 + step_id)

            # 이상 상태 주입 (80번 Wafer부터, 5번 Step에서)
            if wafer_id >= anomaly_start_wafer and step_id == 5:
                record_base['Sensor_C'] += 3.0  # Sensor C의 평균이 증가
                record_base['Sensor_G'] *= 1.5 # Sensor G의 변동성(분산)이 증가

            for t_idx in range(time_points):
                row = {
                    'wafer_id': record_base['wafer_id'],
                    'step_id': record_base['step_id'],
                    'time': t_idx
                }
                for i in range(num_sensors):
                    sensor_name = f'Sensor_{chr(65+i)}'
                    row[sensor_name] = record_base[sensor_name][t_idx]
                records.append(row)

    df = pd.DataFrame(records)
    df.to_csv(filepath, index=False)
    print("샘플 파일 생성 완료.")
    return df

def find_columns(df):
    """
    DataFrame에서 ID 및 센서 컬럼들을 자동으로 탐지합니다.
    """
    id_candidates = {
        'wafer': ['wafer_id', 'waferid', 'wafer', 'wfr_id', 'wafer_no', 'lot_wafer'],
        'step': ['step_id', 'stepid', 'step', 'step_no'],
        'time': ['time', 'timestamp', 'time_sec']
    }

    detected_cols = {}
    remaining_cols = list(df.columns)

    for id_type, candidates in id_candidates.items():
        found = False
        for col in remaining_cols:
            if col.lower() in candidates:
                detected_cols[id_type] = col
                remaining_cols.remove(col)
                found = True
                break
        if not found:
            print(f"경고: '{id_type}' ID 컬럼을 찾지 못했습니다.")
            detected_cols[id_type] = None

    sensor_cols = [col for col in remaining_cols if pd.api.types.is_numeric_dtype(df[col])]

    return detected_cols['wafer'], detected_cols['step'], sensor_cols

def extract_and_pivot_features(df, wafer_id_col, step_id_col, sensor_cols):
    """
    Long format 데이터에서 통계 특징을 추출하고, Wafer 단위의 Wide format으로 변환합니다.
    """
    print("\n--- Step 1-2: 특징 추출 및 데이터 재구조화(Pivot) ---")

    features = df.groupby([wafer_id_col, step_id_col])[sensor_cols].agg(['mean', 'std', 'max', 'min']).reset_index()
    features.columns = ['_'.join(col).strip() if isinstance(col, tuple) and col[1] != '' else col[0] for col in features.columns.values]

    feature_pivot = features.pivot(index=wafer_id_col, columns=step_id_col)
    feature_pivot.columns = [f"S{int(col[1])}_{col[0]}" for col in feature_pivot.columns.values]
    feature_pivot.fillna(0, inplace=True)

    print(f"최종 특징 벡터 생성 완료. Shape: {feature_pivot.shape}")
    return feature_pivot.reset_index()

def diagnose_top_wafers_by_health_index(df, golden_df, wafer_id_col, feature_cols, top_n=3):
    """
    Health Index가 높은 상위 Wafer들에 대해 원인 특징을 진단합니다.
    """
    print("\n--- Step 5: Health Index 상위 Wafer 원인 분석 ---")

    if golden_df.empty:
        print("정상 군집 데이터가 없어 원인 분석을 생략합니다.")
        return

    normal_stats = golden_df[feature_cols].agg(['mean', 'std'])
    df_sorted = df.sort_values(by='health_index', ascending=False)

    print(f"Health Index 기준 상위 {top_n}개 Wafer 진단 결과:")

    for i, row in enumerate(df_sorted.head(top_n).itertuples()):
        wafer_id = getattr(row, wafer_id_col)
        z_scores = {}
        for feature in feature_cols:
            val = getattr(row, feature)
            mean = normal_stats.loc['mean', feature]
            std = normal_stats.loc['std', feature]
            if std > 1e-6:
                z_scores[feature] = (val - mean) / std

        sorted_features = sorted(z_scores.items(), key=lambda item: abs(item[1]), reverse=True)

        print(f"\n[{i+1}] Wafer ID: {wafer_id} (Health Index: {row.health_index:.2f})")
        print("  > 상위 원인 특징 (Z-score 기준):")
        for feature, score in sorted_features[:3]:
            print(f"    - {feature}: {score:.2f}")

def analyze_fdc_from_csv(filepath):
    """
    Long Format CSV 파일을 입력받아 FDC 분석을 자동으로 수행하는 메인 함수
    """
    try:
        from kneed import KneeLocator
    except ImportError:
        print("오류: 'kneed' 라이브러리가 설치되지 않았습니다. 'pip install kneed'를 실행해주세요.")
        return

    print("\n--- Step 1: 데이터 로드 및 컬럼 탐지 ---")
    try:
        df_long = pd.read_csv(filepath)
    except FileNotFoundError:
        print(f"오류: '{filepath}' 파일을 찾을 수 없습니다.")
        return

    wafer_id_col, step_id_col, sensor_cols = find_columns(df_long)

    if not all([wafer_id_col, step_id_col, sensor_cols]):
        print("오류: Wafer ID, Step ID, Sensor 컬럼을 모두 탐지해야 분석을 진행할 수 있습니다.")
        return

    print(f"ID 컬럼 탐지: Wafer='{wafer_id_col}', Step='{step_id_col}'")
    print(f"총 {len(sensor_cols)}개의 센서 컬럼 탐지: {sensor_cols}")

    df_wide = extract_and_pivot_features(df_long, wafer_id_col, step_id_col, sensor_cols)
    feature_cols = [col for col in df_wide.columns if col != wafer_id_col]

    print("\n--- Step 2: 자동 eps 추정 및 DBSCAN 군집화 ---")
    X = df_wide[feature_cols]
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    min_samples = 5
    nearest_neighbors = NearestNeighbors(n_neighbors=min_samples)
    neighbors = nearest_neighbors.fit(X_scaled)
    distances, _ = neighbors.kneighbors(X_scaled)
    sorted_distances = np.sort(distances[:, min_samples-1])

    kneedle = KneeLocator(x=range(1, len(sorted_distances)+1), y=sorted_distances, S=1.0, curve="convex", direction="increasing")
    optimal_eps = kneedle.elbow_y

    if optimal_eps is None:
        print("경고: 최적 eps 값을 찾지 못했습니다. 기본값으로 대체합니다.")
        optimal_eps = np.median(sorted_distances)

    print(f"자동으로 찾은 최적의 eps 값: {optimal_eps:.4f}")

    dbscan = DBSCAN(eps=optimal_eps, min_samples=min_samples)
    clusters = dbscan.fit_predict(X_scaled)
    df_wide['dbscan_cluster'] = clusters

    cluster_counts = pd.Series(clusters).value_counts()
    print("\nDBSCAN 군집화 결과:\n", cluster_counts)

    if -1 in cluster_counts.index and len(cluster_counts) > 1:
        main_cluster_label = cluster_counts.drop(-1).idxmax()
    else:
        main_cluster_label = cluster_counts.idxmax()
    print(f"\n가장 큰 군집(정상 상태로 추정): Cluster {main_cluster_label}")

    print("\n--- Step 3: PCA 모델 학습 및 상태 인덱스 계산 ---")
    golden_df = df_wide[df_wide['dbscan_cluster'] == main_cluster_label].copy()

    if golden_df.empty:
        print("경고: 정상 군집이 없어 전체 데이터로 PCA 모델을 학습합니다.")
        scaler_pca = StandardScaler()
        golden_features_scaled = scaler_pca.fit_transform(df_wide[feature_cols])
    else:
        print(f"정상으로 식별된 데이터 개수: {len(golden_df)}")
        scaler_pca = StandardScaler()
        golden_features_scaled = scaler_pca.fit_transform(golden_df[feature_cols])

    pca_model = PCA(n_components=0.95)
    pca_model.fit(golden_features_scaled)
    print(f"PCA 모델 학습 완료. 선택된 주성분 개수: {pca_model.n_components_}")

    all_features_scaled = scaler_pca.transform(df_wide[feature_cols])
    all_pca_scores = pca_model.transform(all_features_scaled)

    # --- [수정됨] Health Index 계산 방식을 절댓값으로 변경 ---
    raw_health_index = all_pca_scores[:, 0]
    df_wide['health_index'] = np.abs(raw_health_index)
    print("Health Index 계산 완료 (절댓값 기준).")

    print("\n--- Step 4: 최종 결과 시각화 ---")
    plt.figure(figsize=(16, 8))
    sns.lineplot(x=wafer_id_col, y='health_index', data=df_wide, marker='o', color='gray', zorder=1, label='_nolegend_')
    sns.scatterplot(x=wafer_id_col, y='health_index', data=df_wide, hue='dbscan_cluster', palette='viridis', s=80, zorder=2)

    if not golden_df.empty:
        normal_mean_index = df_wide[df_wide['dbscan_cluster'] == main_cluster_label]['health_index'].mean()
        plt.axhline(y=normal_mean_index, color='dodgerblue', linestyle=':', linewidth=2, label=f'Normal Cluster Mean Index')

    plt.title('Unsupervised FDC Health Index (Multi-Step)', fontsize=18)
    plt.xlabel('Wafer ID', fontsize=12)
    plt.ylabel('Health Index (Absolute PC1 Score)', fontsize=12) # Y축 라벨 변경
    plt.legend(title='DBSCAN Cluster')
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)
    plt.tight_layout()
    plt.show()

    diagnose_top_wafers_by_health_index(df_wide, golden_df, wafer_id_col, feature_cols, top_n=3)


if __name__ == '__main__':
    create_sample_long_format_csv(filepath='sample_fdc_long_data.csv')
    csv_file_path = 'sample_fdc_long_data.csv'
    analyze_fdc_from_csv(csv_file_path)