In [36]:
import pandas as pd
df = pd.read_csv('data/clinical_mastitis_cows.csv')

In [28]:
df['Breed'].value_counts()

Jersey      6498
hostlene     102
Name: Breed, dtype: int64

In [29]:
df.head(6)

Unnamed: 0,Cow_ID,Day,Breed,Months after giving birth,Previous_Mastits_status,IUFL,EUFL,IUFR,EUFR,IURL,EURL,IURR,EURR,Temperature,Hardness,Pain,Milk_visibility,class1
0,cow1,1,Jersey,1,0,150,180,150,180,150,181,150,181,43,0,0,0,0
1,cow1,2,Jersey,1,0,152,180,152,185,151,180,152,181,42,0,0,0,0
2,cow1,3,Jersey,1,0,152,182,153,186,151,186,153,183,41,0,0,0,0
3,cow1,4,Jersey,1,0,155,183,155,189,155,182,155,186,40,0,0,0,0
4,cow1,5,Jersey,1,0,150,186,150,181,150,185,150,188,41,0,0,0,0
5,cow1,6,Jersey,1,0,152,182,152,182,150,181,152,181,40,0,0,0,0


In [37]:
changing_ids = ['cow10']

# 사용할 컬럼
cols = ['Cow_ID', 'Day', 'IUFL', 'EUFL', 'IUFR', 'EUFR', 
        'IURL', 'EURL', 'IURR', 'EURR', 'Temperature', 'class1']

# 변화한 개체 제거
df_filtered = df[
    (~df['Cow_ID'].isin(changing_ids)) &
    (df['Breed'] != 'hostlene')
][cols].copy()

# 평균과 범위 계산 대상 컬럼
value_cols = ['IUFL', 'EUFL', 'IUFR', 'EUFR', 
              'IURL', 'EURL', 'IURR', 'EURR', 'Temperature']

# (1) 평균 계산
mean_df = df_filtered.groupby('Cow_ID')[value_cols].mean()
mean_df.columns = [f"{col}_mean" for col in mean_df.columns]

# (2) 범위 (최대 - 최소) 계산
range_df = (
    df_filtered.groupby('Cow_ID')[value_cols].max()
    - df_filtered.groupby('Cow_ID')[value_cols].min()
)
range_df.columns = [f"{col}_range" for col in range_df.columns]

# (3) class1은 마지막 날짜 기준
class_df = (
    df_filtered.sort_values(['Cow_ID', 'Day'])
               .groupby('Cow_ID')['class1']
               .last()
)

# (4) 병합
final_df = pd.concat([mean_df, range_df], axis=1)
final_df['class1'] = class_df

# 정렬 및 인덱스 정리
final_df = final_df.reset_index()
final_df['Cow_ID_num'] = final_df['Cow_ID'].str.extract('(\d+)').astype(int)
final_df = final_df.sort_values('Cow_ID_num').drop(columns='Cow_ID_num')
final_df = final_df.set_index('Cow_ID')

# 결과 확인
print(f"최종 변수 개수: {final_df.shape[1]}")
final_df.head()

최종 변수 개수: 19


Unnamed: 0_level_0,IUFL_mean,EUFL_mean,IUFR_mean,EUFR_mean,IURL_mean,EURL_mean,IURR_mean,EURR_mean,Temperature_mean,IUFL_range,EUFL_range,IUFR_range,EUFR_range,IURL_range,EURL_range,IURR_range,EURR_range,Temperature_range,class1
Cow_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
cow1,151.833333,182.166667,152.0,183.833333,151.166667,182.5,152.0,183.333333,41.166667,5,6,5,9,5,6,5,7,3,0
cow2,237.833333,275.5,233.5,276.166667,234.333333,279.5,238.666667,277.333333,40.833333,15,11,10,13,15,17,20,10,3,1
cow3,238.833333,276.5,234.5,277.166667,235.333333,280.5,239.666667,278.333333,42.166667,15,11,10,13,15,17,20,10,6,1
cow4,183.333333,211.5,185.5,215.833333,183.833333,211.333333,185.666667,213.666667,41.166667,7,6,8,8,6,5,10,5,2,0
cow5,154.333333,184.833333,154.333333,188.166667,156.833333,183.833333,154.0,184.666667,42.666667,8,9,9,9,10,7,10,8,5,1


In [38]:
final_df.shape

(1082, 19)

In [39]:
from sklearn.model_selection import train_test_split

# 시드 고정
seed = 42

# class1 기준 분리
normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

# 정상: 400 train / 100 val / 100 test
train_normal, remaining_normal = train_test_split(normal_df, train_size=400, random_state=seed)
val_normal = remaining_normal.sample(n=100, random_state=seed)
remaining_for_test = remaining_normal.drop(val_normal.index)
test_normal = remaining_for_test.sample(n=100, random_state=seed)

# 비정상: 100 val / 100 test
val_abnormal = abnormal_df.sample(n=100, random_state=seed)
test_abnormal = abnormal_df.drop(val_abnormal.index).sample(n=100, random_state=seed)

# 확인
print(f"✅ train_normal: {len(train_normal)}")
print(f"✅ val_normal: {len(val_normal)}, val_abnormal: {len(val_abnormal)}")
print(f"✅ test_normal: {len(test_normal)}, test_abnormal: {len(test_abnormal)}")

✅ train_normal: 400
✅ val_normal: 100, val_abnormal: 100
✅ test_normal: 100, test_abnormal: 100


In [40]:
import torch

# DataFrame to Tensor 변환 함수
def to_tensor(df):
    return torch.tensor(df.values, dtype=torch.float32)

# 텐서로 변환
X_train = to_tensor(train_normal)
X_val_normal = to_tensor(val_normal)
X_val_abnormal = to_tensor(val_abnormal)
X_test_normal = to_tensor(test_normal)
X_test_abnormal = to_tensor(test_abnormal)

In [80]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
import numpy as np
import random

seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=14):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(0.2)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# 재구성 오차 계산 함수
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 모델 초기화
model = AutoEncoder(input_dim=18)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습 파라미터 설정
n_epochs = 100000
patience = 10000
best_loss = float('inf')
trigger_times = 0

# 학습 루프
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, X_train)
    loss.backward()
    optimizer.step()

    # early stopping용 검증 로스
    model.eval()
    with torch.no_grad():
        val_output = model(X_val_normal)
        val_loss = criterion(val_output, X_val_normal)

    if val_loss < best_loss:
        best_loss = val_loss
        best_model_state = model.state_dict()
        trigger_times = 0
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

# 최적 모델 로드
model.load_state_dict(best_model_state)

# 검증 데이터 재구성 오차
recon_val_normal = reconstruction_error(X_val_normal, model)
recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
y_val_true = torch.cat([
    torch.zeros_like(recon_val_normal),
    torch.ones_like(recon_val_abnormal)
])

# F1 기준 최적 임계값 찾기
thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
best_f1 = 0
best_threshold = None
for t in thresholds:
    y_pred = (recon_val_all >= t).int()
    f1 = f1_score(y_val_true.numpy(), y_pred.numpy())
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = t

# 테스트 데이터 평가
recon_test_normal = reconstruction_error(X_test_normal, model)
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
y_test_true = torch.cat([
    torch.zeros_like(recon_test_normal),
    torch.ones_like(recon_test_abnormal)
])
y_test_pred = (recon_test_all >= best_threshold).int()

# 평가 지표 계산 (비정상 class=1을 기준으로)
cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
precision = precision_score(y_test_true, y_test_pred, pos_label=1)
recall = recall_score(y_test_true, y_test_pred, pos_label=1)
f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
accuracy = accuracy_score(y_test_true, y_test_pred)

# 결과 출력
print("📌 Confusion Matrix (기준: 실제=행, 예측=열, class=1 비정상 기준)")
print(cm)
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Best Threshold by F1: {best_threshold:.6f}")

Early stopping at epoch 32989
📌 Confusion Matrix (기준: 실제=행, 예측=열, class=1 비정상 기준)
[[98  2]
 [ 4 96]]
Precision: 0.9796
Recall:    0.9600
F1 Score:  0.9697
Accuracy:  0.9700
Best Threshold by F1: 81.600266


In [8]:
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
import numpy as np
import random
from sklearn.metrics import fbeta_score

seed = 45
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=7):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(0.2)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# 재구성 오차 계산 함수
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 모델 초기화
model = AutoEncoder(input_dim=18)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습 파라미터 설정
n_epochs = 100000
patience = 10000
best_loss = float('inf')
trigger_times = 0

# 학습 루프
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, X_train)
    loss.backward()
    optimizer.step()

    # early stopping용 검증 로스
    model.eval()
    with torch.no_grad():
        val_output = model(X_val_normal)
        val_loss = criterion(val_output, X_val_normal)

    if val_loss < best_loss:
        best_loss = val_loss
        best_model_state = model.state_dict()
        trigger_times = 0
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

# 최적 모델 로드
model.load_state_dict(best_model_state)

# 검증 데이터 재구성 오차
recon_val_normal = reconstruction_error(X_val_normal, model)
recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
y_val_true = torch.cat([
    torch.zeros_like(recon_val_normal),
    torch.ones_like(recon_val_abnormal)
])

# F2 기준 최적 임계값 찾기
thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
best_f2 = 0
best_threshold = None
for t in thresholds:
    y_pred = (recon_val_all >= t).int()
    f2 = fbeta_score(y_val_true.numpy(), y_pred.numpy(), beta=2, pos_label=1)
    if f2 > best_f2:
        best_f2 = f2
        best_threshold = t

# 테스트 데이터 평가
recon_test_normal = reconstruction_error(X_test_normal, model)
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
y_test_true = torch.cat([
    torch.zeros_like(recon_test_normal),
    torch.ones_like(recon_test_abnormal)
])
y_test_pred = (recon_test_all >= best_threshold).int()

# 평가 지표 계산 (비정상 class=1을 기준으로)
cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
precision = precision_score(y_test_true, y_test_pred, pos_label=1)
recall = recall_score(y_test_true, y_test_pred, pos_label=1)
f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
accuracy = accuracy_score(y_test_true, y_test_pred)

# 결과 출력
print("📌 Confusion Matrix (기준: 실제=행, 예측=열, class=1 비정상 기준)")
print(cm)
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F2 Score:  {fbeta_score(y_test_true.numpy(), y_test_pred.numpy(), beta=2):.4f}")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Best Threshold by F2: {best_threshold:.6f}")

Early stopping at epoch 23696
📌 Confusion Matrix (기준: 실제=행, 예측=열, class=1 비정상 기준)
[[64 36]
 [ 4 96]]
Precision: 0.7273
Recall:    0.9600
F2 Score:  0.9023
Accuracy:  0.8000
Best Threshold by F2: 87.064911


# 7/2 피드백

In [41]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score, fbeta_score
import pandas as pd
import numpy as np

# 시드 고정 함수
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=4, dropout_rate=0.3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 결과 저장 리스트
all_results = []

# 실험 조합 반복
for latent_dim in [4, 7]:
    for dropout_rate in [0.2, 0.3, 0.4, 0.5]:
        precisions, recalls, f1s, f2s, accs = [], [], [], [], []
        print(f"\n=== latent_dim={latent_dim}, dropout={dropout_rate} ===")
        for seed in range(42, 47):
            set_seed(seed)

            # 데이터 분할
            normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
            abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

            train_normal, remaining_normal = train_test_split(normal_df, train_size=400, random_state=seed)
            val_normal = remaining_normal.sample(n=100, random_state=seed)
            test_normal = remaining_normal.drop(val_normal.index).sample(n=100, random_state=seed)

            val_abnormal = abnormal_df.sample(n=100, random_state=seed)
            test_abnormal = abnormal_df.drop(val_abnormal.index).sample(n=100, random_state=seed)

            # 텐서 변환
            X_train = torch.tensor(train_normal.values, dtype=torch.float32)
            X_val_normal = torch.tensor(val_normal.values, dtype=torch.float32)
            X_val_abnormal = torch.tensor(val_abnormal.values, dtype=torch.float32)
            X_test_normal = torch.tensor(test_normal.values, dtype=torch.float32)
            X_test_abnormal = torch.tensor(test_abnormal.values, dtype=torch.float32)

            # 모델 정의
            model = AutoEncoder(input_dim=18, latent_dim=latent_dim, dropout_rate=dropout_rate)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=1e-3)

            best_loss = float('inf')
            patience = 10000
            trigger = 0

            # 학습
            for epoch in range(100000):
                model.train()
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, X_train)
                loss.backward()
                optimizer.step()

                # 검증
                model.eval()
                with torch.no_grad():
                    val_output = model(X_val_normal)
                    val_loss = criterion(val_output, X_val_normal)

                if val_loss < best_loss:
                    best_loss = val_loss
                    best_model_state = model.state_dict()
                    trigger = 0
                else:
                    trigger += 1
                    if trigger >= patience:
                        break

            model.load_state_dict(best_model_state)

            # threshold 탐색
            recon_val_normal = reconstruction_error(X_val_normal, model)
            recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
            recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
            y_val_true = torch.cat([
                torch.zeros_like(recon_val_normal),
                torch.ones_like(recon_val_abnormal)
            ])
            thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
            best_f2, best_th = 0, None
            for t in thresholds:
                y_pred = (recon_val_all >= t).int()
                f2 = fbeta_score(y_val_true.numpy(), y_pred.numpy(), beta=2, pos_label=1)
                if f2 > best_f2:
                    best_f2, best_th = f2, t

            # 테스트
            recon_test_normal = reconstruction_error(X_test_normal, model)
            recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
            recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
            y_test_true = torch.cat([
                torch.zeros_like(recon_test_normal),
                torch.ones_like(recon_test_abnormal)
            ])
            y_test_pred = (recon_test_all >= best_th).int()

            cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
            precision = precision_score(y_test_true, y_test_pred, pos_label=1)
            recall = recall_score(y_test_true, y_test_pred, pos_label=1)
            f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
            acc = accuracy_score(y_test_true, y_test_pred)
            f2 = fbeta_score(y_test_true, y_test_pred, beta=2, pos_label=1)

            print(f"\n[Seed {seed}] Confusion Matrix\n{cm}")

            precisions.append(precision)
            recalls.append(recall)
            f1s.append(f1)
            f2s.append(f2)
            accs.append(acc)

        # 평균 저장
        all_results.append({
            'latent_dim': latent_dim,
            'dropout': dropout_rate,
            'precision': np.mean(precisions),
            'recall': np.mean(recalls),
            'f1': np.mean(f1s),
            'f2': np.mean(f2s),
            'accuracy': np.mean(accs)
        })

# 최종 결과 출력
df_results = pd.DataFrame(all_results)
print("\n📊 실험 결과 요약:")
print(df_results)


=== latent_dim=4, dropout=0.2 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 8 92]]

[Seed 43] Confusion Matrix
[[97  3]
 [12 88]]

[Seed 44] Confusion Matrix
[[96  4]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[96  4]
 [11 89]]

[Seed 46] Confusion Matrix
[[89 11]
 [13 87]]

=== latent_dim=4, dropout=0.3 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 8 92]]

[Seed 43] Confusion Matrix
[[97  3]
 [14 86]]

[Seed 44] Confusion Matrix
[[96  4]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[96  4]
 [15 85]]

[Seed 46] Confusion Matrix
[[95  5]
 [17 83]]

=== latent_dim=4, dropout=0.4 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 9 91]]

[Seed 43] Confusion Matrix
[[97  3]
 [16 84]]

[Seed 44] Confusion Matrix
[[96  4]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[97  3]
 [15 85]]

[Seed 46] Confusion Matrix
[[96  4]
 [19 81]]

=== latent_dim=4, dropout=0.5 ===

[Seed 42] Confusion Matrix
[[100   0]
 [ 10  90]]

[Seed 43] Confusion Matrix
[[92  8]
 [16 84]]

[Seed 44] Confusion Matrix
[[96  4]
 [ 9 91]]

[Seed 45]

In [42]:
# 테스트 재구성 오차 계산
recon_test_normal = reconstruction_error(X_test_normal, model)
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)

# 실제 라벨
y_test_true = torch.cat([
    torch.zeros_like(recon_test_normal),
    torch.ones_like(recon_test_abnormal)
])

# 전체 테스트 재구성오차
recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
y_test_pred = (recon_test_all >= best_th).int()

# 전체 테스트 데이터프레임 구성
test_all_df = pd.concat([test_normal, test_abnormal], axis=0).reset_index(drop=True)
test_all_df['recon_error'] = recon_test_all.cpu().numpy()
test_all_df['true_label'] = y_test_true.cpu().numpy()
test_all_df['pred_label'] = y_test_pred.cpu().numpy()

# ✅ 이상치로 예측된 케이스만 보기
anomalies = test_all_df[test_all_df['pred_label'] == 1]
print(f"\n예측된 이상치 수: {len(anomalies)}")
display(anomalies)


예측된 이상치 수: 83


Unnamed: 0,IUFL_mean,EUFL_mean,IUFR_mean,EUFR_mean,IURL_mean,EURL_mean,IURR_mean,EURR_mean,Temperature_mean,IUFL_range,...,IUFR_range,EUFR_range,IURL_range,EURL_range,IURR_range,EURR_range,Temperature_range,recon_error,true_label,pred_label
44,235.666667,267.000000,236.000000,264.500000,234.166667,265.000000,124.500000,263.666667,43.000000,10,...,9,5,10,8,8,7,0,3138.750977,0.0,1
46,263.666667,297.666667,263.666667,295.166667,262.000000,297.666667,263.333333,297.333333,43.000000,9,...,6,6,6,9,9,10,0,3697.348633,0.0,1
101,235.500000,276.833333,306.000000,356.000000,235.333333,276.000000,237.666667,277.333333,50.000000,10,...,9,8,10,8,7,8,0,4968.123047,1.0,1
103,154.000000,186.833333,305.833333,358.000000,153.833333,185.333333,154.500000,185.333333,49.000000,7,...,10,9,10,5,8,7,0,4575.005859,1.0,1
104,237.333333,276.000000,307.666667,356.166667,234.500000,275.833333,238.000000,279.000000,49.833333,10,...,10,7,6,7,10,10,1,4994.061523,1.0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
194,309.166667,360.000000,240.666667,282.833333,239.500000,281.833333,241.166667,279.833333,43.000000,7,...,10,9,10,9,10,6,0,4238.732422,1.0,1
195,305.833333,357.333333,260.333333,283.166667,260.500000,281.166667,258.500000,282.000000,55.500000,10,...,8,9,9,9,10,10,2,5531.837402,1.0,1
196,302.666667,356.833333,236.500000,277.166667,236.500000,275.833333,234.500000,276.333333,56.000000,6,...,8,10,10,8,3,9,0,5160.001953,1.0,1
198,239.000000,276.000000,240.333333,278.833333,308.666667,356.666667,237.666667,277.166667,43.000000,10,...,10,8,9,10,10,10,0,3926.436523,1.0,1


In [35]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score, fbeta_score
import pandas as pd
import numpy as np

# 시드 고정 함수
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=4, dropout_rate=0.3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 결과 저장 리스트
all_results = []

# 실험 조합 반복
for latent_dim in [4, 7]:
    for dropout_rate in [0.2, 0.3, 0.4, 0.5]:
        precisions, recalls, f1s, f2s, accs = [], [], [], [], []
        print(f"\n=== latent_dim={latent_dim}, dropout={dropout_rate} ===")
        for seed in range(42, 47):
            set_seed(seed)

            # 데이터 분할
            normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
            abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

            train_normal, remaining_normal = train_test_split(normal_df, train_size=400, random_state=seed)
            val_normal = remaining_normal.sample(n=100, random_state=seed)
            test_normal = remaining_normal.drop(val_normal.index).sample(n=100, random_state=seed)

            val_abnormal = abnormal_df.sample(n=100, random_state=seed)
            test_abnormal = abnormal_df.drop(val_abnormal.index).sample(n=100, random_state=seed)

            # 텐서 변환
            X_train = torch.tensor(train_normal.values, dtype=torch.float32)
            X_val_normal = torch.tensor(val_normal.values, dtype=torch.float32)
            X_val_abnormal = torch.tensor(val_abnormal.values, dtype=torch.float32)
            X_test_normal = torch.tensor(test_normal.values, dtype=torch.float32)
            X_test_abnormal = torch.tensor(test_abnormal.values, dtype=torch.float32)

            # 모델 정의
            model = AutoEncoder(input_dim=18, latent_dim=latent_dim, dropout_rate=dropout_rate)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=0.005)

            best_loss = float('inf')
            patience = 10000
            trigger = 0

            # 학습
            for epoch in range(100000):
                model.train()
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, X_train)
                loss.backward()
                optimizer.step()

                # 검증
                model.eval()
                with torch.no_grad():
                    val_output = model(X_val_normal)
                    val_loss = criterion(val_output, X_val_normal)

                if val_loss < best_loss:
                    best_loss = val_loss
                    best_model_state = model.state_dict()
                    trigger = 0
                else:
                    trigger += 1
                    if trigger >= patience:
                        break

            model.load_state_dict(best_model_state)

            # threshold 탐색
            recon_val_normal = reconstruction_error(X_val_normal, model)
            recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
            recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
            y_val_true = torch.cat([
                torch.zeros_like(recon_val_normal),
                torch.ones_like(recon_val_abnormal)
            ])
            thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
            best_f2, best_th = 0, None
            for t in thresholds:
                y_pred = (recon_val_all >= t).int()
                f2 = fbeta_score(y_val_true.numpy(), y_pred.numpy(), beta=2, pos_label=1)
                if f2 > best_f2:
                    best_f2, best_th = f2, t

            # 테스트
            recon_test_normal = reconstruction_error(X_test_normal, model)
            recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
            recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
            y_test_true = torch.cat([
                torch.zeros_like(recon_test_normal),
                torch.ones_like(recon_test_abnormal)
            ])
            y_test_pred = (recon_test_all >= best_th).int()

            cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
            precision = precision_score(y_test_true, y_test_pred, pos_label=1)
            recall = recall_score(y_test_true, y_test_pred, pos_label=1)
            f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
            acc = accuracy_score(y_test_true, y_test_pred)
            f2 = fbeta_score(y_test_true, y_test_pred, beta=2, pos_label=1)

            print(f"\n[Seed {seed}] Confusion Matrix\n{cm}")

            precisions.append(precision)
            recalls.append(recall)
            f1s.append(f1)
            f2s.append(f2)
            accs.append(acc)

        # 평균 저장
        all_results.append({
            'latent_dim': latent_dim,
            'dropout': dropout_rate,
            'precision': np.mean(precisions),
            'recall': np.mean(recalls),
            'f1': np.mean(f1s),
            'f2': np.mean(f2s),
            'accuracy': np.mean(accs)
        })

# 최종 결과 출력
df_results = pd.DataFrame(all_results)
print("\n📊 실험 결과 요약:")
print(df_results)


=== latent_dim=4, dropout=0.2 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 4 96]]

[Seed 43] Confusion Matrix
[[69 31]
 [10 90]]

[Seed 44] Confusion Matrix
[[97  3]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[71 29]
 [ 8 92]]

[Seed 46] Confusion Matrix
[[  1  99]
 [  0 100]]

=== latent_dim=4, dropout=0.3 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 4 96]]

[Seed 43] Confusion Matrix
[[69 31]
 [12 88]]

[Seed 44] Confusion Matrix
[[90 10]
 [ 7 93]]

[Seed 45] Confusion Matrix
[[72 28]
 [11 89]]

[Seed 46] Confusion Matrix
[[  1  99]
 [  0 100]]

=== latent_dim=4, dropout=0.4 ===

[Seed 42] Confusion Matrix
[[98  2]
 [ 8 92]]

[Seed 43] Confusion Matrix
[[46 54]
 [ 7 93]]

[Seed 44] Confusion Matrix
[[76 24]
 [ 6 94]]

[Seed 45] Confusion Matrix
[[49 51]
 [ 5 95]]

[Seed 46] Confusion Matrix
[[  1  99]
 [  0 100]]

=== latent_dim=4, dropout=0.5 ===

[Seed 42] Confusion Matrix
[[97  3]
 [14 86]]

[Seed 43] Confusion Matrix
[[42 58]
 [ 7 93]]

[Seed 44] Confusion Matrix
[[76 24]
 [ 5 95]]

[

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score, fbeta_score
import pandas as pd
import numpy as np

# 시드 고정 함수
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=4, dropout_rate=0.3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 결과 저장 리스트
all_results = []

# 실험 조합 반복
for latent_dim in [4, 7]:
    for dropout_rate in [0.2, 0.3, 0.4, 0.5]:
        precisions, recalls, f1s, f2s, accs = [], [], [], [], []
        print(f"\n=== latent_dim={latent_dim}, dropout={dropout_rate} ===")
        for seed in range(42, 47):
            set_seed(seed)

            # 데이터 분할
            normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
            abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

            train_normal, remaining_normal = train_test_split(normal_df, train_size=400, random_state=seed)
            val_normal = remaining_normal.sample(n=100, random_state=seed)
            test_normal = remaining_normal.drop(val_normal.index).sample(n=100, random_state=seed)

            val_abnormal = abnormal_df.sample(n=100, random_state=seed)
            test_abnormal = abnormal_df.drop(val_abnormal.index).sample(n=100, random_state=seed)

            # 텐서 변환
            X_train = torch.tensor(train_normal.values, dtype=torch.float32)
            X_val_normal = torch.tensor(val_normal.values, dtype=torch.float32)
            X_val_abnormal = torch.tensor(val_abnormal.values, dtype=torch.float32)
            X_test_normal = torch.tensor(test_normal.values, dtype=torch.float32)
            X_test_abnormal = torch.tensor(test_abnormal.values, dtype=torch.float32)

            # 모델 정의
            model = AutoEncoder(input_dim=18, latent_dim=latent_dim, dropout_rate=dropout_rate)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=1e-4)

            best_loss = float('inf')
            patience = 10000
        
            trigger = 0

            # 학습
            for epoch in range(100000):
                model.train()
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, X_train)
                loss.backward()
                optimizer.step()

                # 검증
                model.eval()
                with torch.no_grad():
                    val_output = model(X_val_normal)
                    val_loss = criterion(val_output, X_val_normal)

                if val_loss < best_loss:
                    best_loss = val_loss
                    best_model_state = model.state_dict()
                    trigger = 0
                else:
                    trigger += 1
                    if trigger >= patience:
                        break

            model.load_state_dict(best_model_state)

            # threshold 탐색
            recon_val_normal = reconstruction_error(X_val_normal, model)
            recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
            recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
            y_val_true = torch.cat([
                torch.zeros_like(recon_val_normal),
                torch.ones_like(recon_val_abnormal)
            ])
            thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
            best_f2, best_th = 0, None
            for t in thresholds:
                y_pred = (recon_val_all >= t).int()
                f2 = fbeta_score(y_val_true.numpy(), y_pred.numpy(), beta=2, pos_label=1)
                if f2 > best_f2:
                    best_f2, best_th = f2, t

            # 테스트
            recon_test_normal = reconstruction_error(X_test_normal, model)
            recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
            recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
            y_test_true = torch.cat([
                torch.zeros_like(recon_test_normal),
                torch.ones_like(recon_test_abnormal)
            ])
            y_test_pred = (recon_test_all >= best_th).int()

            cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
            precision = precision_score(y_test_true, y_test_pred, pos_label=1)
            recall = recall_score(y_test_true, y_test_pred, pos_label=1)
            f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
            acc = accuracy_score(y_test_true, y_test_pred)
            f2 = fbeta_score(y_test_true, y_test_pred, beta=2, pos_label=1)

            print(f"\n[Seed {seed}] Confusion Matrix\n{cm}")

            precisions.append(precision)
            recalls.append(recall)
            f1s.append(f1)
            f2s.append(f2)
            accs.append(acc)

        # 평균 저장
        all_results.append({
            'latent_dim': latent_dim,
            'dropout': dropout_rate,
            'precision': np.mean(precisions),
            'recall': np.mean(recalls),
            'f1': np.mean(f1s),
            'f2': np.mean(f2s),
            'accuracy': np.mean(accs)
        })

# 최종 결과 출력
df_results = pd.DataFrame(all_results)
print("\n📊 실험 결과 요약:")
print(df_results)


=== latent_dim=4, dropout=0.2 ===

[Seed 42] Confusion Matrix
[[96  4]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[94  6]
 [13 87]]

[Seed 44] Confusion Matrix
[[99  1]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[92  8]
 [10 90]]

[Seed 46] Confusion Matrix
[[94  6]
 [17 83]]

=== latent_dim=4, dropout=0.3 ===

[Seed 42] Confusion Matrix
[[98  2]
 [11 89]]

[Seed 43] Confusion Matrix
[[94  6]
 [15 85]]

[Seed 44] Confusion Matrix
[[98  2]
 [ 8 92]]

[Seed 45] Confusion Matrix
[[93  7]
 [14 86]]

[Seed 46] Confusion Matrix
[[95  5]
 [18 82]]

=== latent_dim=4, dropout=0.4 ===

[Seed 42] Confusion Matrix
[[96  4]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[94  6]
 [15 85]]

[Seed 44] Confusion Matrix
[[97  3]
 [10 90]]

[Seed 45] Confusion Matrix
[[91  9]
 [13 87]]

[Seed 46] Confusion Matrix
[[91  9]
 [18 82]]

=== latent_dim=4, dropout=0.5 ===

[Seed 42] Confusion Matrix
[[97  3]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[43 57]
 [ 6 94]]

[Seed 44] Confusion Matrix
[[98  2]
 [11 89]]

[Seed 45] Con

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score, fbeta_score
import pandas as pd
import numpy as np

# 시드 고정 함수
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    import random
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim=18, latent_dim=4, dropout_rate=0.3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# 결과 저장 리스트
all_results = []

# 실험 조합 반복
for latent_dim in [4, 7]:
    for dropout_rate in [0.2, 0.3, 0.4, 0.5]:
        precisions, recalls, f1s, f2s, accs = [], [], [], [], []
        print(f"\n=== latent_dim={latent_dim}, dropout={dropout_rate} ===")
        for seed in range(42, 47):
            set_seed(seed)

            # 데이터 분할
            normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
            abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

            train_normal, remaining_normal = train_test_split(normal_df, train_size=400, random_state=seed)
            val_normal = remaining_normal.sample(n=100, random_state=seed)
            test_normal = remaining_normal.drop(val_normal.index).sample(n=100, random_state=seed)

            val_abnormal = abnormal_df.sample(n=100, random_state=seed)
            test_abnormal = abnormal_df.drop(val_abnormal.index).sample(n=100, random_state=seed)

            # 텐서 변환
            X_train = torch.tensor(train_normal.values, dtype=torch.float32)
            X_val_normal = torch.tensor(val_normal.values, dtype=torch.float32)
            X_val_abnormal = torch.tensor(val_abnormal.values, dtype=torch.float32)
            X_test_normal = torch.tensor(test_normal.values, dtype=torch.float32)
            X_test_abnormal = torch.tensor(test_abnormal.values, dtype=torch.float32)

            # 모델 정의
            model = AutoEncoder(input_dim=18, latent_dim=latent_dim, dropout_rate=dropout_rate)
            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=1e-4)

            best_loss = float('inf')
            patience = 10000
        
            trigger = 0

            # 학습
            for epoch in range(100000):
                model.train()
                optimizer.zero_grad()
                output = model(X_train)
                loss = criterion(output, X_train)
                loss.backward()
                optimizer.step()

                # 검증
                model.eval()
                with torch.no_grad():
                    val_output = model(X_val_normal)
                    val_loss = criterion(val_output, X_val_normal)

                if val_loss < best_loss:
                    best_loss = val_loss
                    best_model_state = model.state_dict()
                    trigger = 0
                else:
                    trigger += 1
                    if trigger >= patience:
                        break

            model.load_state_dict(best_model_state)

            # threshold 탐색
            recon_val_normal = reconstruction_error(X_val_normal, model)
            recon_val_abnormal = reconstruction_error(X_val_abnormal, model)
            recon_val_all = torch.cat([recon_val_normal, recon_val_abnormal])
            y_val_true = torch.cat([
                torch.zeros_like(recon_val_normal),
                torch.ones_like(recon_val_abnormal)
            ])
            thresholds = torch.linspace(recon_val_all.min(), recon_val_all.max(), steps=200)
            best_f2, best_th = 0, None
            for t in thresholds:
                y_pred = (recon_val_all >= t).int()
                f2 = fbeta_score(y_val_true.numpy(), y_pred.numpy(), beta=2, pos_label=1)
                if f2 > best_f2:
                    best_f2, best_th = f2, t

            # 테스트
            recon_test_normal = reconstruction_error(X_test_normal, model)
            recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
            recon_test_all = torch.cat([recon_test_normal, recon_test_abnormal])
            y_test_true = torch.cat([
                torch.zeros_like(recon_test_normal),
                torch.ones_like(recon_test_abnormal)
            ])
            y_test_pred = (recon_test_all >= best_th).int()

            cm = confusion_matrix(y_test_true, y_test_pred, labels=[0, 1])
            precision = precision_score(y_test_true, y_test_pred, pos_label=1)
            recall = recall_score(y_test_true, y_test_pred, pos_label=1)
            f1 = f1_score(y_test_true, y_test_pred, pos_label=1)
            acc = accuracy_score(y_test_true, y_test_pred)
            f2 = fbeta_score(y_test_true, y_test_pred, beta=2, pos_label=1)

            print(f"\n[Seed {seed}] Confusion Matrix\n{cm}")

            precisions.append(precision)
            recalls.append(recall)
            f1s.append(f1)
            f2s.append(f2)
            accs.append(acc)

        # 평균 저장
        all_results.append({
            'latent_dim': latent_dim,
            'dropout': dropout_rate,
            'precision': np.mean(precisions),
            'recall': np.mean(recalls),
            'f1': np.mean(f1s),
            'f2': np.mean(f2s),
            'accuracy': np.mean(accs)
        })

# 최종 결과 출력
df_results = pd.DataFrame(all_results)
print("\n📊 실험 결과 요약:")
print(df_results)


=== latent_dim=4, dropout=0.2 ===

[Seed 42] Confusion Matrix
[[94  6]
 [ 4 96]]

[Seed 43] Confusion Matrix
[[45 55]
 [ 8 92]]

[Seed 44] Confusion Matrix
[[38 62]
 [ 3 97]]

[Seed 45] Confusion Matrix
[[92  8]
 [10 90]]

[Seed 46] Confusion Matrix
[[94  6]
 [17 83]]

=== latent_dim=4, dropout=0.3 ===

[Seed 42] Confusion Matrix
[[94  6]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[39 61]
 [ 5 95]]

[Seed 44] Confusion Matrix
[[46 54]
 [ 3 97]]

[Seed 45] Confusion Matrix
[[93  7]
 [14 86]]

[Seed 46] Confusion Matrix
[[95  5]
 [18 82]]

=== latent_dim=4, dropout=0.4 ===

[Seed 42] Confusion Matrix
[[95  5]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[45 55]
 [ 7 93]]

[Seed 44] Confusion Matrix
[[47 53]
 [ 3 97]]

[Seed 45] Confusion Matrix
[[91  9]
 [13 87]]

[Seed 46] Confusion Matrix
[[91  9]
 [18 82]]

=== latent_dim=4, dropout=0.5 ===

[Seed 42] Confusion Matrix
[[96  4]
 [ 7 93]]

[Seed 43] Confusion Matrix
[[43 57]
 [ 6 94]]

[Seed 44] Confusion Matrix
[[47 53]
 [ 3 97]]

[Seed 45] Con

# 9개 변수, 스케일링 X

In [53]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# 시드 고정
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# class1 기준 분리
normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

# 정상 데이터에서 학습(400), 임계값 설정용(60), 평가용(200)
train_normal, rest_normal = train_test_split(normal_df, train_size=400, random_state=seed)
val_normal, test_normal = train_test_split(rest_normal, test_size=200, random_state=seed)

# 비정상 평가 데이터
test_abnormal = abnormal_df.sample(n=439, random_state=seed)

# 텐서 변환 함수
def to_tensor(df):
    return torch.tensor(df.values, dtype=torch.float32)

X_train = to_tensor(train_normal)
X_val_normal = to_tensor(val_normal)
X_test_normal = to_tensor(test_normal)
X_test_abnormal = to_tensor(test_abnormal)

# AutoEncoder 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim, latent_dim=5):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# 모델 설정
input_dim = X_train.shape[1]
model = AutoEncoder(input_dim=input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습
n_epochs = 10000
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, X_train)
    loss.backward()
    optimizer.step()

# 재구성 오차 계산 (배치처리)
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# (1) validation normal → threshold 설정
recon_val = reconstruction_error(X_val_normal, model)
threshold = torch.quantile(recon_val, 0.99)

# (2) test 정상 평가
recon_test_normal = reconstruction_error(X_test_normal, model)
pred_test_normal = (recon_test_normal < threshold).int()
acc_test_normal = pred_test_normal.sum().item() / len(pred_test_normal)

# (3) test 비정상 평가
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
pred_test_abnormal = (recon_test_abnormal >= threshold).int()
acc_test_abnormal = pred_test_abnormal.sum().item() / len(pred_test_abnormal)

# 결과 출력
print(f"✅ 정상 평가 데이터에서 정상으로 분류된 비율: {acc_test_normal:.4f}")
print(f"✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: {acc_test_abnormal:.4f}")

✅ 정상 평가 데이터에서 정상으로 분류된 비율: 0.9650
✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: 0.7654


# 9개 변수 스케일링 O

In [56]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# 시드 고정
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# class1 기준 분리
normal_df = final_df[final_df['class1'] == 0].drop(columns='class1')
abnormal_df = final_df[final_df['class1'] == 1].drop(columns='class1')

# 정상 데이터에서 학습(400), 임계값 설정용(60), 평가용(200)
train_normal, rest_normal = train_test_split(normal_df, train_size=400, random_state=seed)
val_normal, test_normal = train_test_split(rest_normal, test_size=200, random_state=seed)

# 비정상 평가 데이터
test_abnormal = abnormal_df.sample(n=439, random_state=seed)

# 🔹 MinMaxScaler 적용 (학습 데이터 기준)
scaler = MinMaxScaler()
X_train_np = scaler.fit_transform(train_normal)
X_val_np = scaler.transform(val_normal)
X_test_normal_np = scaler.transform(test_normal)
X_test_abnormal_np = scaler.transform(test_abnormal)

# 텐서 변환
def to_tensor(arr):
    return torch.tensor(arr, dtype=torch.float32)

X_train = to_tensor(X_train_np)
X_val_normal = to_tensor(X_val_np)
X_test_normal = to_tensor(X_test_normal_np)
X_test_abnormal = to_tensor(X_test_abnormal_np)

# AE 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim, latent_dim=3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# 모델 설정
input_dim = X_train.shape[1]
model = AutoEncoder(input_dim=input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습
n_epochs = 10000
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, X_train)
    loss.backward()
    optimizer.step()

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# (1) 임계값 설정
recon_val = reconstruction_error(X_val_normal, model)
threshold = torch.quantile(recon_val, 0.99)

# (2) test 정상 평가
recon_test_normal = reconstruction_error(X_test_normal, model)
pred_test_normal = (recon_test_normal < threshold).int()
acc_test_normal = pred_test_normal.sum().item() / len(pred_test_normal)

# (3) test 비정상 평가
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
pred_test_abnormal = (recon_test_abnormal >= threshold).int()
acc_test_abnormal = pred_test_abnormal.sum().item() / len(pred_test_abnormal)

# 결과 출력
print(f"✅ 정상 평가 데이터에서 정상으로 분류된 비율: {acc_test_normal:.4f}")
print(f"✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: {acc_test_abnormal:.4f}")

✅ 정상 평가 데이터에서 정상으로 분류된 비율: 0.9500
✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: 0.7745


In [51]:
print(X_train.shape); print(X_val_normal.shape); print(X_test_normal.shape); print(X_test_abnormal.shape)

torch.Size([400, 9])
torch.Size([60, 9])
torch.Size([200, 9])
torch.Size([439, 9])


# 8개 변수, 스케일링 O

In [59]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# 시드 고정
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# 🔹 사용 변수 지정 (Temperature 제외)
feature_cols = ['IUFL', 'EUFL', 'IUFR', 'EUFR', 'IURL', 'EURL', 'IURR', 'EURR']

# class1 기준 분리
normal_df = final_df[final_df['class1'] == 0][feature_cols]
abnormal_df = final_df[final_df['class1'] == 1][feature_cols]

# 정상 데이터 분할
train_normal, rest_normal = train_test_split(normal_df, train_size=400, random_state=seed)
val_normal, test_normal = train_test_split(rest_normal, test_size=200, random_state=seed)

# 비정상 평가 데이터
test_abnormal = abnormal_df.sample(n=439, random_state=seed)

# 🔹 MinMaxScaler 적용 (train 기준)
scaler = MinMaxScaler()
X_train_np = scaler.fit_transform(train_normal)
X_val_np = scaler.transform(val_normal)
X_test_normal_np = scaler.transform(test_normal)
X_test_abnormal_np = scaler.transform(test_abnormal)

# 텐서 변환
def to_tensor(arr):
    return torch.tensor(arr, dtype=torch.float32)

X_train = to_tensor(X_train_np)
X_val_normal = to_tensor(X_val_np)
X_test_normal = to_tensor(X_test_normal_np)
X_test_abnormal = to_tensor(X_test_abnormal_np)

# AE 정의
class AutoEncoder(nn.Module):
    def __init__(self, input_dim, latent_dim=3):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, latent_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out

# 모델 설정
input_dim = X_train.shape[1]
model = AutoEncoder(input_dim=input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습
n_epochs = 10000
for epoch in range(n_epochs):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, X_train)
    loss.backward()
    optimizer.step()

# 재구성 오차 계산
def reconstruction_error(x, model, batch_size=128):
    model.eval()
    errors = []
    with torch.no_grad():
        for i in range(0, len(x), batch_size):
            x_batch = x[i:i+batch_size]
            recon = model(x_batch)
            err = torch.mean((x_batch - recon) ** 2, dim=1)
            errors.append(err)
    return torch.cat(errors)

# (1) threshold 설정
recon_val = reconstruction_error(X_val_normal, model)
threshold = torch.quantile(recon_val, 0.99)

# (2) test 정상 평가
recon_test_normal = reconstruction_error(X_test_normal, model)
pred_test_normal = (recon_test_normal < threshold).int()
acc_test_normal = pred_test_normal.sum().item() / len(pred_test_normal)

# (3) test 비정상 평가
recon_test_abnormal = reconstruction_error(X_test_abnormal, model)
pred_test_abnormal = (recon_test_abnormal >= threshold).int()
acc_test_abnormal = pred_test_abnormal.sum().item() / len(pred_test_abnormal)

# 결과 출력
print(f"✅ 정상 평가 데이터에서 정상으로 분류된 비율: {acc_test_normal:.4f}")
print(f"✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: {acc_test_abnormal:.4f}")

✅ 정상 평가 데이터에서 정상으로 분류된 비율: 0.9650
✅ 비정상 평가 데이터에서 비정상으로 분류된 비율: 0.7654
