In [1]:
# ✅ [1] 필수 패키지 임포트
import os
import pandas as pd
import numpy as np
import random
import itertools

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    classification_report, confusion_matrix
)

from scipy.stats import wasserstein_distance, energy_distance
from scipy.spatial import distance
from sklearn import metrics

from dtaidistance import dtw  # pip install dtaidistance
from tqdm import tqdm
import torch

In [2]:
def dummy_creation(dataset, dummy_categories):
    for i in dummy_categories:
        dataset_dummy = pd.get_dummies(dataset[i])

        dataset=pd.concat([dataset,dataset_dummy],
                          axis=1)
        dataset=dataset.drop(i,axis=1)
    return dataset

## 1. 데이터 불러오기 (Load a Dataset)

Data Source: https://www.kaggle.com/datasets/shasun/tool-wear-detection-in-cnc-mill

In [3]:

# 데이터셋이 저장된 Google Drive 내 폴더 경로 지정
FOLDER_DIR = "./dataset-SMART"

# 개별 실험 데이터를 저장할 리스트 초기화
experiments = []

# 폴더 내 파일들을 순회
for file_name in os.listdir(FOLDER_DIR):
  if file_name.startswith("experiment"):
    df = pd.DataFrame()  # 빈 데이터프레임 생성 (선택사항)

    df = pd.read_csv(f"{FOLDER_DIR}/{file_name}", index_col=None, header=0)

    # 파일명에서 실험 번호 추출하여 'Experiment' 컬럼에 추가 (예: experiment02.csv → 2)
    df['Experiment'] = int(file_name[-6:-4])
    experiments.append(df)

# 개별 실험 데이터를 하나의 데이터프레임으로 병합
df_raw_original = pd.concat(experiments, axis=0, ignore_index=True)

In [4]:
# 메타데이터 CSV 파일 읽기 (조건 정보 포함)
df_meta = pd.read_csv(os.path.join(FOLDER_DIR, "train.csv"))

# tool_condition, clamp_pressure, feedrate 순으로 정렬
df_meta = df_meta.sort_values(by=['tool_condition', 'clamp_pressure', 'feedrate'])

# tool_condition이 'worn'이면 1, 아니면 0으로 라벨링 >>> 1이면 마모됨. 0이면 정상
df_meta['label'] = df_meta['tool_condition'].apply(lambda x: 1 if x == "worn" else 0)

# feedrate와 clamp_pressure를 조합해 WorkingCondition이라는 새로운 컬럼 생성 >>> 이동속도와 클램핑 압력열을 조합한 파생 변수 생성. ex) 3-2.5
df_meta['WorkingCondition'] = df_meta['feedrate'].apply(lambda x: str(x)) + "-" + df_meta['clamp_pressure'].apply(lambda x: str(x))

df_raw = dummy_creation(df_raw_original, ['Machining_Process'])

df_raw['label'] = 0         ## 공구 상태가 정상
df_raw['label'] = df_raw['Experiment'].apply(lambda x: df_meta[df_meta['No'] == x]['label'].values[0])      # 실험번호와 라벨 맞추기

no_wc_dict = dict(zip(df_meta['No'], df_meta['WorkingCondition']))

df_raw['WorkingCondition'] = df_raw['Experiment'].apply(lambda x: no_wc_dict[x])

# 'WorkingCondition'을 '-' 기준으로 나누어 두 개의 새 컬럼 생성
df_raw[['feedrate', 'clamp_pressure']] = df_raw['WorkingCondition'].str.split('-', expand=True)

# 필요한 경우 형변환 (예: 정수 또는 실수형으로)
df_raw['feedrate'] = df_raw['feedrate'].astype(int)
df_raw['clamp_pressure'] = df_raw['clamp_pressure'].astype(float)

In [5]:
col_sensor = ['X1_ActualPosition', 'X1_ActualVelocity', 'X1_ActualAcceleration',
       'X1_CommandPosition', 'X1_CommandVelocity', 'X1_CommandAcceleration',
       'X1_CurrentFeedback', 'X1_DCBusVoltage', 'X1_OutputCurrent',
       'X1_OutputVoltage', 'X1_OutputPower', 'Y1_ActualPosition',
       'Y1_ActualVelocity', 'Y1_ActualAcceleration', 'Y1_CommandPosition',
       'Y1_CommandVelocity', 'Y1_CommandAcceleration', 'Y1_CurrentFeedback',
       'Y1_DCBusVoltage', 'Y1_OutputCurrent', 'Y1_OutputVoltage',
       'Y1_OutputPower', 'Z1_ActualPosition', 'Z1_ActualVelocity',
       'Z1_ActualAcceleration', 'Z1_CommandPosition', 'Z1_CommandVelocity',
       'Z1_CommandAcceleration', 'Z1_CurrentFeedback', 'Z1_DCBusVoltage',
       'Z1_OutputCurrent', 'Z1_OutputVoltage', 'S1_ActualPosition',
       'S1_ActualVelocity', 'S1_ActualAcceleration', 'S1_CommandPosition',
       'S1_CommandVelocity', 'S1_CommandAcceleration', 'S1_CurrentFeedback',
       'S1_DCBusVoltage', 'S1_OutputCurrent', 'S1_OutputVoltage',
       'S1_OutputPower', 'S1_SystemInertia']

col_bn = ['X1_ActualPosition', 'X1_ActualVelocity', 'X1_ActualAcceleration',
       'X1_CommandPosition', 'X1_CommandVelocity', 'X1_CommandAcceleration',
       'X1_CurrentFeedback', 'X1_DCBusVoltage', 'X1_OutputCurrent',
       'X1_OutputVoltage', 'X1_OutputPower', 'Y1_ActualPosition',
       'Y1_ActualVelocity', 'Y1_ActualAcceleration', 'Y1_CommandPosition',
       'Y1_CommandVelocity', 'Y1_CommandAcceleration', 'Y1_CurrentFeedback',
       'Y1_DCBusVoltage', 'Y1_OutputCurrent', 'Y1_OutputVoltage',
       'Y1_OutputPower', 'Z1_ActualPosition', 'Z1_ActualVelocity',
       'Z1_ActualAcceleration', 'Z1_CommandPosition', 'Z1_CommandVelocity',
       'Z1_CommandAcceleration', 'Z1_CurrentFeedback', 'Z1_DCBusVoltage',
       'Z1_OutputCurrent', 'Z1_OutputVoltage', 'S1_ActualPosition',
       'S1_ActualVelocity', 'S1_ActualAcceleration', 'S1_CommandPosition',
       'S1_CommandVelocity', 'S1_CommandAcceleration', 'S1_CurrentFeedback',
       'S1_DCBusVoltage', 'S1_OutputCurrent', 'S1_OutputVoltage',
       'S1_OutputPower', 'S1_SystemInertia','feedrate','clamp_pressure','layer']

col_CNCCode = ['M1_CURRENT_PROGRAM_NUMBER',
       'M1_sequence_number', 'M1_CURRENT_FEEDRATE', 'Experiment', 'End',
       'Layer 1 Down', 'Layer 1 Up', 'Layer 2 Down', 'Layer 2 Up',
       'Layer 3 Down', 'Layer 3 Up', 'Prep', 'Repositioning', 'Starting',
       'end', 'label']

# 절삭 구간만 자르기 -> 공구 마모와 관련된 구간
layer_cols = ['Layer 1 Down', 'Layer 1 Up', 'Layer 2 Down', 'Layer 2 Up', 'Layer 3 Down', 'Layer 3 Up']
df_raw['Layer_Total'] = df_raw[layer_cols].sum(axis=1)  # axis=1로 행 방향 합계
df_layer = df_raw[df_raw['Layer_Total'] > 0].copy() # .copy()를 사용하여 복사본 생성

# Layer to Working Conditions
df_layer.loc[:, 'Layer1'] = df_layer[['Layer 1 Down', 'Layer 1 Up']].sum(axis=1)
df_layer.loc[:, 'Layer2'] = df_layer[['Layer 2 Down', 'Layer 2 Up']].sum(axis=1)
df_layer.loc[:, 'Layer3'] = df_layer[['Layer 3 Down', 'Layer 3 Up']].sum(axis=1)

df_layer.loc[:, 'Layer_Info'] = df_layer[['Layer1', 'Layer2', 'Layer3']].idxmax(axis=1)  # idxmax로 최대값 컬럼명 가져오기
df_layer.loc[:, 'Layer_Info'] = df_layer['Layer_Info'].map({'Layer1': 1, 'Layer2': 2, 'Layer3': 3})  # 컬럼명을 숫자로 매핑

# Scenario = Experiment (Load X Speed X Label) X Layer
df_layer.loc[:, 'SCN'] = df_layer['Experiment'].astype(str) + "-" + df_layer['Layer_Info'].astype(str)  # 문자열로 변환하여 결합

---

## 고장 진단 모델 성능 지표 개발

semi-supervised-learning, base 지표로 확인

In [9]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd

# X, y 정의
X = df_layer.drop(columns=['label', 'SCN','WorkingCondition'])  # 필요 없는 컬럼 제거
y = df_layer['label']

# 비율별 split (10%, 50%, 100%)
for ratio in [0.1, 0.5, 1.0]:
    print(f"\n=== Training with {int(ratio*100)}% labeled data ===")

    if ratio < 1.0:
        X_labeled, X_unlabeled, y_labeled, y_unlabeled = train_test_split(
            X, y, train_size=ratio, stratify=y, random_state=42
        )
    else:
        X_labeled, y_labeled = X.copy(), y.copy()
        X_unlabeled, y_unlabeled = pd.DataFrame(columns=X.columns), pd.Series(dtype=y.dtype)

    # 초기 모델 학습
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_labeled, y_labeled)

    # Pseudo-label 예측
    if not X_unlabeled.empty:
        pseudo_labels = clf.predict(X_unlabeled)

        # Labeled + pseudo-labeled 데이터로 재학습
        X_combined = pd.concat([X_labeled, X_unlabeled])
        y_combined = pd.concat([y_labeled, pd.Series(pseudo_labels, index=X_unlabeled.index)])
    else:
        X_combined = X_labeled
        y_combined = y_labeled

    final_model = RandomForestClassifier(n_estimators=100, random_state=42)
    final_model.fit(X_combined, y_combined)

    # 전체 평가
    y_pred = final_model.predict(X)
    print(classification_report(y, y_pred))



=== Training with 10% labeled data ===
              precision    recall  f1-score   support

           0       1.00      0.99      0.99      8042
           1       0.99      1.00      0.99      9478

    accuracy                           0.99     17520
   macro avg       0.99      0.99      0.99     17520
weighted avg       0.99      0.99      0.99     17520


=== Training with 50% labeled data ===
              precision    recall  f1-score   support

           0       1.00      1.00      1.00      8042
           1       1.00      1.00      1.00      9478

    accuracy                           1.00     17520
   macro avg       1.00      1.00      1.00     17520
weighted avg       1.00      1.00      1.00     17520


=== Training with 100% labeled data ===
              precision    recall  f1-score   support

           0       1.00      1.00      1.00      8042
           1       1.00      1.00      1.00      9478

    accuracy                           1.00     17520
   macr

In [10]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd

# === 고장 구간 탐지를 위한 유틸리티 ===
def get_event_ranges(y, min_length=1, target_value=1):
    """지정된 target_value의 연속된 구간(start, end index) 반환"""
    events = []
    in_event = False
    start = 0
    for i, val in enumerate(y):
        if val == target_value and not in_event:
            in_event = True
            start = i
        elif val != target_value and in_event:
            end = i
            if end - start >= min_length:
                events.append((start, end))
            in_event = False
    if in_event:
        end = len(y)
        if end - start >= min_length:
            events.append((start, end))
    return events

def temporal_metric(y_true, y_pred, tolerance=0, w_tp=1.0, w_fp=1.0, w_fn=1.0):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    
    true_events = get_event_ranges(y_true)
    pred_events = get_event_ranges(y_pred)

    TP = 0
    matched_pred = set()

    for t_start, t_end in true_events:
        matched = False
        for i, (p_start, p_end) in enumerate(pred_events):
            if p_end >= t_start - tolerance and p_start <= t_end + tolerance:
                if i not in matched_pred:
                    matched_pred.add(i)
                    matched = True
                    break
        if matched:
            TP += 1

    FN = len(true_events) - TP
    FP = len(pred_events) - len(matched_pred)

    custom_score = (w_tp * TP) - (w_fp * FP) - (w_fn * FN)
    
    return {
        'TP': TP,
        'FP': FP,
        'FN': FN,
        'custom_score': custom_score
    }

def temporal_metric_by_scn(y_true, y_pred, scn, tolerance=0, w_tp=1.0, w_fp=0.5, w_fn=2.0):
    """
    y_true, y_pred: np.array 또는 list 형태의 전체 라벨 시퀀스
    scn: 각 인덱스에 대응하는 SCN 정보 (list or np.array)
    tolerance, w_tp, w_fp, w_fn: 기존과 동일
    
    SCN별로 분리하여 temporal_metric 계산 후 합산 반환
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    scn = np.array(scn)

    unique_scn = np.unique(scn)
    total_TP, total_FP, total_FN = 0, 0, 0

    for s in unique_scn:
        idx = (scn == s)
        y_true_s = y_true[idx]
        y_pred_s = y_pred[idx]

        # 기존 temporal_metric 함수 재활용
        metrics = temporal_metric(y_true_s, y_pred_s, tolerance, w_tp, w_fp, w_fn)
        
        total_TP += metrics['TP']
        total_FP += metrics['FP']
        total_FN += metrics['FN']

    custom_score = (w_tp * total_TP) - (w_fp * total_FP) - (w_fn * total_FN)

    return {
        'TP': total_TP,
        'FP': total_FP,
        'FN': total_FN,
        'custom_score': custom_score
    }


# === 모델 훈련 및 평가 ===
X = df_layer.drop(columns=['label', 'SCN','WorkingCondition'])  # 필요 없는 컬럼 제거
y = df_layer['label']

for ratio in [0.1, 0.5, 1.0]:
    print(f"\n=== Training with {int(ratio*100)}% labeled data ===")

    if ratio < 1.0:
        X_labeled, X_unlabeled, y_labeled, y_unlabeled = train_test_split(
            X, y, train_size=ratio, stratify=y, random_state=42
        )
    else:
        X_labeled, y_labeled = X.copy(), y.copy()
        X_unlabeled, y_unlabeled = pd.DataFrame(columns=X.columns), pd.Series(dtype=y.dtype)

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_labeled, y_labeled)

    if not X_unlabeled.empty:
        pseudo_labels = clf.predict(X_unlabeled)
        X_combined = pd.concat([X_labeled, X_unlabeled])
        y_combined = pd.concat([y_labeled, pd.Series(pseudo_labels, index=X_unlabeled.index)])
        clf.fit(X_combined, y_combined) 

    final_model = RandomForestClassifier(n_estimators=100, random_state=42)
    final_model.fit(X_combined, y_combined)

    # 예측 및 커스텀 지표 계산
    y_pred = final_model.predict(X)

    metrics = temporal_metric_by_scn(
        y_true=df_layer['label'],
        y_pred=y_pred,
        scn=df_layer['SCN'],
        tolerance=3,
        w_tp=1.0,
        w_fp=0.5,
        w_fn=2.0
    )

    print(f"TP={metrics['TP']}, FP={metrics['FP']}, FN={metrics['FN']}, Custom Score={metrics['custom_score']:.3f}")



=== Training with 10% labeled data ===
TP=27, FP=47, FN=0, Custom Score=3.500

=== Training with 50% labeled data ===
TP=27, FP=6, FN=0, Custom Score=24.000

=== Training with 100% labeled data ===
TP=27, FP=6, FN=0, Custom Score=24.000
