### 비교 실험
- 데이터는 가운데 5번째 1분만 사용
1. STFT + LSTM
2. STFT + LSTM AutoEncoder
3. STFT + Transformer
4. STFT + LSTM AutoEncoder + Attention
5. STFT + Conv LSTM

In [47]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from sklearn.model_selection import train_test_split
import scipy.signal
import matplotlib.pyplot as plt
from keras.utils import to_categorical
from keras.models import Sequential, Model
from keras.layers import (
    LSTM,
    Dense,
    Dropout,
    RepeatVector,
    TimeDistributed,
    Input,
    LeakyReLU,
    Layer,
    MultiHeadAttention,
    LayerNormalization,
    Add,
    GlobalAveragePooling1D,
    ConvLSTM2D,
    BatchNormalization,
    Attention,
    Conv3D,
    Reshape,
    TimeDistributed,
    Flatten,
)
from keras.callbacks import EarlyStopping
import tensorflow.keras.backend as K
from sklearn.metrics import classification_report, accuracy_score
from sklearn.neighbors import kneighbors_graph
import os

In [48]:
# Standardize the selected feature columns of a data frame
def normalize_data(df, features):
    """Return the `features` columns of `df` after standard scaling.

    A fresh StandardScaler is fitted on every call, so the scaling
    statistics come only from the frame passed in. The return value is
    whatever `fit_transform` produces for the selected columns.
    """
    selected = df[features]
    standardizer = StandardScaler()
    scaled = standardizer.fit_transform(selected)
    return scaled

In [49]:
# Recording speed; it is part of both the base directory and each folder name
RPM = 300
base_path = f"5000hz_raw_data/{RPM}rpm/"
# One sub-folder per machine condition, each prefixed with "<RPM>rpm "
folders = [
    f"{RPM}rpm {condition}"
    for condition in (
        "normal data",
        "carriage damage",
        "high-speed damage",
        "lack of lubrication",
        "oxidation and corrosion",
    )
]
# Columns read from every CSV file
columns = ["motor1_x", "motor1_y", "motor1_z", "sound", "time"]

In [50]:
# Read the CSV recordings of one folder and merge them into a single frame
def read_and_concatenate(folder, skip_files=4, usecols=None):
    """Load the CSV files in `folder`, minus the first few, as one DataFrame.

    Args:
        folder: Directory that holds the per-recording ``.csv`` files.
        skip_files: Number of leading CSV files (in sorted name order) to
            leave out. Defaults to 4, the value that used to be hard-coded.
        usecols: Columns to read from every file. Defaults to the
            module-level ``columns`` list.

    Returns:
        The selected files concatenated and sorted by their ``time`` column.

    Raises:
        ValueError: If no CSV file is left after skipping.
    """
    if usecols is None:
        usecols = columns

    # os.listdir() yields entries in arbitrary order, so sort the names to make
    # the skipped/selected files reproducible. Filtering on the extension first
    # keeps stray non-CSV entries from using up the skip budget.
    # The sort is lexicographic ("10.csv" comes before "2.csv").
    csv_names = sorted(
        name for name in os.listdir(folder) if name.endswith(".csv")
    )

    # NOTE(review): every file after the skipped ones is loaded, not only the
    # fifth one - confirm this matches the "5th minute only" statement in the
    # notebook header.
    selected = csv_names[skip_files:]
    if not selected:
        raise ValueError(
            f"No CSV files left in {folder!r} after skipping {skip_files}"
        )

    frames = [
        pd.read_csv(os.path.join(folder, name), usecols=usecols)
        for name in selected
    ]
    combined_df = pd.concat(frames)
    combined_df.sort_values("time", inplace=True)  # order rows by timestamp
    return combined_df

In [51]:
# Load and standardize the recordings of every machine condition.
# The "time" column is not listed in `features`, so it is dropped implicitly
# when normalize_data selects the feature columns.
features = ["motor1_x", "motor1_y", "motor1_z", "sound"]
folder_index = [
    "normal data",
    "carriage damage",
    "high-speed damage",
    "lack of lubrication",
    "oxidation and corrosion",
]
concatenated_df = {}
# NOTE(review): normalize_data is called once per folder, so each condition is
# scaled with statistics from its own data only - confirm this is intended,
# since it removes level/variance differences between the conditions.
for class_label, folder_name in zip(folder_index, folders):
    folder_path = os.path.join(base_path, folder_name)
    raw_frame = read_and_concatenate(folder_path)
    concatenated_df[class_label] = normalize_data(raw_frame, features)

In [52]:
# Short-time Fourier transform (magnitude only)
fs = 5000


def compute_stft(data):
    """Return the STFT magnitude of every signal stored along axis 0 of `data`.

    Each element ``data[i]`` is treated as a separate signal and transformed
    along its last axis with ``nperseg = min(length of last axis, 256)``.
    The complex result is reduced to its absolute value, so the output is a
    real array of shape ``(len(data), ..., n_freqs, n_segments)``.

    The transform is done in one batched call: ``scipy.signal.stft`` works
    along the last axis of an N-D array, which gives the same values as
    calling it once per row without a Python-level loop over the rows.

    NOTE(review): when `data` is a (n_samples, n_channels) matrix, such as the
    output of normalize_data in this notebook, every row is a single time
    instant, so the transform runs across the channel values of that instant
    and not along time. Confirm that this is what the experiment intends.

    Args:
        data: Array-like with at least two dimensions.

    Returns:
        Magnitude spectrogram array; an empty 1-D array if `data` is empty.

    Raises:
        ValueError: If `data` has fewer than two dimensions.
    """
    signals = np.asarray(data)
    if signals.ndim < 2:
        raise ValueError("compute_stft expects one signal per row (ndim >= 2)")
    if signals.shape[0] == 0:
        # Same result the per-row version produced for an empty input.
        return np.array([])

    nperseg = min(signals.shape[-1], 256)
    _, _, spectrum = scipy.signal.stft(signals, fs=fs, nperseg=nperseg)
    return np.abs(spectrum)  # magnitude turns the complex STFT into real data

In [53]:
# Replace each condition's scaled samples with their STFT magnitudes.
# NOTE(review): every entry is the 2-D array returned by normalize_data (one
# row per time sample) and compute_stft treats each row as its own signal -
# confirm that a per-row transform across the feature columns is wanted.
for class_label in folder_index:
    scaled_samples = concatenated_df[class_label]
    concatenated_df[class_label] = compute_stft(scaled_samples)

In [54]:
# Merge STFT arrays
def concatenate_data(*stft_data):
    """Stack the given arrays along axis 0 after cropping axis 2.

    Axis 2 of every array is cut to the shortest length found among the
    inputs so that the arrays can be concatenated.
    """
    shortest = min(block.shape[2] for block in stft_data)
    cropped = []
    for block in stft_data:
        cropped.append(block[:, :, :shortest])
    return np.concatenate(cropped, axis=0)

In [55]:
# Pass every condition's array through concatenate_data.
# With a single array per call nothing is cropped; the result is a copy.
for class_label in folder_index:
    stft_block = concatenated_df[class_label]
    concatenated_df[class_label] = concatenate_data(stft_block)

In [56]:
# Build X by stacking the per-condition arrays in folder_index order
per_class_arrays = [concatenated_df[class_label] for class_label in folder_index]
X = np.concatenate(per_class_arrays, axis=0)

In [57]:
# Build the labels: the class id of a sample is the position of its condition
# in folder_index (0-4), repeated once per sample of that condition
class_sizes = [
    concatenated_df[folder_index[class_id]].shape[0] for class_id in range(5)
]
Y = np.concatenate(
    [np.full(size, float(class_id)) for class_id, size in enumerate(class_sizes)]
)
# One-hot encoding
Y = to_categorical(Y)

In [58]:
# Split the data 60/20/20: hold out 20% as the test set first, then take 25%
# of the remaining 80% (20% of the total) as the validation set
X_train_val, X_test, Y_train_val, Y_test = train_test_split(
    X,
    Y,
    random_state=42,
    test_size=0.2,
)
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train_val,
    Y_train_val,
    random_state=42,
    test_size=0.25,
)

In [59]:
# Inspect the shape of every split: features first, then labels.
# Each line is labelled with the split it prints; Y_val used to be reported
# under the "Train label shape:" caption.
print("Train data shape:", X_train.shape)
print("Validation data shape:", X_val.shape)
print("Test data shape:", X_test.shape)
print("Train label shape:", Y_train.shape)
print("Validation label shape:", Y_val.shape)
print("Test label shape:", Y_test.shape)


Train data shape: (5400000, 3, 3)
Test data shape: (1800000, 3, 3)
Validation data shape: (1800000, 3, 3)
Train label shape: (1800000, 5)
Train label shape: (5400000, 5)
Test label shape: (1800000, 5)


In [60]:
# Bring every split into a 3-D (samples, steps, features) layout by keeping
# the first two axes and flattening everything behind them.
# NOTE(review): axis 1 of the compute_stft output is the frequency axis of
# scipy.signal.stft (segment times are on the last axis), so the "steps" seen
# by the sequence models are frequency bins - confirm this is intended.
def _flatten_trailing_axes(batch):
    """Reshape `batch` to (batch.shape[0], batch.shape[1], -1)."""
    return batch.reshape(batch.shape[0], batch.shape[1], -1)


X_train, X_val, X_test = (
    _flatten_trailing_axes(split) for split in (X_train, X_val, X_test)
)

In [61]:
# Verify the shapes after the reshape
for split_name, split_data in (
    ("Train", X_train),
    ("Validation", X_val),
    ("Test", X_test),
):
    print(f"{split_name} data shape after reshape:", split_data.shape)

Train data shape after reshape: (5400000, 3, 3)
Validation data shape after reshape: (1800000, 3, 3)
Test data shape after reshape: (1800000, 3, 3)


### 1. STFT + LSTM

In [62]:
# Shape of one sample, taken from the training array
input_shape = (X_train.shape[1], X_train.shape[2])


# Stacked-LSTM classifier (a plain classifier, not an autoencoder)
def create_lstm(input_shape, num_classes=5):
    """Build a three-layer LSTM network with a softmax classification head.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        num_classes: Width of the softmax output layer. Defaults to 5, the
            value that used to be hard-coded.

    Returns:
        An uncompiled Keras ``Model``.
    """
    inputs = Input(shape=input_shape)
    # NOTE(review): the LSTM layers override the default activation with
    # "relu"; presumably deliberate, but worth confirming because it differs
    # from the other LSTM models in this notebook.
    hidden = LSTM(128, activation="relu", return_sequences=True)(inputs)
    hidden = LSTM(64, activation="relu", return_sequences=True)(hidden)
    # The last LSTM returns only its final output vector.
    hidden = LSTM(32, activation="relu")(hidden)
    dense = Dense(64, activation="relu")(hidden)
    dropout = Dropout(0.2)(dense)
    outputs = Dense(num_classes, activation="softmax")(dropout)
    model = Model(inputs, outputs)
    return model

In [63]:
# Build and compile the LSTM classifier, then print its layer summary
model = create_lstm(input_shape)
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
model.summary()

In [64]:
# Early-stopping callback shared by all training runs in this notebook.
# Training stops after 10 epochs without a val_loss improvement.
# restore_best_weights=True puts the weights of the best epoch back, so the
# evaluation cells do not score a model that is 10 epochs past its best.
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=10,
    verbose=1,
    restore_best_weights=True,
)

In [65]:
# Train the LSTM classifier; validation data drives the early-stopping callback
history = model.fit(
    x=X_train,
    y=Y_train,
    validation_data=(X_val, Y_val),
    batch_size=256,
    epochs=100,
    callbacks=[early_stopping],
)

Epoch 1/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m302s[0m 14ms/step - accuracy: 0.2561 - loss: 1.5610 - val_accuracy: 0.3932 - val_loss: 1.2858
Epoch 2/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m302s[0m 14ms/step - accuracy: 0.4325 - loss: 1.2068 - val_accuracy: 0.5899 - val_loss: 0.9349
Epoch 3/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m304s[0m 14ms/step - accuracy: 0.5539 - loss: 0.9745 - val_accuracy: 0.6490 - val_loss: 0.8022
Epoch 4/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m305s[0m 14ms/step - accuracy: 0.6117 - loss: 0.8600 - val_accuracy: 0.7106 - val_loss: 0.6737
Epoch 5/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m317s[0m 15ms/step - accuracy: 0.6867 - loss: 0.7025 - val_accuracy: 0.8018 - val_loss: 0.4937
Epoch 6/100
[1m21094/21094[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m296s[0m 14ms/step - accuracy: 0.7373 - loss: 0.5935 - val_accuracy: 0.7966

KeyboardInterrupt: 

In [None]:
# STFT + LSTM: loss and accuracy curves for the training and validation data
curves = history.history
train_loss, val_loss = curves["loss"], curves["val_loss"]
train_accuracy, val_accuracy = curves["accuracy"], curves["val_accuracy"]

# One figure per metric, each with a train and a validation curve
for metric, train_curve, val_curve in (
    ("Loss", train_loss, val_loss),
    ("Accuracy", train_accuracy, val_accuracy),
):
    plt.plot(train_curve, label=f"Train {metric}")
    plt.plot(val_curve, label=f"Validation {metric}")
    plt.title(metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend()
    plt.show()

In [None]:
# Evaluate the model on the test split
test_loss, test_accuracy = model.evaluate(X_test, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Per-class report: turn probabilities / one-hot rows into class indices
y_pred = model.predict(X_test)
y_pred_classes = y_pred.argmax(axis=1)
y_true_classes = Y_test.argmax(axis=1)

report = classification_report(y_true_classes, y_pred_classes)
print(report)

### 2. STFT + LSTM AutoEncoder

In [None]:
# Shape of one sample, taken from the training array
input_shape = (X_train.shape[1], X_train.shape[2])


# LSTM autoencoder definition
def create_lstm_autoencoder(input_shape):
    """Build an LSTM autoencoder that reconstructs its input sequence.

    The encoder compresses the sequence into one 32-unit vector, which is
    repeated `input_shape[0]` times and decoded back to `input_shape[1]`
    values per step. The model is returned uncompiled.
    """
    n_steps, n_features = input_shape[0], input_shape[1]
    inputs = Input(shape=input_shape)

    # Encoder: the second LSTM emits only its final output (the code vector)
    code = LSTM(64, return_sequences=True)(inputs)
    code = LSTM(32, return_sequences=False)(code)

    # Feed the code vector to the decoder once per output step
    decoder_input = RepeatVector(n_steps)(code)

    # Decoder: mirror of the encoder, then a per-step linear projection
    reconstruction = LSTM(32, return_sequences=True)(decoder_input)
    reconstruction = LSTM(64, return_sequences=True)(reconstruction)
    outputs = TimeDistributed(Dense(n_features))(reconstruction)

    return Model(inputs, outputs)

In [None]:
# Classifier that works on the features encoded by an autoencoder
def create_classifier(input_shape, num_classes):
    """Build and compile a small dense softmax classifier.

    Two hidden Dense layers (64 and 32 units), each followed by a Leaky ReLU
    and dropout, feed a `num_classes`-wide softmax layer. The model is
    compiled with Adam and categorical cross-entropy before it is returned.
    """
    classifier = Sequential(
        [
            Dense(64, input_shape=input_shape),
            LeakyReLU(alpha=0.01),
            Dropout(0.5),
            Dense(32),
            LeakyReLU(alpha=0.01),
            Dropout(0.2),
            Dense(num_classes, activation="softmax"),
        ]
    )
    classifier.compile(
        loss="categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return classifier

In [None]:
# Build the autoencoder; it is trained on reconstruction error (MSE)
model = create_lstm_autoencoder(input_shape)
model.compile(loss="mse", optimizer="adam")
model.summary()

In [None]:
# Train the autoencoder: the input is also the target
history = model.fit(
    x=X_train,
    y=X_train,
    validation_data=(X_val, X_val),
    batch_size=128,
    epochs=5,
    callbacks=[early_stopping],
)

In [None]:
# Reconstruction-loss curves for the training and validation data
train_loss = history.history["loss"]
val_loss = history.history["val_loss"]

for curve, legend_label in (
    (train_loss, "Train Loss"),
    (val_loss, "Validation Loss"),
):
    plt.plot(curve, label=legend_label)
plt.title("Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Extract the encoded features: a sub-model that ends at model.layers[2].
# For a model built by create_lstm_autoencoder that is the 32-unit encoder
# LSTM (layer 0 is the input layer).
bottleneck_output = model.layers[2].output
encoder = Model(inputs=model.input, outputs=bottleneck_output)
X_train_encoded, X_val_encoded, X_test_encoded = (
    encoder.predict(split) for split in (X_train, X_val, X_test)
)

In [None]:
# Build the multi-class classifier on top of the encoded features
encoded_dim = X_train_encoded.shape[1]
classifier = create_classifier((encoded_dim,), 5)
# create_classifier already compiles the model; this repeats the same settings
classifier.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
classifier.summary()

In [None]:
# Train the multi-class classifier on the encoded features
history = classifier.fit(
    x=X_train_encoded,
    y=Y_train,
    validation_data=(X_val_encoded, Y_val),
    batch_size=256,
    epochs=100,
    callbacks=[early_stopping],
)

In [None]:
# Loss and accuracy curves for the training and validation data
curves = history.history
train_loss, val_loss = curves["loss"], curves["val_loss"]
train_accuracy, val_accuracy = curves["accuracy"], curves["val_accuracy"]

# One figure per metric, each with a train and a validation curve
for metric, train_curve, val_curve in (
    ("Loss", train_loss, val_loss),
    ("Accuracy", train_accuracy, val_accuracy),
):
    plt.plot(train_curve, label=f"Train {metric}")
    plt.plot(val_curve, label=f"Validation {metric}")
    plt.title(metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend()
    plt.show()

In [None]:
# Evaluate the classifier on the encoded test split
test_loss, test_accuracy = classifier.evaluate(X_test_encoded, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Per-class report: turn probabilities / one-hot rows into class indices
y_pred = classifier.predict(X_test_encoded)
y_pred_classes = y_pred.argmax(axis=1)
y_true_classes = Y_test.argmax(axis=1)
report = classification_report(y_true_classes, y_pred_classes)
print(report)

### 3. STFT + Transformer

In [None]:
# Transformer model definition
input_shape = (X_train.shape[1], X_train.shape[2])


def create_transformer_model(input_shape, num_heads=4, ff_dim=32):
    """Build a single-block Transformer encoder with a 5-class softmax head.

    The block is self-attention followed by a two-layer feed-forward network,
    each with dropout, a residual connection and layer normalization. The
    sequence is then average-pooled and classified. Returned uncompiled.
    """
    feature_dim = input_shape[1]
    inputs = Input(shape=input_shape)

    # Self-attention sub-layer: query and value are both the input sequence
    self_attention = MultiHeadAttention(num_heads=num_heads, key_dim=feature_dim)
    attended = self_attention(inputs, inputs)
    attended = Dropout(0.1)(attended)
    attended = LayerNormalization(epsilon=1e-6)(attended + inputs)

    # Feed-forward sub-layer, projected back to the input feature width so
    # that the residual addition is possible
    projected = Dense(ff_dim, activation="relu")(attended)
    projected = Dense(feature_dim)(projected)
    projected = Dropout(0.1)(projected)
    block_output = LayerNormalization(epsilon=1e-6)(projected + attended)

    pooled = GlobalAveragePooling1D()(block_output)
    class_probs = Dense(5, activation="softmax")(pooled)

    return Model(inputs, class_probs)

In [None]:
# Build and compile the Transformer classifier, then print its layer summary
model = create_transformer_model(input_shape)
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
model.summary()

In [None]:
# Train the Transformer classifier
history = model.fit(
    x=X_train,
    y=Y_train,
    validation_data=(X_val, Y_val),
    batch_size=256,
    epochs=100,
    callbacks=[early_stopping],
)

In [None]:
# Loss and accuracy curves for the training and validation data
curves = history.history
train_loss, val_loss = curves["loss"], curves["val_loss"]
train_accuracy, val_accuracy = curves["accuracy"], curves["val_accuracy"]

# One figure per metric, each with a train and a validation curve
for metric, train_curve, val_curve in (
    ("Loss", train_loss, val_loss),
    ("Accuracy", train_accuracy, val_accuracy),
):
    plt.plot(train_curve, label=f"Train {metric}")
    plt.plot(val_curve, label=f"Validation {metric}")
    plt.title(metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend()
    plt.show()

In [None]:
# Evaluate the model on the test split
test_loss, test_accuracy = model.evaluate(X_test, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Per-class report: turn probabilities / one-hot rows into class indices
y_pred = model.predict(X_test)
y_pred_classes = y_pred.argmax(axis=1)
y_true_classes = Y_test.argmax(axis=1)
report = classification_report(y_true_classes, y_pred_classes)
print(report)

### 4. STFT + LSTM AutoEncoder + Attention

In [None]:
# Shape of one sample, taken from the training array
input_shape = (X_train.shape[1], X_train.shape[2])


# LSTM autoencoder definition (with self-attention in the encoder)
def create_lstm_ae_with_attention(input_shape):
    """Build an LSTM autoencoder whose encoder contains a self-attention layer.

    Layout: LSTM(64) -> self-attention -> LSTM(32) code vector ->
    RepeatVector -> LSTM(32) -> LSTM(64) -> per-step Dense reconstruction.

    Two problems of the previous layout are fixed here:
    * ``Attention`` must be called on a ``[query, value]`` list; it was
      called on a single tensor.
    * ``RepeatVector`` needs a 2-D ``(batch, features)`` input; it was fed
      the 3-D attention output.

    Args:
        input_shape: ``(steps, features)`` of one input sample.

    Returns:
        An uncompiled Keras ``Model`` that maps a sequence to its
        reconstruction.
    """
    inputs = Input(shape=input_shape)

    # Encoder
    encoded = LSTM(64, return_sequences=True)(inputs)
    # Self-attention: the same sequence serves as query and value.
    attended = Attention()([encoded, encoded])
    # Collapse the attended sequence into a single code vector. This layer is
    # model.layers[3], the index the feature-extraction cell reads.
    bottleneck = LSTM(32, return_sequences=False)(attended)

    # Decoder: repeat the code once per output step and unroll it again
    repeated = RepeatVector(input_shape[0])(bottleneck)
    decoded = LSTM(32, return_sequences=True)(repeated)
    decoded = LSTM(64, return_sequences=True)(decoded)
    outputs = TimeDistributed(Dense(input_shape[1]))(decoded)

    model = Model(inputs, outputs)
    return model

In [None]:
# Build the attention autoencoder; it is trained on reconstruction error (MSE)
model = create_lstm_ae_with_attention(input_shape)
model.compile(loss="mse", optimizer="adam")
model.summary()

In [None]:
# Train the autoencoder: the input is also the target
history = model.fit(
    x=X_train,
    y=X_train,
    validation_data=(X_val, X_val),
    batch_size=128,
    epochs=5,
    callbacks=[early_stopping],
)

In [None]:
# Reconstruction-loss curves for the training and validation data
train_loss = history.history["loss"]
val_loss = history.history["val_loss"]

for curve, legend_label in (
    (train_loss, "Train Loss"),
    (val_loss, "Validation Loss"),
):
    plt.plot(curve, label=legend_label)
plt.title("Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Extract the encoded features: a sub-model that ends at model.layers[3].
# NOTE(review): the index is positional - verify that layers[3] is the layer
# whose output should serve as the encoded feature vector; the classifier
# built afterwards sizes its input from shape[1] of these arrays.
bottleneck_output = model.layers[3].output
encoder = Model(inputs=model.input, outputs=bottleneck_output)
X_train_encoded, X_val_encoded, X_test_encoded = (
    encoder.predict(split) for split in (X_train, X_val, X_test)
)

In [None]:
# Build the multi-class classifier on top of the encoded features
encoded_dim = X_train_encoded.shape[1]
classifier = create_classifier((encoded_dim,), 5)
# create_classifier already compiles the model; this repeats the same settings
classifier.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
classifier.summary()

In [None]:
# Train the multi-class classifier on the encoded features
history = classifier.fit(
    x=X_train_encoded,
    y=Y_train,
    validation_data=(X_val_encoded, Y_val),
    batch_size=256,
    epochs=100,
    callbacks=[early_stopping],
)

In [None]:
# Loss and accuracy curves for the training and validation data
train_loss = history.history["loss"]
val_loss = history.history["val_loss"]
train_accuracy = history.history["accuracy"]
val_accuracy = history.history["val_accuracy"]

# The loss curves were read but never drawn in this section; plot them like
# the other experiments do so the five models can be compared side by side.
plt.plot(train_loss, label="Train Loss")
plt.plot(val_loss, label="Validation Loss")
plt.title("Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

plt.plot(train_accuracy, label="Train Accuracy")
plt.plot(val_accuracy, label="Validation Accuracy")
plt.title("Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# Evaluate the classifier on the encoded test split
test_loss, test_accuracy = classifier.evaluate(X_test_encoded, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Per-class report: turn probabilities / one-hot rows into class indices
y_pred = classifier.predict(X_test_encoded)
y_pred_classes = y_pred.argmax(axis=1)
y_true_classes = Y_test.argmax(axis=1)
report = classification_report(y_true_classes, y_pred_classes)
print(report)

### 5. STFT + Conv LSTM

In [None]:
# ConvLSTM classifier definition (the model contains no attention layer)
def create_convlstm(input_shape):
    """Build a four-stage ConvLSTM2D network with a 5-class softmax head.

    Every stage is a ConvLSTM2D layer (kernel 3x1, "same" padding, full
    sequence returned) followed by batch normalization; the filter counts are
    64, 32, 32 and 64. The result is flattened and classified. The model is
    returned uncompiled.
    """
    inputs = Input(shape=input_shape)

    features = inputs
    for n_filters in (64, 32, 32, 64):
        features = ConvLSTM2D(
            filters=n_filters,
            kernel_size=(3, 1),
            padding="same",
            return_sequences=True,
        )(features)
        features = BatchNormalization()(features)

    flattened = Flatten()(features)
    class_probs = Dense(5, activation="softmax")(flattened)

    return Model(inputs, class_probs)

In [None]:
# Convert the splits to the 5-D layout ConvLSTM2D expects:
# (samples, time, rows, cols, channels), with one time step and one channel.
# The previous reshape appended two trailing singleton axes, which gave 6-D
# arrays that do not match the 4-element input_shape built from them.
def _to_convlstm_layout(batch):
    """Return `batch` as (samples, 1, shape[1], shape[2], 1)."""
    if batch.ndim == 5:
        # Already converted: keeps the cell safe to run a second time.
        return batch
    return batch.reshape(batch.shape[0], 1, batch.shape[1], batch.shape[2], 1)


X_train = _to_convlstm_layout(X_train)
X_val = _to_convlstm_layout(X_val)
X_test = _to_convlstm_layout(X_test)

In [None]:
# Inspect the shapes of all splits: features first, then labels
for kind, splits in (
    ("data", (X_train, X_val, X_test)),
    ("label", (Y_train, Y_val, Y_test)),
):
    for split_name, split_array in zip(("Train", "Validation", "Test"), splits):
        print(f"{split_name} {kind} shape:", split_array.shape)

In [None]:
# Model input shape: axes 1-4 of the training array (the sample axis is left out)
input_shape = X_train.shape[1:5]

In [None]:
# Build and compile the ConvLSTM classifier, then print its layer summary
model = create_convlstm(input_shape)
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
model.summary()

In [None]:
# Train the ConvLSTM classifier
history = model.fit(
    x=X_train,
    y=Y_train,
    validation_data=(X_val, Y_val),
    batch_size=256,
    epochs=100,
    callbacks=[early_stopping],
)

In [None]:
# Loss and accuracy curves for the training and validation data
curves = history.history
train_loss, val_loss = curves["loss"], curves["val_loss"]
train_accuracy, val_accuracy = curves["accuracy"], curves["val_accuracy"]

# One figure per metric, each with a train and a validation curve
for metric, train_curve, val_curve in (
    ("Loss", train_loss, val_loss),
    ("Accuracy", train_accuracy, val_accuracy),
):
    plt.plot(train_curve, label=f"Train {metric}")
    plt.plot(val_curve, label=f"Validation {metric}")
    plt.title(metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend()
    plt.show()

In [None]:
# Evaluate the model on the test split
test_loss, test_accuracy = model.evaluate(X_test, Y_test)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# Per-class report: turn probabilities / one-hot rows into class indices
y_pred = model.predict(X_test)
y_pred_classes = y_pred.argmax(axis=1)
y_true_classes = Y_test.argmax(axis=1)

report = classification_report(y_true_classes, y_pred_classes)
print(report)