In [1]:
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Colab-only step: mount Google Drive so files under /content/drive are reachable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'  # directory where Drive is attached
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Load one .npy array per (class, split) pair from the mounted Drive folder.
DATA_DIR = '/content/drive/MyDrive/DL_Colab/DL_data'

# Training split
Healthy_train = np.load(f'{DATA_DIR}/Bearing_Healthy_train.npy')
InnerFault_train = np.load(f'{DATA_DIR}/Bearing_InnerFault_train.npy')
OuterFault_train = np.load(f'{DATA_DIR}/Bearing_OuterFault_train.npy')

# Test split
Healthy_test = np.load(f'{DATA_DIR}/Bearing_Healthy_test.npy')
InnerFault_test = np.load(f'{DATA_DIR}/Bearing_InnerFault_test.npy')
OuterFault_test = np.load(f'{DATA_DIR}/Bearing_OuterFault_test.npy')

In [None]:
# Build (X, y): stack the three classes along axis 0.
# Integer class index follows list position: Healthy=0, Inner=1, Outer=2.
train_parts = [Healthy_train, InnerFault_train, OuterFault_train]
test_parts = [Healthy_test, InnerFault_test, OuterFault_test]

X_train = np.concatenate(train_parts, axis=0)
y_train = np.concatenate(
    [np.full(len(part), label, dtype=np.int64) for label, part in enumerate(train_parts)],
    axis=0,
)

X_test = np.concatenate(test_parts, axis=0)
y_test = np.concatenate(
    [np.full(len(part), label, dtype=np.int64) for label, part in enumerate(test_parts)],
    axis=0,
)

# Sanity check: the first dimension of X and y must agree in each split.
print("\nCombined:")
print(f"X_train: {X_train.shape} y_train: {y_train.shape}")
print(f"X_test : {X_test.shape} y_test : {y_test.shape}")

# 3) Match the 3D input layout fed to the LSTM
#   Case A) (N, T)    -> (N, T, 1)
#   Case B) (N, T, C) -> used as-is
#   Case C) (T,) single signal -> NOT handled here (raises ValueError); reshape it
#           to (1, T, 1) and apply any extra preprocessing before calling.
def ensure_3d(X):
    """Return ``X`` as a rank-3 array.

    Args:
        X: Array exposing ``ndim`` and ``shape``; rank 2 or rank 3 is accepted.

    Returns:
        ``X`` itself when it is already rank 3, otherwise ``X`` with a trailing
        axis of length 1 appended.

    Raises:
        ValueError: If ``X`` is neither rank 2 nor rank 3.
    """
    rank = X.ndim
    if rank == 3:
        return X
    if rank != 2:
        raise ValueError(f"Unexpected input shape: {X.shape}. Expected 2D or 3D array.")
    # Rank 2: append a singleton last axis.
    return X[:, :, np.newaxis]

# Bring both splits to rank 3 before normalization and model building.
X_train = ensure_3d(X_train)
X_test = ensure_3d(X_test)

print("\nAfter ensure_3d:")
print(f"X_train: {X_train.shape}")
print(f"X_test : {X_test.shape}")

# 4) Normalization (z-score)
X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)

# Per-feature mean/std: reducing over axes (0, 1) pools every sample and every
# timestep, leaving one statistic per entry of the last (feature) axis.
# Statistics come from the training data only and are reused for the test data.
stat_axes = (0, 1)
mean = np.mean(X_train, axis=stat_axes, keepdims=True)
std = np.std(X_train, axis=stat_axes, keepdims=True) + 1e-8  # epsilon guards against division by zero

X_train = (X_train - mean) / std
X_test = (X_test - mean) / std

print("\nNormalization done.")
print(f"train min/max: {float(X_train.min())} {float(X_train.max())}")

# 5) Shuffle, then carve a validation set out of the training data
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Apply one shared random permutation so samples and labels stay aligned.
idx = np.random.permutation(X_train.shape[0])
X_train = X_train[idx]
y_train = y_train[idx]

# The first `val_size` shuffled rows become validation; the remainder is used for fitting.
val_ratio = 0.2
val_size = int(X_train.shape[0] * val_ratio)

X_val, X_tr = np.split(X_train, [val_size])
y_val, y_tr = np.split(y_train, [val_size])

print("\nSplit:")
print(f"X_tr : {X_tr.shape} y_tr : {y_tr.shape}")
print(f"X_val: {X_val.shape} y_val: {y_val.shape}")

# Build batched tf.data input pipelines for training, validation and test.
batch_size = 64

# A shuffle buffer smaller than the dataset only shuffles within a sliding
# window, so the buffer is sized to the whole training split (previously a fixed
# 2000) to get a uniform reshuffle each epoch. max(..., 1) keeps the argument
# valid because shuffle() requires a positive buffer size.
shuffle_buffer = max(len(X_tr), 1)

train_ds = (
    tf.data.Dataset.from_tensor_slices((X_tr, y_tr))
    .shuffle(shuffle_buffer)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
# Validation and test pipelines are NOT shuffled, so predictions made from
# test_ds come back in the same order as y_test.
val_ds = (
    tf.data.Dataset.from_tensor_slices((X_val, y_val))
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
test_ds = (
    tf.data.Dataset.from_tensor_slices((X_test, y_test))
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

# LSTM model definition
timesteps, features = X_train.shape[1], X_train.shape[2]
num_classes = 3

keras_layers = tf.keras.layers

model = tf.keras.Sequential([
    keras_layers.Input(shape=(timesteps, features)),

    # Temporal feature extraction (1): emits the full sequence for the next LSTM.
    keras_layers.LSTM(64, return_sequences=True),
    keras_layers.Dropout(0.3),  # regularization against overfitting

    # Temporal feature extraction (2): only the final output is passed on.
    keras_layers.LSTM(64),
    keras_layers.Dropout(0.3),

    # Dense head used for classification.
    keras_layers.Dense(64, activation='relu'),
    keras_layers.Dropout(0.2),

    # 3-class probability output.
    keras_layers.Dense(num_classes, activation='softmax'),
])

# Labels are plain integers (0/1/2), hence the sparse variant of cross-entropy.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

# 7) Training with early stopping.
# restore_best_weights=True asks Keras to roll the model back to the weights of
# the epoch with the best val_accuracy; no checkpoint file is written here.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=5,
    restore_best_weights=True,
)
callbacks = [early_stopping]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=30,  # upper bound; early stopping may end training sooner
    callbacks=callbacks,
    verbose=1,
)

# 8) Learning curves: one figure per metric, train vs. validation.
curve_specs = (
    # (history key, figure title, y-axis label)
    ('accuracy', 'Accuracy', 'Acc'),
    ('loss', 'Loss', 'Loss'),
)
for metric, title, ylabel in curve_specs:
    plt.figure()
    plt.plot(history.history[metric])
    plt.plot(history.history[f'val_{metric}'])
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.legend(['train', 'val'])
    plt.show()

# 9) Evaluation on the held-out test set
test_loss, test_acc = model.evaluate(test_ds, verbose=0)
print(f"\nTest loss: {test_loss:.4f}\nTest acc : {test_acc:.4f}")

# Test predictions: class probabilities -> most likely class index per sample.
y_prob = model.predict(test_ds)
y_pred = y_prob.argmax(axis=1)

# Confusion matrix as a NumPy array.
cm = tf.math.confusion_matrix(
    labels=y_test,
    predictions=y_pred,
    num_classes=num_classes,
).numpy()

print("\nConfusion Matrix (rows=true, cols=pred):", cm, sep="\n")

# Visualization: confusion-matrix heat map with the count written in each cell.
tick_positions = [0, 1, 2]
tick_labels = ["Healthy", "Inner", "Outer"]

plt.figure(figsize=(5, 4))
plt.imshow(cm)
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.colorbar()
plt.xticks(tick_positions, tick_labels)
plt.yticks(tick_positions, tick_labels)

# imshow puts columns on the x-axis and rows on the y-axis, so text goes at (col, row).
for row, col in np.ndindex(num_classes, num_classes):
    plt.text(col, row, cm[row, col], ha='center', va='center')
plt.show()

# 11) Spot-check a few predictions against the true labels.
class_names = ["Healthy", "InnerFault", "OuterFault"]

# Slicing caps the preview at the available rows, so a test set with fewer than
# 10 samples no longer raises IndexError.
# Note: y_test is built class-by-class, so the leading rows typically share the
# same true label.
max_samples = 10

print("\nSample predictions:")
for true_label, pred_label in zip(y_test[:max_samples], y_pred[:max_samples]):
    print(f"True={class_names[int(true_label)]}  Pred={class_names[int(pred_label)]}")