## 주조 공정최적화 AI 데이터셋
### 분류
https://www.kamp-ai.kr/aidataDetail?AI_SEARCH=%EC%A3%BC%EC%A1%B0+%EA%B3%B5%EC%A0%95%EC%B5%9C%EC%A0%81%ED%99%94+AI+%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%85%8B&page=1&DATASET_SEQ=53&DISPLAY_MODE_SEL=CARD&EQUIP_SEL=&GUBUN_SEL=&FILE_TYPE_SEL=&WDATE_SEL=

출처 : 중소벤처기업부, Korea AI Manufacturing Platform(KAMP)

In [None]:
import pandas as pd
import koreanize_matplotlib

# 1. 데이터 준비 및 전처리

# 데이터 불러오기 (Unnamed: 0 컬럼을 인덱스로 지정)
df = pd.read_csv('data/casting.csv', encoding='cp949', index_col='Unnamed: 0')
df

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import (confusion_matrix, accuracy_score, precision_score, 
                             recall_score, f1_score, roc_auc_score, cohen_kappa_score, 
                             roc_curve, classification_report)
import matplotlib.pyplot as plt
import seaborn as sns

# --- 데이터 클리닝 및 전처리 시작 ---
print("--- 원본 데이터 ---")
print(df.head(3))
print(f"원본 데이터 형태: {df.shape}")

In [None]:
# 1. 'passorfail' 컬럼 결측치 제거
# 현재 데이터에는 'passorfail'에 결측치가 없지만, 있다면 이 단계에서 제거됩니다.
initial_rows = len(df)
df.dropna(subset=['passorfail'], inplace=True)
print(f"\n'passorfail' 컬럼에 결측치가 있는 {initial_rows - len(df)}개의 행이 제거되었습니다.")
print(f"'passorfail' 결측치 제거 후 데이터 형태: {df.shape}")

# 실제 타겟 값 분리 (오토인코더 학습에는 사용 안 함, 인덱스 재설정 중요)
y_true_original = df['passorfail'].copy().reset_index(drop=True) 

# 2. 학습에 불필요한 컬럼 및 'passorfail' 컬럼 제거
cols_to_drop = [
    'passorfail', 'line', 'name', 'mold_name', 'date', 'time', 
    'working', 'emergency_st', 'EMS_operatio', 'stration_ti', 
    'heating_furnace'
]
X = df.drop(columns=cols_to_drop, errors='ignore') # errors='ignore'는 이미 없는 컬럼을 제거 시도할 때 오류 방지

print("\n--- 특성 선택 후 데이터 (X) ---")
X.head(3)


In [None]:
# 3. 범주형 변수 인코딩 (Label Encoding)
print("\n--- 범주형 변수 인코딩 ---")
categorical_cols = X.select_dtypes(include='object').columns
for col in categorical_cols:
    le = LabelEncoder()
    X[col] = le.fit_transform(X[col])
    print(f"컬럼 '{col}' Label Encoding 완료. 고유값 개수: {len(le.classes_)}")
    # print(f"  매핑: {dict(zip(le.classes_, le.transform(le.classes_)))}") # 매핑 정보가 길 수 있어 주석 처리

# 4. 수치형 컬럼 결측치 처리 (중앙값으로 대체)
print("\n--- 수치형 컬럼 결측치 처리 ---")
numerical_cols = X.select_dtypes(include=np.number).columns
for col in numerical_cols:
    if X[col].isnull().sum() > 0:
        median_val = X[col].median()
        X[col].fillna(median_val, inplace=True)
        print(f"컬럼 '{col}'의 결측치 {X[col].isnull().sum()}개를 중앙값 ({median_val:.2f})으로 대체했습니다.")
    else:
        print(f"컬럼 '{col}'에는 결측치가 없습니다.")


In [None]:
# 5. 이상치 탐지 및 로그 변환
print("\n--- 이상치 처리 (로그 변환) ---")
X_transformed = X.copy()
for col in numerical_cols:
    # IQR 방법으로 이상치 탐지
    Q1 = X_transformed[col].quantile(0.25)
    Q3 = X_transformed[col].quantile(0.75)
    IQR = Q3 - Q1
    
    # IQR이 0인 경우 (컬럼 값들이 거의 유사한 경우) 이상치 탐지 로직을 건너뜀
    if IQR == 0:
        print(f"컬럼 '{col}'의 IQR이 0입니다. 이상치 탐지를 건너뜁니다.")
        continue

    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    outliers_mask = (X_transformed[col] < lower_bound) | (X_transformed[col] > upper_bound)
    num_outliers = outliers_mask.sum()
    
    if num_outliers > 0:
        print(f"컬럼 '{col}'에서 이상치 {num_outliers}개 발견. 로그 변환(log1p) 적용.")
        # 음수 값 확인 (log1p는 0에는 적용 가능하지만, 음수에는 직접 적용 불가)
        if (X_transformed[col] < 0).any():
            min_val = X_transformed[col].min()
            if min_val < 0:
                 # 음수를 양수로 변환 (예: min_val만큼 더해서 최소값을 0으로 만들고 log1p)
                print(f"  경고: 컬럼 '{col}'에 음수 값({min_val}) 포함. 최소값을 0으로 조정 후 log1p 적용.")
                X_transformed[col] = np.log1p(X_transformed[col] - min_val)
            else: # 0은 괜찮음
                 X_transformed[col] = np.log1p(X_transformed[col])
        else:
            X_transformed[col] = np.log1p(X_transformed[col])
    else:
        print(f"컬럼 '{col}'에서 이상치 발견되지 않음.")

# 6. 데이터 스케일링 (Min-Max Scaler)
print("\n--- 데이터 스케일링 ---")
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X_transformed)
print(f"스케일링 완료. 최종 특성 데이터 형태: {X_scaled.shape}")


In [None]:
# !!! 중요: 평가를 위한 불량 데이터(passorfail=1) 인위적 생성 !!!
# 실제 데이터는 모두 '정상(0)'이므로, 모델 평가 시연을 위해 일부를 '불량(1)'으로 가정합니다.
y_true = y_true_original.copy()
# 데이터가 적으므로, 예시로 2개를 불량으로 설정 (인덱스는 y_true_original 기준)
num_samples = len(y_true)
if num_samples >= 10: # 충분한 샘플이 있을 때
    y_true.iloc[[7, 10]] = 1 
elif num_samples >=2: # 샘플이 2개 이상 10개 미만
    y_true.iloc[0] = 1 # 첫번째 샘플 불량
    y_true.iloc[-1] = 1 # 마지막 샘플 불량
elif num_samples == 1: # 샘플이 1개면
     y_true.iloc[0] = 1 # 그냥 불량으로 처리 (평가 지표 의미 없음)
else: # 샘플이 없으면 (이 경우는 거의 발생 안함)
    pass 
      
print("\n--- 실제 값 (y_true) 분포 (불량 데이터 인위적 생성 후) ---")
print(y_true.value_counts())

# 학습 데이터가 있는지 최종 확인
if X_scaled.shape[0] == 0:
    print("\n오류: 전처리 후 학습할 데이터가 없습니다. 프로그램을 종료합니다.")
    exit()


In [None]:
# 2. 파이토치 오토인코더 모델 정의
class Autoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim1=16, encoding_dim2=8, latent_dim=4):
        super(Autoencoder, self).__init__()
        # 인코더
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, encoding_dim1),
            nn.ReLU(),
            nn.Linear(encoding_dim1, encoding_dim2),
            nn.ReLU(),
            nn.Linear(encoding_dim2, latent_dim) # 잠재 공간
        )
        # 디코더
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, encoding_dim2),
            nn.ReLU(),
            nn.Linear(encoding_dim2, encoding_dim1),
            nn.ReLU(),
            nn.Linear(encoding_dim1, input_dim),
            nn.Sigmoid() # MinMaxScaler 사용 시 Sigmoid가 일반적
        )
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# 모델 파라미터 설정
input_dim = X_scaled.shape[1]
# 잠재 공간 차원 등은 데이터의 복잡성에 따라 조절 가능
model = Autoencoder(input_dim=input_dim, latent_dim=max(2, input_dim // 4)) # 잠재 차원 동적 조절
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# DataLoader 준비
tensor_x = torch.Tensor(X_scaled)
dataset = TensorDataset(tensor_x, tensor_x) 
# 배치 크기는 샘플 수보다 작거나 같아야 함
batch_size = min(8, len(tensor_x)) if len(tensor_x) > 0 else 1
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 3. 모델 학습
print("\n--- 모델 학습 시작 ---")
num_epochs = 100
model.train()
if len(dataloader) > 0:
    for epoch in range(num_epochs):
        epoch_loss = 0
        for data_batch in dataloader:
            inputs, _ = data_batch
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        if (epoch + 1) % 20 == 0 or epoch == num_epochs -1 : # 20 에폭마다 또는 마지막 에폭
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(dataloader):.6f}')
    print("--- 모델 학습 완료 ---")
else:
    print("오류: DataLoader에 데이터가 없어 학습을 진행할 수 없습니다.")
    exit()

In [None]:
# 4. 불량 탐지 (재구성 오류 계산 및 임계값 설정)
model.eval() 
with torch.no_grad():
    reconstructions = model(tensor_x)
    reconstruction_errors = np.mean((tensor_x.numpy() - reconstructions.numpy())**2, axis=1)

print("\n--- 재구성 오류 계산 완료 ---")
plt.figure(figsize=(10, 6))
sns.histplot(reconstruction_errors, bins=max(10, len(reconstruction_errors)//2), kde=True) # bins 동적 조절
plt.title('재구성 오류 분포')
plt.xlabel('재구성 오류 (MSE)')
plt.ylabel('빈도')

# 임계값 설정 (예: 90 percentile 또는 평균 + 표준편차 등)
# 데이터가 매우 적을 수 있으므로 주의
if len(reconstruction_errors) > 1:
    threshold = np.percentile(reconstruction_errors, 90) # 상위 10%를 이상치로 간주
elif len(reconstruction_errors) == 1:
    threshold = reconstruction_errors[0] * 1.1 # 단일 샘플이면 약간 높은 값 (임의)
else:
    threshold = 0 # 오류가 없으면 임계값 0
    print("경고: 재구성 오류가 없어 임계값을 0으로 설정합니다.")

plt.axvline(threshold, color='r', linestyle='--', label=f'임계값: {threshold:.4f}')
plt.legend()
plt.show()

print(f"\n설정된 불량 탐지 임계값: {threshold:.4f}")

# 임계값을 기준으로 불량(1)과 정상(0) 예측
y_pred = (reconstruction_errors > threshold).astype(int)
print("\n--- 모델 예측 결과 (0: 정상, 1: 불량) ---")
print(pd.Series(y_pred).value_counts())

In [None]:
# 5. 모델 평가
print("\n--- 모델 성능 평가 ---")
if len(y_true) != len(y_pred):
    print(f"오류: 실제 값(y_true, 길이 {len(y_true)})과 예측 값(y_pred, 길이 {len(y_pred)})의 길이가 다릅니다. 평가를 중단합니다.")
else:
    # (1) 교차표 (Confusion Matrix)
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(7, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['정상 (0)', '불량 (1)'], 
                yticklabels=['정상 (0)', '불량 (1)'])
    plt.title('교차표 (Confusion Matrix)')
    plt.xlabel('예측된 값 (Predicted)')
    plt.ylabel('실제 값 (Actual)')
    plt.show()

    # (2) 평가지표
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    kappa = cohen_kappa_score(y_true, y_pred)
    
    # ROC AUC (y_true에 0과 1이 모두 있어야 계산 가능)
    if len(np.unique(y_true)) > 1 and len(reconstruction_errors) == len(y_true) :
        roc_auc = roc_auc_score(y_true, reconstruction_errors)
    else:
        roc_auc = np.nan 
        print("경고: y_true에 단일 클래스만 존재하거나 길이가 맞지 않아 ROC AUC를 계산할 수 없습니다.")

    print(f"1. 정확도 (Accuracy): {accuracy:.4f}")
    print(f"2. 정밀도 (Precision): {precision:.4f}") # TP / (TP + FP)
    print(f"3. 재현율 (Recall): {recall:.4f}")       # TP / (TP + FN)
    print(f"4. F1-Score: {f1:.4f}")
    print(f"5. ROC AUC Score: {roc_auc:.4f}")
    print(f"6. Cohen's Kappa: {kappa:.4f}")

    print("\n--- 분류 리포트 (Classification Report) ---")
    # target_names은 y_true의 클래스에 맞게 설정
    target_names_report = [f'Class {cls}' for cls in sorted(y_true.unique())]
    if len(target_names_report) == 1: # 단일 클래스만 있는 경우
        if 0 in y_true.unique(): target_names_report = ['정상 (Class 0)']
        elif 1 in y_true.unique(): target_names_report = ['불량 (Class 1)']
    elif len(target_names_report) == 2:
        target_names_report = ['정상 (Class 0)', '불량 (Class 1)']
        
    print(classification_report(y_true, y_pred, target_names=target_names_report, zero_division=0))

    # 6. 추가 시각화: ROC Curve
    if not np.isnan(roc_auc) and len(np.unique(y_true)) > 1: # roc_auc가 계산 가능할 때만 그림
        fpr, tpr, _ = roc_curve(y_true, reconstruction_errors)
        plt.figure(figsize=(7, 7))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate (FPR)')
        plt.ylabel('True Positive Rate (TPR)')
        plt.title('ROC (Receiver Operating Characteristic) Curve')
        plt.legend(loc="lower right")
        plt.grid(True)
        plt.show()
    else:
        print("ROC Curve를 그릴 수 없습니다 (y_true에 단일 클래스만 있거나 ROC AUC 계산 불가).")