# 1. CMAPSS 데이터 구조
## 1-1. train_FD001
- 여러 기기의 여러 센서 데이터와 RUL 정보를 포함한다.
- 각 엔진은 고장까지의 여러 시간 단계에서 데이터를 기록한다.
### 칼럼
- First column: Engine ID
- Second column: Time Cycle
- 3rd-5th column: Operational Settings
- 6th-26th column: Sensor measurements

## 1-2. test_FD001
- 고장 전 마지막까지의 데이터를 포함하며, 고장에 대한 정보는 포함하지 않고 있다.
- 이 데이터를 사용해서 고장 시점을 예측해야 한다.

## 1-3. RUL_FD001
- test 데이터에 해당하는 각 엔진의 실제 남은 수명(RUL)을 포함하는 레이블이다

# 2. 데이터 전처리

## 2-1. 데이터 로드

In [1]:
import pandas as pd

# define filepath to read dataset
data_path = './dataset/'

# define column names
columns = ['engine_id', 'time_in_cycles'] + \
          ['operational_setting_1', 'operational_setting_2', 'operational_setting_3'] + \
          [f'sensor_measurement_{i}' for i in range(1, 22)]

train_df = pd.read_csv((data_path+'train_FD001.txt'), sep=r'\s+', header=None, names=columns)
test_df = pd.read_csv((data_path+'test_FD001.txt'), sep=r'\s+', header=None, names=columns)
rul_test_df = pd.read_csv((data_path+'RUL_FD001.txt'), sep=r'\s+', header=None, names=['RUL'])

## 2-2. RUL 라벨 추가(train data)
- train 데이터는 고장 직전까지의 데이터를 가지고 있으므로, 각 엔진의 RUL 을 구할 수 있다.

In [2]:
# Max cycle per engine (각 엔진의 마지막 cycle 을 고장 cycle 로 간주)
max_cycle = train_df.groupby('engine_id')['time_in_cycles'].max()

# Caculate RUL (최대 사이클 - 해당 칼럼 사이클)
train_df = train_df.merge(max_cycle, on='engine_id', suffixes=('', '_max'))
train_df['RUL'] = train_df['time_in_cycles_max'] - train_df['time_in_cycles']
train_df.drop('time_in_cycles_max', axis=1, inplace=True)

## 2-3. 특성 엔지니어링
- 센서 데이터는 노이즈가 많을 수 있으므로, 불필요한 센서를 제거하거나 파생 변수를 생성하는 방법이 있다.

In [3]:
# 센서 값이 일정한 (변동이 없는) 컬럼을 제거
sensor_columns = [f'sensor_measurement_{i}' for i in range(1, 22)]
constant_sensors = train_df[sensor_columns].std(axis=0) == 0
train_df.drop(columns=constant_sensors.index[constant_sensors], axis=1, inplace=True)
test_df.drop(columns=constant_sensors.index[constant_sensors], axis=1, inplace=True)

## 2-4. 데이터 정규화
- 센서 데이터와 운영 조건들은 서로 다른 범위를 가질 수 있으므로, 이를 정규화해 주는 것이 일반적이다.

In [4]:
# Min-Max Scaler
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
sensor_columns = [col for col in train_df.columns if 'sensor_measurement' in col]
train_df[sensor_columns] = scaler.fit_transform(train_df[sensor_columns])
test_df[sensor_columns] = scaler.transform(test_df[sensor_columns])

In [5]:
# # Z-Score Scaler
# from sklearn.preprocessing import StandardScaler

# scaler = StandardScaler()
# sensor_columns = [col for col in train_df.columns if 'sensor_measurement' in col]
# train_df[sensor_columns] = scaler.fit_transform(train_df[sensor_columns])
# test_df[sensor_columns] = scaler.transform(test_df[sensor_columns])

In [6]:
# Define the window size for moving average
window_size = 3  # You can adjust this value

# Function to apply moving average to sensor columns
def apply_moving_average(df):
    sensor_columns = [col for col in df.columns if 'sensor_measurement' in col]
    
    # Group by engine_id and apply moving average
    df_grouped = df.groupby('engine_id')[sensor_columns]
    
    # Apply rolling mean
    df[sensor_columns] = df_grouped.rolling(window=window_size, min_periods=1).mean().reset_index(level=0, drop=True)
    
    return df

# Apply moving average to train_df and test_df
train_df = apply_moving_average(train_df)
test_df = apply_moving_average(test_df)

In [7]:
# Re-scale sensor data
scaler = StandardScaler()
sensor_columns = [col for col in train_df.columns if 'sensor_measurement' in col]
train_df[sensor_columns] = scaler.fit_transform(train_df[sensor_columns])
test_df[sensor_columns] = scaler.transform(test_df[sensor_columns])

# 3. 학습 데이터 준비
- LSTM, GRU 와 같은 시계열 모델을 사용하려면 각 엔진의 시계열 데이터를 고정된 기이의 시퀀스로 변환해야 한다.
- 이를 위해 각 엔진의 데이터를 일정한 길이의 시퀀스로 잘라서 학습에 사용한다.
- 예를 들어, 각 엔진의 시간 축 데이터를 50 단위의 시퀀스로 만들 수 있다.

## 3-1. Create Sequence whith Sliding Window method
- Sliding Window 기법을 사용하여 각 엔진의 시계열 데이터를 고정된 길이로 자르고, 해당 시퀀스를 학습에 사용할 수 있다.
- 예를 들어, 각 엔진의 시계열 데이터를 50 사이클 단위로 자르는 코드를 작성해보자.

In [8]:
import numpy as np

# 시퀀스 길이 설정
sequence_length = 50  # 50

def create_sequences(data, sequence_length):
    """주어진 데이터에서 시퀀스를 만드는 함수"""
    sequences = []
    targets = []
    
    engines = data['engine_id'].unique()
    
    for engine_id in engines:
        engine_data = data[data['engine_id'] == engine_id]
        engine_values = engine_data.drop(columns=['engine_id', 'RUL']).values
        rul_values = engine_data['RUL'].values
        
        for i in range(len(engine_values) - sequence_length + 1):
            sequences.append(engine_values[i: i + sequence_length])
            targets.append(rul_values[i + sequence_length - 1])  # 시퀀스의 마지막 값이 목표 RUL
            
    return np.array(sequences), np.array(targets)

# Train data에서 시퀀스와 타겟 생성
train_sequences, train_targets = create_sequences(train_df, sequence_length)

# 실제 RUL 값은 numpy 배열로 변환
true_rul = rul_test_df.values.squeeze()  # RUL_FD001.csv에서 실제 RUL 값 (예: rul_test_df = pd.read_csv('RUL_FD001.csv'))

## 3-2. 테스트 데이터에 대한 시퀀스 생성
- 테스트 데이터는 각 엔진의 마지막 시점까지의 데이터를 포함하고 있다.
- 테스트 데이터를 처리할 때는 각 엔진의 마지막 sequence_length 만큼의 데이터를 사용하여 예측해야 한다.

In [9]:
# def create_test_sequences(data, sequence_length):
#     """테스트 데이터를 위해 마지막 sequence_length만큼 시퀀스를 만드는 함수"""
#     sequences = []
    
#     engines = data['engine_id'].unique()
    
#     for engine_id in engines:
#         engine_data = data[data['engine_id'] == engine_id]
#         engine_values = engine_data.drop(columns=['engine_id']).values
        
#         # 시퀀스 길이가 sequence_length보다 짧으면 제외
#         if len(engine_values) >= sequence_length:
#             sequences.append(engine_values[-sequence_length:])  # 마지막 sequence_length만큼 사용
    
#     return np.array(sequences)

# # Test data에서 시퀀스 생성
# test_sequences = create_test_sequences(test_df, sequence_length)

In [10]:
import numpy as np

def create_test_sequences(data, sequence_length):
    """테스트 데이터를 위해 마지막 sequence_length만큼 시퀀스를 만드는 함수"""
    sequences = []
    
    engines = data['engine_id'].unique()
    
    for engine_id in engines:
        engine_data = data[data['engine_id'] == engine_id]
        engine_values = engine_data.drop(columns=['engine_id']).values
        
        # 시퀀스 길이가 sequence_length보다 짧으면 앞에 0으로 패딩
        if len(engine_values) < sequence_length:
            padding = np.zeros((sequence_length - len(engine_values), engine_values.shape[1]))
            engine_values = np.vstack((padding, engine_values))
        
        sequences.append(engine_values[-sequence_length:])  # 마지막 sequence_length 만큼 사용
    
    return np.array(sequences)

# Test data에서 시퀀스 생성
test_sequences = create_test_sequences(test_df, sequence_length)

In [11]:
# import torch
# import torch.nn.utils.rnn as rnn_utils

# def create_test_sequences(data, sequence_length):
#     """테스트 데이터를 위해 마지막 sequence_length만큼 시퀀스를 만드는 함수"""
#     sequences = []
#     sequence_lengths = []
    
#     engines = data['engine_id'].unique()
    
#     for engine_id in engines:
#         engine_data = data[data['engine_id'] == engine_id]
#         engine_values = engine_data.drop(columns=['engine_id']).values
        
#         # 시퀀스 길이가 sequence_length보다 짧으면 앞에 0으로 패딩
#         if len(engine_values) < sequence_length:
#             padding = np.zeros((sequence_length - len(engine_values), engine_values.shape[1]))
#             engine_values = np.vstack((padding, engine_values))
        
#         sequences.append(engine_values[-sequence_length:])  # 마지막 sequence_length만큼 사용
#         sequence_lengths.append(len(engine_data))  # 원래 시퀀스 길이 저장
    
#     return torch.tensor(sequences, dtype=torch.float32), sequence_lengths

# # Test data에서 시퀀스 생성
# test_sequences, sequence_lengths = create_test_sequences(test_df, sequence_length)

# # PackedSequence로 변환
# packed_test_sequences = rnn_utils.pack_padded_sequence(test_sequences, sequence_lengths, batch_first=True, enforce_sorted=False)

## 3-3. 텐서 변환
- PyTorch 를 사용하기 위해서는 데이터를 텐서로 변환해야 한다.

In [12]:
import torch

# 데이터를 텐서로 변환
train_sequences_tensor = torch.tensor(train_sequences, dtype=torch.float32)
train_targets_tensor = torch.tensor(train_targets, dtype=torch.float32)
test_sequences_tensor = torch.tensor(test_sequences, dtype=torch.float32)
test_targets_tensor = torch.tensor(true_rul, dtype=torch.float32)

# 4. PyTorch LSTM 모델 설계
- LSTM Layer: 여러 개의 LSTM 레이어를 쌓은 구조이며, 각 타임 스텝의 출력을 계산한다.
- FC Layer: 마지막 LSTM 의 출력에서 가장 마지막 타임스텝의 출력을 가져와서 Fully Connected Layer 로 RUL 을 예측한다.

In [13]:
import torch.nn as nn

# 랜덤 시드 고정
def set_seed(seed=42):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if you are using multi-GPU.
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)  # RUL을 예측하기 위한 출력 레이어
        self.best_state_dict = None

    def forward(self, x):
        # LSTM에 입력: (batch_size, sequence_length, input_size)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)  # hidden state 초기화
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)  # cell state 초기화
        
        # LSTM의 출력: (batch_size, sequence_length, hidden_size), (hn, cn)
        out, _ = self.lstm(x, (h0, c0))
        
        # 최종 타임스텝의 출력만 사용
        out = out[:, -1, :]  # (batch_size, hidden_size)
        
        # Fully connected layer를 통해 RUL 예측
        out = self.fc(out)
        return out

    # 최적의 가중치 저장하는 함수
    def save_best_state_dict(self):
        self.best_state_dict = self.state_dict()
        return self.best_state_dict

    # 최적의 가중치를 반환하는 함수
    def get_best_state_dict(self):
        return self.best_state_dict

# 스코어 함수 선언 asymmetric_scoring 함수는 조기 예측과 늦은 예측에 대해 서로 다른 가중치를 적용하여 스코어를 계산한다. (조기 예측 가중치 a1, 늦은 예측 가중치 a2)
def asymmetric_scoring(y_true, y_pred, a1=10, a2=13):
    """
    비대칭 스코어링 함수
    y_true: 실제 RUL 값 (numpy array)
    y_pred: 예측된 RUL 값 (numpy array)
    a1: 조기 예측에 대한 가중치
    a2: 늦은 예측에 대한 가중치
    """
    errors = y_pred - y_true
    scores = np.where(errors < 0, np.exp(-errors / a1) - 1, np.exp(errors / a2) - 1)
    return np.sum(scores)

# evaluate_algorithm 함수는 여러 UUT 에 대해 총 스코어를 계산한다.
def evaluate_algorithm(y_true_all, y_pred_all, a1=10, a2=13):
    """
    알고리즘 평가 함수
    y_true_all: 실제 RUL 값 리스트 (각 UUT별 numpy array)
    y_pred_all: 예측된 RUL 값 리스트 (각 UUT별 numpy array)
    a1: 조기 예측에 대한 가중치
    a2: 늦은 예측에 대한 가중치
    """
    total_score = 0

    for y_true, y_pred in zip(y_true_all, y_pred_all):
        score = asymmetric_scoring(y_true, y_pred, a1, a2)
        total_score += score
    
    return total_score

## 4-1. 모델 초기화
- LSTM 모델을 초기화하고, 입력 크기, LSTM 의 hidden size, LSTM 레이어 수 등ㅇ

In [55]:
import torch.optim as optim

input_size = train_sequences.shape[2]  # 센서/피처 수
hidden_size = 50  # LSTM hidden state 크기 (높을 수로 모델 복잡해짐)
num_layers = 2  # LSTM 레이어 수
learning_rate = 0.0005

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(input_size, hidden_size, num_layers).to(device)  # 모델을 GPU로 이동

# 랜덤 시드 고정
set_seed(369)

# 손실 함수 및 옵티마이저 정의
criterion = nn.MSELoss()  # RUL 예측이므로 MSELoss 사용
optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # Adam Optimizer 사용

# 5. 모델 학습

In [56]:
from torch.utils.data import TensorDataset, DataLoader

# 학습 설정
num_epochs = 200
batch_size = 128

# Early Stopping을 위한 변수 추가
best_val_loss = float('inf')
patience = 6 # 조기 종료를 위한 patience 값 설정 (100 epoch 동안 개선이 없으면 종료)
epochs_no_improve = 0 # 개선이 없었던 epoch 수를 기록하는 변수

# # 텐서로 변환된 학습 데이터 (train_sequences_tensor, train_targets_tensor)를 DataLoader로 묶기
# train_dataset = TensorDataset(train_sequences_tensor, train_targets_tensor)
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# # 학습 루프
# for epoch in range(num_epochs):
#     model.train()  # 학습 모드 설정
#     running_loss = 0.0
    
#     for sequences, targets in train_loader:
#         # 입력 데이터와 타겟을 GPU로 이동
#         sequences = sequences.to(device)
#         targets = targets.to(device)

#         # 옵티마이저의 기울기 초기화
#         optimizer.zero_grad()

#         # 모델의 예측값 계산
#         outputs = model(sequences)

#         # 손실 계산
#         loss = criterion(outputs.squeeze(), targets)  # MSE 손실 함수
#         loss.backward()  # 역전파로 기울기 계산
#         optimizer.step()  # 옵티마이저로 가중치 갱신

#         running_loss += loss.item()

#     # 매 epoch마다 평균 손실 출력
#     print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")





# # 검증 데이터셋 생성 (예시로 일부 데이터를 분리)
# val_sequences_tensor, val_targets_tensor = train_sequences_tensor[:100], train_targets_tensor[:100]
# train_sequences_tensor, train_targets_tensor = train_sequences_tensor[100:], train_targets_tensor[100:]

# val_dataset = TensorDataset(val_sequences_tensor, val_targets_tensor)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# # 모델 저장을 위한 초기 설정
# best_val_loss = float('inf')
# best_model_path = 'best_model.pth'

# # 학습 및 검증 루프
# for epoch in range(num_epochs):
#     model.train()
#     running_loss = 0.0
#     for sequences, targets in train_loader:
#         sequences = sequences.to(device)
#         targets = targets.to(device)
#         optimizer.zero_grad()
#         outputs = model(sequences)
#         loss = criterion(outputs.squeeze(), targets)
#         loss.backward()
#         optimizer.step()
#         running_loss += loss.item()
    
#     # 검증 단계
#     model.eval()
#     val_loss = 0.0
#     with torch.no_grad():
#         for sequences, targets in val_loader:
#             sequences = sequences.to(device)
#             targets = targets.to(device)
#             outputs = model(sequences)
#             loss = criterion(outputs.squeeze(), targets)
#             val_loss += loss.item()

#     avg_val_loss = val_loss / len(val_loader)
#     print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Val Loss: {avg_val_loss:.4f}")

#     # 최적 모델 저장
#     if avg_val_loss < best_val_loss:
#         best_val_loss = avg_val_loss
#         torch.save(model.state_dict(), best_model_path)
#         print("Saved Best Model")


# 텐서로 변환된 학습 데이터 (train_sequences_tensor, train_targets_tensor)를 DataLoader로 묶기
train_dataset = TensorDataset(train_sequences_tensor, train_targets_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# 검증 데이터셋 크기 증가 (훈련 데이터의 20% 정도 추천)
val_size = int(len(train_sequences_tensor) * 0.2)  # 훈련 데이터의 20%를 검증 데이터로 사용
val_sequences_tensor, val_targets_tensor = train_sequences_tensor[:val_size], train_targets_tensor[:val_size]
train_sequences_tensor, train_targets_tensor = train_sequences_tensor[val_size:], train_targets_tensor[val_size:]

val_dataset = TensorDataset(val_sequences_tensor, val_targets_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# 모델 저장을 위한 초기 설정
best_model_path = 'best_model.pth'

# 학습 및 검증 루프 + Early Stopping
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for sequences, targets in train_loader:
        sequences = sequences.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        outputs = model(sequences)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # 검증 단계
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for sequences, targets in val_loader:
            sequences = sequences.to(device)
            targets = targets.to(device)
            outputs = model(sequences)
            loss = criterion(outputs.squeeze(), targets)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Val Loss: {avg_val_loss:.4f}")

    # Early Stopping 구현
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)
        epochs_no_improve = 0
        print("Saved Best Model")
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs.")
            break

Epoch [1/200], Loss: 12976.6323, Val Loss: 7998.0570
Saved Best Model
Epoch [2/200], Loss: 12532.1970, Val Loss: 7628.6327
Saved Best Model
Epoch [3/200], Loss: 12147.8591, Val Loss: 7383.7244
Saved Best Model
Epoch [4/200], Loss: 11904.7468, Val Loss: 7222.2206
Saved Best Model
Epoch [5/200], Loss: 11718.8541, Val Loss: 7085.2933
Saved Best Model
Epoch [6/200], Loss: 11553.5585, Val Loss: 6957.7430
Saved Best Model
Epoch [7/200], Loss: 11398.0580, Val Loss: 6837.7968
Saved Best Model
Epoch [8/200], Loss: 11249.9520, Val Loss: 6722.4985
Saved Best Model
Epoch [9/200], Loss: 11106.7126, Val Loss: 6611.1820
Saved Best Model
Epoch [10/200], Loss: 10967.4377, Val Loss: 6503.8100
Saved Best Model
Epoch [11/200], Loss: 10832.6871, Val Loss: 6398.2556
Saved Best Model
Epoch [12/200], Loss: 10700.3567, Val Loss: 6295.7413
Saved Best Model
Epoch [13/200], Loss: 10570.7031, Val Loss: 6196.7166
Saved Best Model
Epoch [14/200], Loss: 10444.7347, Val Loss: 6098.7012
Saved Best Model
Epoch [15/200],

In [57]:
from sklearn.metrics import mean_squared_error, r2_score

# 테스트 데이터 예측
model.load_state_dict(torch.load(best_model_path))
model.eval()  # 평가 모드로 전환 (드롭아웃 등을 비활성화)
with torch.no_grad():
    test_sequences_tensor = test_sequences_tensor.to(device)  # 테스트 데이터도 GPU로 이동
    predicted_rul = model(test_sequences_tensor)

# 모델이 예측한 RUL 값
predicted_rul = predicted_rul.cpu().numpy().squeeze()  # GPU에서 예측한 값을 numpy로 변환

total_score = evaluate_algorithm(true_rul, predicted_rul)
rmse = np.sqrt(mean_squared_error(true_rul, predicted_rul))
r2 = r2_score(true_rul, predicted_rul)

# 결과 출력
print(f"Test Score: {total_score}")
print(f'Test RMSE: {rmse:.2f}')
print(f'Test R² Score: {r2:.4f}')

Test Score: 523.6329494714737
Test RMSE: 18.58
Test R² Score: 0.8002


  model.load_state_dict(torch.load(best_model_path))


## 손실함수로 score function 사용

In [25]:
# 1. 비대칭 스코어링 함수를 PyTorch 로 변환
import random
import numpy as np
import torch

# 비대칭 스코어링 함수
def asymmetric_scoring(y_true, y_pred, a1=10, a2=13):
    errors = y_pred - y_true
    scores = torch.where(errors < 0, torch.exp(-errors / a1) - 1, torch.exp(errors / a2) - 1)
    return torch.sum(scores)

# 비대칭 스코어링 손실 함수
class AsymmetricLoss(nn.Module):
    def __init__(self, a1=10, a2=13):
        super(AsymmetricLoss, self).__init__()
        self.a1 = a1
        self.a2 = a2

    def forward(self, y_pred, y_true):
        return asymmetric_scoring(y_true, y_pred, self.a1, self.a2)

# 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)  # RUL을 예측하기 위한 출력 레이어

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        out, _ = self.lstm(x, (h0, c0))
        out = out[:, -1, :]
        out = self.fc(out)
        return out

In [62]:
# 2. 학습 루프에서 비대칭 스코어링 함수 사용
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# 랜덤 시드 고정
# set_seed(100)

input_size = train_sequences.shape[2]
hidden_size = 100
num_layers = 2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(input_size, hidden_size, num_layers).to(device)

# 손실 함수 및 옵티마이저 정의
criterion = AsymmetricLoss(a1=10, a2=13)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.Adam(model.parameters(), lr=0.005)

# 학습 설정
num_epochs = 50
batch_size = 128

# 텐서로 변환된 학습 데이터 (train_sequences_tensor, train_targets_tensor)를 DataLoader로 묶기
train_dataset = TensorDataset(train_sequences_tensor, train_targets_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# 학습 루프
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for sequences, targets in train_loader:
        sequences = sequences.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(sequences)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

Epoch [1/50], Loss: 2363077547786.2402
Epoch [2/50], Loss: 1461999608791.0400
Epoch [3/50], Loss: 1116232281948.1599
Epoch [4/50], Loss: 918693341921.2800
Epoch [5/50], Loss: 768967108362.2400
Epoch [6/50], Loss: 660711765852.1600
Epoch [7/50], Loss: 576426639564.8000
Epoch [8/50], Loss: 503785729802.2400
Epoch [9/50], Loss: 449084302458.8800
Epoch [10/50], Loss: 401665211760.6400
Epoch [11/50], Loss: 361450786119.6800
Epoch [12/50], Loss: 327860453007.3600
Epoch [13/50], Loss: 299702597570.5600
Epoch [14/50], Loss: 274530161131.5200
Epoch [15/50], Loss: 252994051522.5600
Epoch [16/50], Loss: 233641954631.6800
Epoch [17/50], Loss: 216781140213.7600
Epoch [18/50], Loss: 202506029260.8000
Epoch [19/50], Loss: 187832853888.0000
Epoch [20/50], Loss: 175959481589.7600
Epoch [21/50], Loss: 165361752460.8000
Epoch [22/50], Loss: 155305084108.8000
Epoch [23/50], Loss: 146429620715.5200
Epoch [24/50], Loss: 138139437219.8400
Epoch [25/50], Loss: 130480354590.7200
Epoch [26/50], Loss: 1235813470

In [63]:
# 3. 평가 루프에서 비대칭 스코어링 함수 사용
from sklearn.metrics import mean_squared_error, r2_score

# 평가 함수
def evaluate_algorithm(y_true_all, y_pred_all, a1=10, a2=13):
    total_score = 0

    for y_true, y_pred in zip(y_true_all, y_pred_all):
        score = asymmetric_scoring(torch.tensor(y_true), torch.tensor(y_pred), a1, a2)
        total_score += score.item()
    
    return total_score

# 테스트 데이터셋 준비
test_dataset = TensorDataset(test_sequences_tensor, test_targets_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 모델을 평가 모드로 설정
model.eval()
true_rul = []
predicted_rul = []

with torch.no_grad():
    for sequences, targets in test_loader:
        sequences = sequences.to(device)
        targets = targets.to(device)
        outputs = model(sequences)
        true_rul.append(targets.cpu().numpy())
        predicted_rul.append(outputs.squeeze().cpu().numpy())

# 리스트를 numpy 배열로 변환
true_rul = np.concatenate(true_rul)
predicted_rul = np.concatenate(predicted_rul)

# 평가
total_score = evaluate_algorithm(true_rul, predicted_rul, a1=10, a2=13)
rmse = np.sqrt(mean_squared_error(true_rul, predicted_rul))
r2 = r2_score(true_rul, predicted_rul)

# 결과 출력
print(f"Test Score: {total_score}")
print(f'Test RMSE: {rmse:.2f}')
print(f'Test R² Score: {r2:.4f}')

Test Score: 177852.19478714466
Test RMSE: 53.57
Test R² Score: -0.6616
