In [1]:
import pandas as pd
import numpy as np
from scipy.stats import spearmanr, pearsonr
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler

In [2]:
### 시각화를 위한 라이브러리
# - 파이썬 시각화에서 가장 기본적인 시각화 라이브러리
import matplotlib.pyplot as plt

### 한글처리
from matplotlib import font_manager, rc

### 운영체제 확인 라이브러리
import platform

### 시각화 시 마이너스(-, 음수) 기호 깨짐 방지
plt.rcParams["axes.unicode_minus"] = False

### OS별 한글처리
# - 윈도우 운영체게
if platform.system() == "Windows" :
    # path = "c:/Windows/Fonts/malgun.ttf"
    # font_name = font_manager.FontProperties(fname=path).get_name()
    # rc("font", family = font_name)
    
    ### 또는 아래처럼 한줄로도 가능 (아래 한글처리를 주로 사용합니다.)
    plt.rc("font", family = "Malgun Gothic")

# - Mac 운영체제
elif platform.system() == "Darwin" :
    rc("font", family = "AppleGothic")
    
else :
    print("넌 누구?")

In [3]:
labeled = pd.read_csv("./data/labeled_data.csv")

In [4]:
labeled_cn7 = labeled[(labeled["PART_NAME"] == "CN7 W/S SIDE MLD'G RH") |(labeled["PART_NAME"] == "CN7 W/S SIDE MLD'G LH")]

In [5]:
labeled_cn7 = labeled_cn7.drop(columns=["_id", "TimeStamp","PART_FACT_PLAN_DATE", "Reason", "PART_FACT_SERIAL", "PART_NAME",
               "EQUIP_NAME", "EQUIP_CD", "Switch_Over_Position", "Mold_Temperature_1","Mold_Temperature_2","Mold_Temperature_5",
                "Mold_Temperature_6", "Mold_Temperature_7","Mold_Temperature_8","Mold_Temperature_9","Mold_Temperature_10",
               "Mold_Temperature_11","Mold_Temperature_12","Barrel_Temperature_7"])

In [6]:
labeled_cn7

Unnamed: 0,PassOrFail,Injection_Time,Filling_Time,Plasticizing_Time,Cycle_Time,Clamp_Close_Time,Cushion_Position,Plasticizing_Position,Clamp_Open_Position,Max_Injection_Speed,...,Average_Back_Pressure,Barrel_Temperature_1,Barrel_Temperature_2,Barrel_Temperature_3,Barrel_Temperature_4,Barrel_Temperature_5,Barrel_Temperature_6,Hopper_Temperature,Mold_Temperature_3,Mold_Temperature_4
0,Y,9.59,4.47,16.920000,59.520000,7.13,653.409973,68.849998,647.98999,55.400002,...,59.299999,276.500000,274.700012,274.799988,269.200012,255.000000,229.699997,66.300003,24.799999,27.500000
1,Y,9.60,4.48,16.910000,59.580002,7.13,653.409973,68.839996,647.98999,55.299999,...,59.299999,276.200012,275.500000,275.299988,270.799988,254.699997,229.500000,67.199997,24.799999,27.600000
2,Y,9.60,4.48,16.910000,59.580002,7.13,653.409973,68.839996,647.98999,55.299999,...,59.299999,276.200012,275.500000,275.299988,270.799988,254.699997,229.500000,67.199997,24.799999,27.600000
3,Y,9.59,4.48,16.910000,59.560001,7.13,653.419983,68.839996,647.98999,55.299999,...,59.299999,276.500000,275.000000,275.399994,271.100006,254.899994,230.000000,66.900002,25.000000,27.600000
4,Y,9.59,4.48,16.910000,59.560001,7.13,653.419983,68.839996,647.98999,55.299999,...,59.299999,276.500000,275.000000,275.399994,271.100006,254.899994,230.000000,66.900002,25.000000,27.600000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7991,Y,9.60,4.48,16.620001,59.560001,7.11,653.429993,68.330002,647.98999,54.900002,...,59.900002,276.500000,274.899994,275.100006,269.500000,255.600006,229.600006,66.099998,21.000000,22.400000
7992,Y,9.60,4.48,16.650000,59.560001,7.11,653.429993,68.349998,647.98999,55.000000,...,59.900002,275.799988,275.299988,275.500000,270.700012,254.899994,230.000000,66.099998,21.000000,22.400000
7993,Y,9.60,4.48,16.650000,59.560001,7.11,653.429993,68.349998,647.98999,55.000000,...,59.900002,275.799988,275.299988,275.500000,270.700012,254.899994,230.000000,66.099998,21.000000,22.400000
7994,Y,9.60,4.48,16.629999,59.580002,7.11,653.429993,68.330002,647.98999,54.900002,...,59.900002,276.100006,275.799988,275.000000,271.299988,255.100006,230.199997,65.199997,21.000000,22.299999


In [7]:
labeled_cn7["PassOrFail"] = labeled_cn7["PassOrFail"].replace('Y',1).replace('N',0)

  labeled_cn7["PassOrFail"] = labeled_cn7["PassOrFail"].replace('Y',1).replace('N',0)


In [8]:
### Clamp_Open_Position에서 눈에띄게 낮은 값들 제거
cn7 = labeled_cn7[labeled_cn7["Clamp_Open_Position"]>=200] 

In [9]:
cn7 = cn7.drop(columns=["Clamp_Open_Position"])

In [10]:
def remove_outliers(df, column):
    # 1사분위수(Q1)와 3사분위수(Q3) 계산
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    
    # IQR 계산
    IQR = Q3 - Q1
    
    # 이상치 범위 설정
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    # 이상치 제거
    filtered_df = df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
    
    return filtered_df

In [11]:
cn7_Y = cn7[cn7["PassOrFail"]==1].copy()
print("CN7의 양품 개수:", len(cn7_Y))
cn7_N = cn7[cn7["PassOrFail"]==0].copy()
print("CN7의 불량 개수:", len(cn7_N))

CN7의 양품 개수: 6661
CN7의 불량 개수: 39


In [12]:
tmp1 = cn7_Y.copy()
tmp1 = remove_outliers(tmp1, "Injection_Time")
tmp1 = remove_outliers(tmp1, "Plasticizing_Time")
tmp1 = remove_outliers(tmp1, "Cycle_Time")
tmp1 = remove_outliers(tmp1, "Max_Injection_Speed")
tmp1 = remove_outliers(tmp1, "Mold_Temperature_3")
cn7_pass_removed = tmp1.copy()
cn7_pass_removed.info()

<class 'pandas.core.frame.DataFrame'>
Index: 6534 entries, 0 to 7995
Data columns (total 24 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   PassOrFail                6534 non-null   int64  
 1   Injection_Time            6534 non-null   float64
 2   Filling_Time              6534 non-null   float64
 3   Plasticizing_Time         6534 non-null   float64
 4   Cycle_Time                6534 non-null   float64
 5   Clamp_Close_Time          6534 non-null   float64
 6   Cushion_Position          6534 non-null   float64
 7   Plasticizing_Position     6534 non-null   float64
 8   Max_Injection_Speed       6534 non-null   float64
 9   Max_Screw_RPM             6534 non-null   float64
 10  Average_Screw_RPM         6534 non-null   float64
 11  Max_Injection_Pressure    6534 non-null   float64
 12  Max_Switch_Over_Pressure  6534 non-null   float64
 13  Max_Back_Pressure         6534 non-null   float64
 14  Average_Back_

In [13]:
cn7_removed = pd.concat([cn7_pass_removed, cn7_N], ignore_index=True)
cn7_removed["PassOrFail"].value_counts()

PassOrFail
1    6534
0      39
Name: count, dtype: int64

In [14]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.cluster import DBSCAN
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import classification_report, confusion_matrix

# 독립변수와 종속변수 분리 (cn7_removed는 미리 정의된 DataFrame이어야 함)
y = cn7_removed["PassOrFail"]
X = cn7_removed.drop(columns=["PassOrFail"])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 데이터 정규화: 학습 데이터에 fit한 후 양쪽을 transform
ss = StandardScaler()
ss.fit(X_train)
X_train_scaled = ss.transform(X_train)
X_test_scaled  = ss.transform(X_test)

# 이상탐지 모델 리스트 (여기서는 4가지 모델을 사용)
# Local Outlier Factor는 test 데이터에 대해 예측하려면 novelty=True로 설정해야 합니다.
models = {
    "Isolation Forest": IsolationForest(random_state=42),
    "One-Class SVM": OneClassSVM(),
    "DBSCAN": DBSCAN(eps=0.5, min_samples=5),
    "Local Outlier Factor": LocalOutlierFactor(n_neighbors=20, novelty=True, contamination='auto')
}

results = []
for model_name, model in models.items():
    # DBSCAN은 predict()가 없으므로, fit_predict()를 사용 (전체 테스트 데이터에 대해 클러스터링 수행)
    if model_name == "DBSCAN":
        y_pred = model.fit_predict(X_test_scaled)
        # DBSCAN은 outlier에 -1, 정상에 클러스터 번호를 부여하므로,
        # -1이면 0(불량), 그 외에는 1(정상)으로 변환
        y_pred_converted = [0 if label == -1 else 1 for label in y_pred]
    else:
        # 이상탐지 모델(나머지)은 일반적으로 정상 데이터의 특성만 학습하므로
        # X_train_scaled로 모델을 학습한 후 X_test_scaled에 대해 predict
        model.fit(X_train_scaled)
        y_pred = model.predict(X_test_scaled)
        # Isolation Forest와 One-Class SVM, LOF는 정상: 1, 이상: -1 을 반환하므로, -1이면 0, 1이면 1로 변환
        y_pred_converted = [1 if label == 1 else 0 for label in y_pred]
    
    results.append((model_name, y_pred_converted))

# 평가 지표 계산 및 결과 출력
for model_name, y_pred in results:
    print(f"{model_name} 결과:")
    print(classification_report(y_test, y_pred))
    cm = confusion_matrix(y_test, y_pred)
    cm_df = pd.DataFrame(cm, index=["Actual 0", "Actual 1"], columns=["Predicted 0", "Predicted 1"])
    print("혼돈 행렬:")
    print(cm_df)
    print("\n")


Isolation Forest 결과:
              precision    recall  f1-score   support

           0       0.08      1.00      0.15        14
           1       1.00      0.92      0.96      1958

    accuracy                           0.92      1972
   macro avg       0.54      0.96      0.55      1972
weighted avg       0.99      0.92      0.95      1972

혼돈 행렬:
          Predicted 0  Predicted 1
Actual 0           14            0
Actual 1          159         1799


One-Class SVM 결과:
              precision    recall  f1-score   support

           0       0.01      1.00      0.03        14
           1       1.00      0.52      0.69      1958

    accuracy                           0.53      1972
   macro avg       0.51      0.76      0.36      1972
weighted avg       0.99      0.53      0.68      1972

혼돈 행렬:
          Predicted 0  Predicted 1
Actual 0           14            0
Actual 1          932         1026


DBSCAN 결과:
              precision    recall  f1-score   support

           0 

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [23]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
# --- 오토인코더 모델 정의 ---
input_dim = X_train_scaled.shape[1]  # 특성 차원
encoding_dim = 10  # 압축할 차원 수 (원하는 값으로 조정 가능)

# 입력층 정의
input_layer = Input(shape=(input_dim,))
# 인코딩 층 (은닉층)
encoded = Dense(encoding_dim, activation='relu')(input_layer)
# 디코딩 층 (출력층)
decoded = Dense(input_dim, activation='sigmoid')(encoded)

# 오토인코더 모델 생성 및 컴파일
autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer=Adam(), loss='mean_squared_error')

# --- 오토인코더 모델 학습 ---
autoencoder.fit(X_train_scaled, X_train_scaled,
                epochs=100,
                batch_size=32,
                validation_split=0.2)

# --- 재구성 오차 계산을 통한 이상치 탐지 ---
# 테스트 데이터에 대한 재구성
X_test_pred = autoencoder.predict(X_test_scaled)
# 각 샘플별 재구성 오차(평균 제곱 오차) 계산
reconstruction_error = np.mean(np.square(X_test_scaled - X_test_pred), axis=1)

# 95% 분위수를 기준으로 이상치 판별 임계값 산출
threshold = np.percentile(reconstruction_error, 98)
print("이상치 판별 기준 (재구성 오차):", threshold)

# 재구성 오차가 임계값보다 큰 경우 이상치(불량)로 판단: 0 (불량), 그렇지 않으면 1 (정상)
y_pred = np.where(reconstruction_error > threshold, 0, 1)

# --- 평가 지표 출력 ---
print("Classification Report:")
print(classification_report(y_test, y_pred))
cm = confusion_matrix(y_test, y_pred)
cm_df = pd.DataFrame(cm, index=["Actual 0", "Actual 1"], columns=["Predicted 0", "Predicted 1"])
print("혼돈 행렬:")
print(cm_df)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [29]:
threshold = np.percentile(reconstruction_error, 99.5
                         )
print("이상치 판별 기준 (재구성 오차):", threshold)

# 재구성 오차가 임계값보다 큰 경우 이상치(불량)로 판단: 0 (불량), 그렇지 않으면 1 (정상)
y_pred = np.where(reconstruction_error > threshold, 0, 1)

# --- 평가 지표 출력 ---
print("Classification Report:")
print(classification_report(y_test, y_pred))
cm = confusion_matrix(y_test, y_pred)
cm_df = pd.DataFrame(cm, index=["Actual 0", "Actual 1"], columns=["Predicted 0", "Predicted 1"])
print("혼돈 행렬:")
print(cm_df)

이상치 판별 기준 (재구성 오차): 1.8988741765023507
Classification Report:
              precision    recall  f1-score   support

           0       1.00      0.71      0.83        14
           1       1.00      1.00      1.00      1958

    accuracy                           1.00      1972
   macro avg       1.00      0.86      0.92      1972
weighted avg       1.00      1.00      1.00      1972

혼돈 행렬:
          Predicted 0  Predicted 1
Actual 0           10            4
Actual 1            0         1958


# import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.cluster import DBSCAN
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from sklearn.base import BaseEstimator, ClassifierMixin

# -----------------------------
# 데이터 준비 및 정규화
# -----------------------------
# cn7_removed는 미리 정의된 DataFrame이어야 함 (PassOrFail 열 포함)
y = cn7_removed["PassOrFail"]
X = cn7_removed.drop(columns=["PassOrFail"])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 학습 데이터에 대해 조건 필터링 (예: Average_Screw_RPM > 250)
Xy_train = X_train.copy()
Xy_train["PassOrFail"] = y_train
Xy_train = Xy_train[Xy_train["Average_Screw_RPM"] > 250]
X_train = Xy_train.drop(columns=["PassOrFail"])
y_train = Xy_train["PassOrFail"]

# 데이터 정규화 (표준화)
ss = StandardScaler()
X_train_scaled = ss.fit_transform(X_train)
X_test_scaled  = ss.transform(X_test)

# -----------------------------
# 사용자 정의 스코어러 (F1 score 기준)
# -----------------------------
def anomaly_scorer(estimator, X, y_true):
    # 예측값은 각 모델에서 1(정상) 또는 -1(이상)으로 반환하므로, -1은 0(불량), 1은 1(정상)으로 변환
    y_pred = estimator.predict(X)
    y_pred_conv = np.where(y_pred == 1, 1, 0)
    return f1_score(y_true, y_pred_conv)

# -----------------------------
# DBSCAN 래퍼 클래스 (GridSearchCV 호환)
# -----------------------------
class DBSCANWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
    
    def fit(self, X, y=None):
        self.model_ = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        # DBSCAN은 fit 이후에도 predict가 없으므로 미리 학습 데이터에 대해 fit 수행
        self.model_.fit(X)
        return self
    
    def predict(self, X):
        # fit_predict를 통해 클러스터링 수행; -1은 이상치로 간주
        labels = self.model_.fit_predict(X)
        return np.where(labels == -1, 0, 1)
    
    def get_params(self, deep=True):
        return {"eps": self.eps, "min_samples": self.min_samples}
    
    def set_params(self, **params):
        self.eps = params.get("eps", self.eps)
        self.min_samples = params.get("min_samples", self.min_samples)
        return self

# -----------------------------
# 각 모델별 하이퍼파라미터 그리드
# -----------------------------
# Isolation Forest
iso = IsolationForest(random_state=42)
param_grid_if = {
    'n_estimators': [50, 100, 200],
    'max_samples': [0.5, 1.0],
    'contamination': [0.01, 0.05, 0.1]
}

# One-Class SVM
ocs = OneClassSVM()
param_grid_ocs = {
    'kernel': ['rbf', 'linear'],
    'nu': [0.1, 0.5, 0.9],
    'gamma': ['scale', 'auto']
}

# DBSCAN (래퍼 사용)
dbscan_wrapper = DBSCANWrapper()
param_grid_dbscan = {
    'eps': [0.3, 0.5, 0.7],
    'min_samples': [5, 10]
}

# Local Outlier Factor (novelty=True)
lof = LocalOutlierFactor(novelty=True)
param_grid_lof = {
    'n_neighbors': [10, 20, 30],
    'contamination': [0.01, 0.05, 0.1]
}

# -----------------------------
# Grid Search 실행 및 성능 평가
# -----------------------------
models = {
    "Isolation Forest": (iso, param_grid_if),
    "One-Class SVM": (ocs, param_grid_ocs),
    "DBSCAN": (dbscan_wrapper, param_grid_dbscan),
    "Local Outlier Factor": (lof, param_grid_lof)
}

results = {}  # 각 모델별 GridSearch 결과 저장

for model_name, (model_obj, param_grid) in models.items():
    print(f"==== {model_name} Grid Search ====")
    grid = GridSearchCV(model_obj,
                        param_grid,
                        cv=5,
                        scoring=anomaly_scorer,
                        n_jobs=-1)
    grid.fit(X_train_scaled, y_train)
    best_estimator = grid.best_estimator_
    best_params = grid.best_params_
    best_score = grid.best_score_
    
    # 테스트 데이터 평가
    y_pred = best_estimator.predict(X_test_scaled)
    # 변환: -1 -> 0, 1 -> 1
    y_pred_conv = np.where(y_pred == 1, 1, 0)
    report = classification_report(y_test, y_pred_conv, output_dict=True)
    cm = confusion_matrix(y_test, y_pred_conv)
    print(f"Best Parameters: {best_params}")
    print(f"Best CV F1 Score: {best_score:.4f}")
    print("Test Classification Report:")
    print(classification_report(y_test, y_pred_conv))
    print("Test Confusion Matrix:")
    print(pd.DataFrame(cm, index=["실제 0", "실제 1"], columns=["예측 0", "예측 1"]))
    print("\n")
    
    results[model_name] = {
        "Best Params": best_params,
        "CV F1 Score": best_score,
        "Test Report": report,
        "Confusion Matrix": cm
    }

# -----------------------------
# 결과 요약 (원하는 방식으로 후속 분석 가능)
# -----------------------------
results_df = pd.DataFrame({
    m: {"Best Params": results[m]["Best Params"], "CV F1 Score": results[m]["CV F1 Score"]}
    for m in results
}).T

print("모델별 최적 하이퍼파라미터 및 CV F1 Score 요약:")
print(results_df)
