In [1]:
from preamble import *

In [2]:
import xgboost as xgb
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import re
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.impute import SimpleImputer

In [3]:
# 1. 데이터 로딩 및 전처리
def load_and_preprocess(data_path):
  df = pd.read_csv(data_path)

  def parse_data_array(data_str):
    data_str = re.sub(r'\s+', ' ', data_str).strip()
    # 빈 리스트 "[]" 처리
    if data_str == '[]':
      return None  # None 반환
    try:
      return np.fromstring(data_str[1:-1], sep=' ')
    except ValueError:
      print(f"Warning: Could not parse data array: {data_str}")
      return None  # ValueError 발생 시에도 None 반환

  df['data_array'] = df['data_array'].apply(parse_data_array)
  return df

# 1.2 특징 추출 함수
def extract_features(data_array):
  x = np.array(data_array)
  x = x[:len(x)//2]

  features = []
  features.append(np.max(x) - np.min(x))
  features.append(np.mean(x))
  if len(x) > 1:
    features.append(np.std(x, ddof=1))
  else:
    features.append(0)
  features.append(np.sqrt(np.mean(x**2)))
  if len(x) > 1 and features[3] != 0:
    features.append(np.max(np.abs(x)) / features[3])
  else:
    features.append(0)

  if len(x) > 1 and features[2] != 0:
    features.append(np.mean(((x - features[1]) / features[2])**3))
    features.append(np.mean(((x - features[1]) / features[2])**4))
  else:
    features.append(0)
    features.append(0)
  return features

In [4]:
# 1.3 데이터프레임에 특징 추가
def create_feature_df(df):
  feature_list = []
  for _, row in df.iterrows():
    # data_array가 None이면 특징 추출 건너뜀
    if row['data_array'] is None:
      feature_list.append(None)  # None 추가
    else:
      features = extract_features(row['data_array'])
      feature_list.append(features)

  # None이 아닌 행들만 사용하여 DataFrame 생성
  valid_features = [f for f in feature_list if f is not None]
  if valid_features:
    feature_df = pd.DataFrame(valid_features, columns=[f'feature_{i}' for i in range(7)])
  else:
    feature_df = pd.DataFrame() #빈 데이터 프레임 반환
  
  # 인덱스 리셋 (channel_id 필터링 전에!)
  feature_df = feature_df.reset_index(drop=True)

  channel_features = []
  for channel in df['channel_id'].unique():
    # 해당 channel_id를 가진 행이 feature_df에 있는지 확인
    if channel in df[df['data_array'].notna()]['channel_id'].values: #수정
      channel_df = feature_df.loc[df['channel_id'] == channel].copy() #수정
      channel_df.columns = [f"{col}_{channel}" for col in channel_df.columns]
      channel_features.append(channel_df)
    else:
      # 해당 channel_id에 대한 데이터가 없으면 빈 DataFrame 추가 (또는 건너뛰기)
      channel_features.append(pd.DataFrame()) # 빈 DataFrame

  if channel_features:
    feature_df = pd.concat(channel_features, axis=1)
  else:
    feature_df = pd.DataFrame()
  
  final_df = pd.concat([df.reset_index(drop=True), feature_df], axis=1)
  return final_df

In [5]:
# 2. 이상치 탐지 및 레이블 생성 (핵심 변경 부분)
def detect_outliers_and_label(final_df, contamination=0.05): # contamination 파라미터 추가
  """
  Isolation Forest를 사용하여 이상치를 탐지하고, 'failure' 레이블을 생성합니다.

  Args:
      final_df: 특징 데이터프레임.
      contamination:  데이터셋에서 예상되는 이상치 비율 (기본값: 0.05).

  Returns:
      'failure' 열이 추가된 데이터프레임.
  """

  failure_labels = []
  for channel in final_df['channel_id'].unique():
    channel_data = final_df[final_df['channel_id'] == channel]
    X_channel = channel_data.filter(like=f'feature_')

    if X_channel.empty:
      print(f"Skipping channel {channel} due to empty feature set.")
      continue

    imputer = SimpleImputer(strategy='mean')
    X_channel_imputed = imputer.fit_transform(X_channel)

    scaler = StandardScaler()
    X_channel_scaled = scaler.fit_transform(X_channel_imputed)

    isolation_forest = IsolationForest(n_estimators=100, contamination=contamination, random_state=42, n_jobs=-1)
    isolation_forest.fit(X_channel_scaled)

    outlier_predictions = isolation_forest.predict(X_channel_scaled)
    channel_failure_labels = [1 if pred == -1 else 0 for pred in outlier_predictions]
    failure_labels.extend(channel_failure_labels)

  final_df['failure'] = failure_labels
  return final_df

In [6]:
data_path = './data/pms_data_decompressed.csv'  # 실제 데이터 경로
df = load_and_preprocess(data_path)
final_df = create_feature_df(df)
if final_df.empty:
  print("No data available after feature extraction.")
else:
  # 이상치 탐지 및 레이블 생성
  final_df = detect_outliers_and_label(final_df, contamination=0.05)

  return np.fromstring(data_str[1:-1], sep=' ')
 'feature_4_CHN1' 'feature_5_CHN1' 'feature_6_CHN1' 'feature_0_CHN2'
 'feature_1_CHN2' 'feature_2_CHN2' 'feature_3_CHN2' 'feature_4_CHN2'
 'feature_5_CHN2' 'feature_6_CHN2' 'feature_0_CHN3' 'feature_1_CHN3'
 'feature_2_CHN3' 'feature_3_CHN3' 'feature_4_CHN3' 'feature_5_CHN3'
 'feature_6_CHN3' 'feature_0_CHN4' 'feature_1_CHN4' 'feature_2_CHN4'
 'feature_3_CHN4' 'feature_4_CHN4' 'feature_5_CHN4' 'feature_6_CHN4'
 'feature_0_CHN5' 'feature_1_CHN5' 'feature_2_CHN5' 'feature_3_CHN5'
 'feature_4_CHN5' 'feature_5_CHN5' 'feature_6_CHN5' 'feature_0_CHN6'
 'feature_1_CHN6' 'feature_2_CHN6' 'feature_3_CHN6' 'feature_4_CHN6'
 'feature_5_CHN6' 'feature_6_CHN6' 'feature_0_CHN7' 'feature_1_CHN7'
 'feature_2_CHN7' 'feature_3_CHN7' 'feature_4_CHN7' 'feature_5_CHN7'
 'feature_6_CHN7' 'feature_0_CHN8' 'feature_1_CHN8' 'feature_2_CHN8'
 'feature_3_CHN8' 'feature_4_CHN8' 'feature_5_CHN8' 'feature_6_CHN8'
 'feature_0_CHN9' 'feature_1_CHN9' 'feature_2_CHN9' 'fe

In [7]:
# 3. 데이터 준비, 모델 학습 및 평가 (XGBoost 부분)
def train_and_evaluate_xgboost(final_df):
  """
  데이터를 준비하고, XGBoost 모델을 학습 및 평가합니다.

  Args:
    final_df:  'failure' 레이블이 포함된 최종 데이터프레임.

  Returns:
    학습된 XGBoost 모델,  테스트 정확도.
  """

  # 1. 데이터 분할 (시간순으로, 'failure' 레이블 사용)
  final_df = final_df.sort_values(by='acq_date')
  X = final_df.drop(['motor_id', 'equipment_id', 'center_id', 'channel_id', 'acq_date', 'data_array', 'failure'], axis=1)
  y = final_df['failure']  # 'failure' 레이블 사용
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)  # random_state 추가

  # 2. XGBoost 모델 학습 및 튜닝 (GridSearchCV 사용)
  param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [3, 4, 5],
    'learning_rate': [0.01, 0.1, 0.2],
    'subsample': [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0],
    'gamma': [0, 0.1, 0.2],
    'reg_alpha': [0, 0.01, 0.1],
    'reg_lambda': [0, 0.01, 0.1]
  }
  xgb_model = xgb.XGBClassifier(objective='binary:logistic', eval_metric='logloss', use_label_encoder=False, random_state=42)
  grid_search = GridSearchCV(xgb_model, param_grid, cv=5, scoring='accuracy', verbose=3, n_jobs=-1)
  grid_search.fit(X_train, y_train)

  # 최적 모델
  best_model = grid_search.best_estimator_

  # 3. 모델 평가
  y_pred = best_model.predict(X_test)
  accuracy = accuracy_score(y_test, y_pred)
  print("Accuracy:", accuracy)
  print(confusion_matrix(y_test, y_pred))
  print(classification_report(y_test, y_pred))

  # 4. 변수 중요도
  feature_importance = best_model.feature_importances_
  importance_df = pd.DataFrame({'Feature': X.columns, 'Importance': feature_importance})
  importance_df = importance_df.sort_values(by='Importance', ascending=False)
  print(importance_df)

  return best_model, accuracy

In [8]:
# 1. 데이터 분할 (시간순으로, 'failure' 레이블 사용)
final_df = final_df.sort_values(by='acq_date')
X = final_df.drop(['motor_id', 'equipment_id', 'center_id', 'channel_id', 'acq_date', 'data_array', 'failure'], axis=1)
y = final_df['failure']  # 'failure' 레이블 사용
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)  # random_state 추가

In [9]:
# 2. XGBoost 모델 학습 및 튜닝 (GridSearchCV 사용)
param_grid = {
  'n_estimators': [100, 200, 300],
  'max_depth': [3, 4, 5],
  'learning_rate': [0.01, 0.1, 0.2],
  'subsample': [0.8, 1.0],
  'colsample_bytree': [0.8, 1.0],
  'gamma': [0, 0.1, 0.2],
  'reg_alpha': [0, 0.01, 0.1],
  'reg_lambda': [0, 0.01, 0.1]
}
xgb_model = xgb.XGBClassifier(objective='binary:logistic', eval_metric='logloss', use_label_encoder=False, random_state=42)
grid_search = GridSearchCV(xgb_model, param_grid, cv=5, scoring='accuracy', verbose=3, n_jobs=-1)
grid_search.fit(X_train, y_train)

Fitting 5 folds for each of 2916 candidates, totalling 14580 fits


Parameters: { "use_label_encoder" } are not used.



In [10]:
# 최적 모델
best_model = grid_search.best_estimator_

In [11]:
model_filename = './model/motor_xgboost_model.json'
best_model.save_model(model_filename)

In [12]:
# 3. 모델 평가
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))

Accuracy: 0.9523483812129503
[[16707     3]
 [  833     1]]
              precision    recall  f1-score   support

           0       0.95      1.00      0.98     16710
           1       0.25      0.00      0.00       834

    accuracy                           0.95     17544
   macro avg       0.60      0.50      0.49     17544
weighted avg       0.92      0.95      0.93     17544



In [13]:
# 4. 변수 중요도
feature_importance = best_model.feature_importances_
importance_df = pd.DataFrame({'Feature': X.columns, 'Importance': feature_importance})
importance_df = importance_df.sort_values(by='Importance', ascending=False)
print(importance_df)

             Feature  Importance
235  feature_3_CHO17        0.57
4     feature_3_CHN0        0.30
86   feature_1_CHN12        0.07
233  feature_1_CHO17        0.04
2     feature_1_CHN0        0.02
..               ...         ...
84   feature_6_CHN11        0.00
85   feature_0_CHN12        0.00
87   feature_2_CHN12        0.00
88   feature_3_CHN12        0.00
238  feature_6_CHO17        0.00

[239 rows x 2 columns]
