<a href="https://colab.research.google.com/github/ByeonJaeseong/DeepLearningProject/blob/main/train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install -U keras-tuner

import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from kerastuner.tuners import RandomSearch


#uploaded = files.upload()



  from kerastuner.tuners import RandomSearch


In [5]:
# 데이터 불러오기
lane_data_c = pd.read_csv('lane_data_c.csv', encoding='utf-8')
lane_data_s = pd.read_csv('lane_data_s.csv', encoding='utf-8')
data_c30 = pd.read_csv('data_c30.csv', encoding='utf-8')

# 데이터 결합
data_combined = pd.concat([lane_data_c, lane_data_s, data_c30], axis=1)
data_combined = data_combined.loc[:, ~data_combined.columns.duplicated()]
data_combined = data_combined.drop_duplicates(subset='Distance', keep='first')  # 첫 번째 중복 행만 남기기
# 가중치 계산 함수 정의
def weighted_mape(y_true, y_pred, weights):
    return np.sum(weights * np.abs((y_true - y_pred) / y_true)) / np.sum(weights) * 100

# 사용자 정의 Weighted MAPE 손실 함수
def weighted_mape_loss(weights):
    def loss(y_true, y_pred):
        return tf.reduce_sum(weights * tf.abs((y_true - y_pred) / y_true)) / tf.reduce_sum(weights) * 100
    return loss

# 입력 변수와 탈선계수 분리
X = data_combined.drop(['YL_M1_B1_W1', 'YR_M1_B1_W1', 'YL_M1_B1_W2', 'YR_M1_B1_W2'], axis=1)
y = data_combined[['YL_M1_B1_W1', 'YR_M1_B1_W1', 'YL_M1_B1_W2', 'YR_M1_B1_W2']]

# 데이터 정규화
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)

# 학습 데이터와 테스트 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)



In [12]:
X_train.shape
# X.head()
# X.describe()

(9600, 38)

In [6]:
# 시계열 데이터와 특성 데이터 분리
time_series_data = data_combined.iloc[:, 0].values
feature_data = data_combined.iloc[:, 1:].values

# 데이터 정규화
scaler = MinMaxScaler()
time_series_data = scaler.fit_transform(time_series_data.reshape(-1, 1)).flatten()
feature_data = scaler.fit_transform(feature_data)

# 시계열 데이터와 특성 데이터를 각각 3D 텐서로 변환
time_steps = 10
X_time_series = []
X_features = []

for i in range(len(time_series_data) - time_steps):
    X_time_series.append(time_series_data[i:i + time_steps])
    X_features.append(feature_data[i:i + time_steps])

X_time_series = np.array(X_time_series)
X_features = np.array(X_features)

# 출력 데이터
y = time_series_data[time_steps:]

# 학습 데이터와 테스트 데이터 분할
X_time_series_train, X_time_series_test, X_features_train, X_features_test, y_train, y_test = train_test_split(
    X_time_series, X_features, y, test_size=0.2, random_state=42)

# 가중치 계산 함수 정의
def weighted_mape(y_true, y_pred, weights):
    return np.sum(weights * np.abs((y_true - y_pred) / y_true)) / np.sum(weights) * 100

# 사용자 정의 Weighted MAPE 손실 함수
def weighted_mape_loss(weights):
    def loss(y_true, y_pred):
        return tf.reduce_sum(weights * tf.abs((y_true - y_pred) / y_true)) / tf.reduce_sum(weights) * 100
    return loss

# 모델 생성 함수 정의
def build_model(hp):
    model = Sequential()
    model.add(LSTM(units=hp.Int('units', min_value=32, max_value=128, step=32), input_shape=(X_time_series_train.shape[1], 1), activation='relu', return_sequences=True))
    model.add(LSTM(units=hp.Int('units', min_value=32, max_value=128, step=32), activation='relu'))
    model.add(Dense(1))  # 시계열 데이터 예측이므로 출력 뉴런 수는 1
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

# Early Stopping 설정
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Model Checkpoint 설정
model_checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')

# Hyperparameter Tuning 설정
tuner = RandomSearch(build_model,
                     objective='val_loss',
                     max_trials=10,
                     directory='tuner_results',
                     project_name='my_tuning_project')

# 모델 훈련
tuner.search(X_time_series_train, y_train, epochs=50, batch_size=32, validation_data=(X_time_series_test, y_test),
             callbacks=[early_stopping, model_checkpoint])

# 튜닝 결과 요약
tuner.results_summary()

Trial 4 Complete [00h 09m 24s]
val_loss: 6.643612326939774e-09

Best val_loss So Far: 6.643612326939774e-09
Total elapsed time: 00h 14m 36s
Results summary
Results in tuner_results/my_tuning_project
Showing 10 best trials
Objective(name="val_loss", direction="min")

Trial 03 summary
Hyperparameters:
units: 128
Score: 6.643612326939774e-09

Trial 02 summary
Hyperparameters:
units: 96
Score: 2.326107129135835e-08

Trial 01 summary
Hyperparameters:
units: 64
Score: 4.295517186392317e-08

Trial 00 summary
Hyperparameters:
units: 32
Score: 6.380202677291891e-08


In [7]:
# 최적 모델 선택
best_model = tuner.get_best_models(num_models=1)[0]

# 학습
best_model.fit(X_time_series_train, y_train, epochs=50, batch_size=32, validation_data=(X_time_series_test, y_test),
               callbacks=[early_stopping, model_checkpoint])

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50


<keras.callbacks.History at 0x79454552e1d0>

In [10]:
# 다음 1999개의 샘플 예측
next_samples = 1999
X_time_series_predict = X_time_series[-next_samples:]
X_features_predict = X_features[-next_samples:]

# 모델 예측
predictions = best_model.predict([X_time_series_predict, X_features_predict])

# 예측 결과 업데이트
answer_sample = pd.read_csv('answer_sample.csv', header=None)
answer_sample.iloc[:, 1:5] = predictions  # 2번째 열부터 5번째 열에 예측 결과 저장
answer_sample.to_csv('answer_sample.csv', index=False, header=False)  # 결과를 파일에 저장

ValueError: ignored