In [None]:
import numpy as np
import tensorflow as tf
import random
import os
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import re
import matplotlib.pyplot as plt
import keras_tuner as kt
import joblib

# Colab 의 한글 폰트 설정(현재 나눔바른고딕 폰트가 설치가 제대로 되지 않아 맑은 고딕으로 폰트 설정)
plt.rc('font', family='Malgun Gothic')

In [None]:
# 랜덤 시드 고정
np.random.seed(42)
tf.random.set_seed(42)

# 모든 랜덤 시드 설정 함수
def set_all_seeds(seed=42):
    np.random.seed(seed)
    tf.random.set_seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

In [None]:
# 고수위 데이터 불러오기
Rain_WL = pd.read_excel('고수위_이벤트_데이터.xlsx')
Rain_WL.head()

In [None]:
# X, Y, 이벤트 ID 데이터 설정
Multi_X =  Rain_WL[['금곡교 수위', '요천대교 수위', '금곡교 강우량', '요천대교 강우량']]
Multi_Y = Rain_WL[['금곡교 수위', '요천대교 수위', '고달교 수위']]
event_ids = Rain_WL['이벤트 번호']

In [None]:
# Function to plot water levels and rainfall for each event
def plot_event_data(data, feature_names, event_ids, event_name):
    plt.figure(figsize=(15, 10))

    num_features = len(feature_names)
    event_data = data[event_ids == event_name]
    
    time_steps = range(event_data.shape[0])
    
    for i, feature_name in enumerate(feature_names):
        plt.subplot(num_features, 1, i+1)
        plt.plot(time_steps, event_data[:, i], label=f'{feature_name}')
        plt.title(f'{event_name} - {feature_name}', fontsize=15)
        plt.xlabel('Time Step', fontsize=12)
        plt.ylabel(feature_name, fontsize=12)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Features names for water levels and rainfall
feature_names = ['금곡교 수위', '요천대교 수위', '고달교 수위']

# Convert DataFrame to numpy array for easier indexing
Multi_X_array = Multi_X.values
Multi_Y_array = Multi_Y.values
event_ids_array = event_ids.values

# Get unique event names
unique_events = np.unique(event_ids_array)

# Plot for each event
for event_name in unique_events:
    plot_event_data(Multi_Y_array, feature_names, event_ids_array, event_name)

In [None]:
scaler_X2 = MinMaxScaler()
scaler_Y2 = MinMaxScaler()

In [None]:
# 데이터 정규화
Multi_X_normalized = scaler_X2.fit_transform(Multi_X)
Multi_Y_normalized = scaler_Y2.fit_transform(Multi_Y)

scaler_X2_path="scaler_x2.pkl"
scaler_Y2_path="scaler_y2.pkl"

joblib.dump(scaler_X2, scaler_X2_path)
joblib.dump(scaler_Y2, scaler_Y2_path)

In [None]:
# 시퀀스 생성 함수 (리드 타임 포함)
def create_sequences_with_shift(X, y, sequence_length=1):
    """
    시퀀스를 생성하는 함수로, 첫 번째 스텝에서는 T-n+1 ~ T+1의 강우 데이터를 사용하고,
    이후 스텝부터는 T-n+2 ~ T+2로 한 시간씩 이동합니다. 수위는 처음 스텝에서 T-n ~ T를 사용하고
    이후 스텝마다 한 시간씩 이동합니다.

    :param X: 입력 데이터 (강우, 수위)
    :param y: 출력 데이터 (예측할 고달교 수위)
    :param sequence_length: 시퀀스의 길이
    :return: 입력 시퀀스 X_seq, 출력 시퀀스 Y_seq
    """
    X_seq, Y_seq = [], []

    # T-n부터 시작하는 시퀀스 생성
    for i in range(len(X) - sequence_length):
        # 각 스텝에서 강우값과 수위값이 다르게 이동하도록 처리
        # 첫 번째 스텝: 강우는 T-n+1 ~ T+1, 수위는 T-n ~ T
        if i == 0:
            rainfall_input = X[i+1:i + sequence_length+1, :2]  # 강우값: T-n+1 ~ T+1
            water_level_input = X[i:i + sequence_length, 2:]  # 수위값: T-n ~ T
        else:
            # 이후 스텝: 강우는 T-n+2 ~ T+2, 수위는 T-n+1 ~ T
            rainfall_input = X[i+1:i + sequence_length+1, :2]  # 강우값: T-n+2 ~ T+2
            water_level_input = X[i:i + sequence_length, 2:]  # 수위값: T-n+1 ~ T

        # 시퀀스 결합 (강우 + 수위)
        combined_input = np.concatenate([rainfall_input, water_level_input], axis=-1)
        X_seq.append(combined_input)

        # 출력 데이터는 y에서 고달교 수위를 예측 (T+1, T+2, ...)
        Y_seq.append(y[i + sequence_length])

    return np.array(X_seq), np.array(Y_seq)

In [None]:
# 이벤트 ID 목록 가져오기 및 정렬
event_ids_unique = Rain_WL['이벤트 번호'].unique()
def extract_event_number(event_id):
    return int(re.search(r'\d+', event_id).group())
event_ids_unique_sorted = sorted(event_ids_unique, key=extract_event_number)

In [None]:
def split_data(X_normalized, Y_normalized, sequence_length):
    set_all_seeds(42)  # 랜덤 시드 고정
    
    X_train, X_val, X_test_1, X_test_2 = [], [], [], []
    Y_train, Y_val, Y_test_1, Y_test_2 = [], [], [], []
    assigned_train, assigned_val, assigned_test = [], [], []

    # 마지막 이벤트를 테스트 세트로 설정
    test_event_id_1 = event_ids_unique[-2]
    test_event_id_2 = event_ids_unique[-1]
    # 마지막에서 두 번째 이벤트를 검증 세트로 설정
    val_event_id = event_ids_unique[-3]

    # 각 이벤트에 대해 데이터 분할
    for event_id in event_ids_unique:
        set_all_seeds(42)
        event_indices = np.where(event_ids == event_id)[0]  # 이벤트에 해당하는 인덱스 추출
        event_data_X = X_normalized[event_indices]
        event_data_Y = Y_normalized[event_indices]

        # 시퀀스 생성 (create_sequences_with_shift 함수 사용)
        if len(event_data_X) >= sequence_length:
            event_X_seq, event_Y_seq = create_sequences_with_shift(event_data_X, event_data_Y, sequence_length)

            # 이벤트에 따라 데이터 세트 분할
            if event_id == test_event_id_1:
                X_test_1.extend(event_X_seq)
                Y_test_1.extend(event_Y_seq)
                assigned_test.append(event_id)
            elif event_id == test_event_id_2:
                X_test_2.extend(event_X_seq)
                Y_test_2.extend(event_Y_seq)
                assigned_test.append(event_id)
            elif event_id == val_event_id:
                X_val.extend(event_X_seq)
                Y_val.extend(event_Y_seq)
                assigned_val.append(event_id)
            else:
                X_train.extend(event_X_seq)
                Y_train.extend(event_Y_seq)
                assigned_train.append(event_id)

    # 리스트를 numpy 배열로 변환
    X_train, Y_train = np.array(X_train), np.array(Y_train)
    X_val, Y_val = np.array(X_val), np.array(Y_val)
    X_test_1, Y_test_1 = np.array(X_test_1), np.array(Y_test_1)
    X_test_2, Y_test_2 = np.array(X_test_2), np.array(Y_test_2)

    return X_train, Y_train, X_val, Y_val, X_test_1, Y_test_1, X_test_2, Y_test_2, assigned_train, assigned_val, assigned_test

In [None]:
from tensorflow.keras import Input, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import keras_tuner as kt
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# MyHyperModel 클래스 정의
class MyHyperModel(kt.HyperModel):
    # build 함수 정의
    def build(self, hp):
        sequence_length = hp.Int('sequence_length', min_value=6, max_value=24, step=6)
        
        model = Sequential()
        model.add(Input(shape=(sequence_length, Multi_X.shape[1])))
    
        model.add(LSTM(units=hp.Int('units', min_value=32, max_value=192, step=32), return_sequences=True))
        model.add(LSTM(units=hp.Int('units', min_value=32, max_value=192, step=32)))
    
        model.add(Dropout(rate=hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)))
    
        model.add(Dense(units=3))
    
        model.compile(
            optimizer=Adam(learning_rate=hp.Choice('learning_rate', values=[1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6])),
            loss='mean_squared_error'
        )
        return model
    
    def fit(self, hp, model, *args, **kwargs):
        sequence_length = hp.get('sequence_length')
        lead_time = 1

        X_train, Y_train, X_val, Y_val, X_test_1, Y_test_1, X_test_2, Y_test_2, assigned_train, assigned_val, assigned_test = split_data(Multi_X_normalized, Multi_Y_normalized, sequence_length)
        
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
            ModelCheckpoint("best_model5.keras", monitor='val_loss', save_best_only=True)
                    ]
        
        return model.fit(
            X_train, Y_train,
            validation_data=(X_val, Y_val),
            epochs=kwargs.get('epochs', 100),
            batch_size=hp.Int('batch_size', min_value=16, max_value=64, step=16),
            callbacks=callbacks
        )
    
# 초기 데이터셋 분할
sequence_length_initial = 6
lead_time = 1
Multi_X_train, Multi_Y_train, Multi_X_val, Multi_Y_val, Multi_X_test_1, Multi_Y_test_1, Multi_X_test_2, Multi_Y_test_2, assigned_train, assigned_val, assigned_test = split_data(Multi_X_normalized, Multi_Y_normalized,sequence_length_initial)
    
# 튜너 디렉토리 생성
tuner_dir_2 = r'C:\Users\HydroLab\OneDrive\hp_tuner_use'
# 하이퍼파라미터 튜너 설정
tuner_2 = kt.RandomSearch(
    MyHyperModel(),
    objective='val_loss',
    max_trials=20,
    executions_per_trial=2,
    directory=tuner_dir_2,
    project_name='Rec_future_rainfall_adjusted'
)

# 튜닝 실행
tuner_2.search(Multi_X_train, Multi_Y_train, 
               epochs=100, 
               validation_data=(Multi_X_val, Multi_Y_val))

# 최적 하이퍼파라미터 출력
best_hps_2 = tuner_2.get_best_hyperparameters()[0]

print(f"""
The optimal number of units in the LSTM layer is {best_hps_2.get('units')}.
The optimal dropout rate is {best_hps_2.get('dropout')}.
The optimal learning rate for the optimizer is {best_hps_2.get('learning_rate')}.
The optimal sequence length is {best_hps_2.get('sequence_length')}.
The optimal batch size is {best_hps_2.get('batch_size')}.
""")

In [None]:
def build_and_train_model(sequence_length, units, dropout_rate, learning_rate, batch_size, epochs, X_train, Y_train, X_val, Y_val, model_save_path):
    # 모델 설계
    model = Sequential()
    model.add(Input(shape=(sequence_length, X_train.shape[2])))
    
    model.add(LSTM(units=units, return_sequences=True))
    model.add(LSTM(units=units))
    
    model.add(Dropout(rate=dropout_rate))
    
    model.add(Dense(units=3))  # 출력 노드 수: 3 (금곡교 수위, 요천대교 수위, 고달교 수위)
    
    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mean_squared_error')
    
    # 콜백 설정
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
        ModelCheckpoint(model_save_path, monitor='val_loss', save_best_only=True)
    ]
    
    # 모델 학습
    history = model.fit(
        X_train, Y_train,
        validation_data=(X_val, Y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks
    )
    
    return model, history

In [None]:
# 고정된 하이퍼파라미터
sequence_length = best_hps_2.get('sequence_length')  
units = best_hps_2.get('units')
dropout_rate = best_hps_2.get('dropout')
learning_rate = best_hps_2.get('learning_rate')
batch_size = best_hps_2.get('batch_size')
epochs = 100
model_save_path = 'best_model5.keras'  # 모델 저장 경로

Multi_X_train, Multi_Y_train, Multi_X_val, Multi_Y_val, Multi_X_test_1, Multi_Y_test_1, Multi_X_test_2, Multi_Y_test_2, assigned_train, assigned_val, assigned_test = split_data(Multi_X_normalized, Multi_Y_normalized, sequence_length)

# 모델 빌드 및 학습
best_model2, training_history = build_and_train_model(
    sequence_length=sequence_length,
    units=units,
    dropout_rate=dropout_rate,
    learning_rate=learning_rate,
    batch_size=batch_size,
    epochs=epochs,
    X_train=Multi_X_train,
    Y_train=Multi_Y_train,
    X_val=Multi_X_val,
    Y_val=Multi_Y_val,
    model_save_path=model_save_path
)

# 학습된 모델 저장
best_model2.save(model_save_path)

In [None]:
# 손실 그래프
plt.figure(figsize=(10, 5))
plt.plot(training_history.history['loss'], label='Training Loss', color='blue', linewidth = 3)
plt.plot(training_history.history['val_loss'], label='Validation Loss', color='red', linewidth = 3)
plt.title('Training and Validation Loss', fontsize=30, fontweight = 'bold')
plt.xlabel('Epoch', fontsize=25, fontweight = 'bold')
plt.ylabel('Loss', fontsize=25, fontweight = 'bold')
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.legend(fontsize=18, loc = 'upper right')
plt.show()

In [None]:
def recursive_prediction_multiple_events(model, X_test_1, X_test_2, rainfall_data_1, rainfall_data_2, forecast_steps, sequence_length):
    """
    두 가지 이벤트에 대해 재귀적 예측을 수행하는 함수로, 각 예측 스텝마다 새로운 강우 데이터와 예측된 수위값을 입력에 추가합니다.
    
    :param model: 학습된 모델
    :param X_test_1: 첫 번째 이벤트의 테스트 데이터 시퀀스 (강우와 수위 데이터 포함)
    :param X_test_2: 두 번째 이벤트의 테스트 데이터 시퀀스 (강우와 수위 데이터 포함)
    :param rainfall_data_1: 첫 번째 이벤트의 강우 데이터 (금곡교와 요천대교 강우량)
    :param rainfall_data_2: 두 번째 이벤트의 강우 데이터 (금곡교와 요천대교 강우량)
    :param forecast_steps: 예측할 시간 스텝 수 (예: 12시간 예측)
    :param sequence_length: 시퀀스 길이
    
    :return: 두 이벤트 각각의 예측된 고달교 수위값 리스트
    """
    np.random.seed(42)
    predicted_water_levels_1 = []  # 첫 번째 이벤트의 예측된 고달교 수위를 저장할 리스트
    predicted_water_levels_2 = []  # 두 번째 이벤트의 예측된 고달교 수위를 저장할 리스트
    
    # 첫 번째 이벤트에 대한 재귀적 예측
    X_sequence_1 = X_test_1.copy()
    for t in range(forecast_steps):
        prediction_1 = model.predict(X_sequence_1)
        print(f"Predicted_1 shape at step {t+1}: {prediction_1.shape}")
        
        predicted_water_levels_1.append(prediction_1)
        
        # T+2 시점의 강우 데이터를 추출
        if t + sequence_length < rainfall_data_1.shape[0]:
            next_rainfall_1 = rainfall_data_1[:, -1, :]  # T+2 시점의 강우 데이터 마지막 값 추출 (배치 크기에 맞게)
        else:
            next_rainfall_1 = rainfall_data_1[:, -1, :]  # 마지막 강우값을 사용
        print(f'next_rainfall_1 shape: {next_rainfall_1.shape}')  # (88, 2)

        # 예측된 수위 (금곡교 및 요천대교 수위 예측값) 추출
        predicted_levels_1 = prediction_1[:, :2]  # (88, 2)

        # 수위 예측값과 강우 데이터를 결합
        new_input_1 = np.concatenate([predicted_levels_1[:, np.newaxis, :], next_rainfall_1[:, np.newaxis, :]], axis=2)  # (88, 1, 4)

        print(f'new_input_1 shape: {new_input_1.shape}')

        # 앞의 1시간을 제거하고 새로운 입력을 시퀀스에 추가하여 길이 유지 (6으로 유지)
        X_sequence_1 = np.concatenate([X_sequence_1[:, 1:, :], new_input_1], axis=1)  # 시퀀스 업데이트
    
    # 두 번째 이벤트에 대한 재귀적 예측
    X_sequence_2 = X_test_2.copy()
    for t in range(forecast_steps):
        prediction_2 = model.predict(X_sequence_2)
        predicted_water_levels_2.append(prediction_2)
        print(f"Predicted_2 shape at step {t+1}: {prediction_2.shape}")
        
        # T+2 시점의 각 시퀀스에 해당하는 강우 데이터를 추출
        if t + sequence_length < rainfall_data_2.shape[0]:
            next_rainfall_2 = rainfall_data_2[:, -1, :]  # T+2 시점의 강우 데이터 마지막 값 추출 (배치 크기에 맞게)
        else:
            next_rainfall_2 = rainfall_data_2[:, -1, :]  # 마지막 강우값을 사용

        # 예측된 수위 (금곡교 및 요천대교 수위 예측값) 추출
        predicted_levels_2 = prediction_2[:, :2]  # (88, 2)

        # 수위 예측값과 강우 데이터를 결합
        new_input_2 = np.concatenate([predicted_levels_2[:, np.newaxis, :], next_rainfall_2[:, np.newaxis, :]], axis=2)  # (88, 1, 4)

        # 앞의 1시간을 제거하고 새로운 입력을 시퀀스에 추가하여 길이 유지 (6으로 유지)
        X_sequence_2 = np.concatenate([X_sequence_2[:, 1:, :], new_input_2], axis=1)

    return np.array(predicted_water_levels_1), np.array(predicted_water_levels_2)


In [None]:
# 시드 고정 함수
set_all_seeds(42)

# 각각의 이벤트에 대한 강우 데이터를 따로 추출
print(Multi_X_test_1.shape)
print(Multi_X_test_2.shape)
rainfall_data_1 = Multi_X_test_1[:, :, 2:4]  # 첫 번째 이벤트의 강우 데이터 (금곡교와 요천대교 강우량)
rainfall_data_2 = Multi_X_test_2[:, :, 2:4]  # 두 번째 이벤트의 강우 데이터 (금곡교와 요천대교 강우량)
print(rainfall_data_1.shape)
print(rainfall_data_2.shape)

# 재귀적 예측 실행 (두 가지 이벤트)
forecast_steps = 12
predicted_water_levels_1, predicted_water_levels_2 = recursive_prediction_multiple_events(
    best_model2, 
    Multi_X_test_1,  # 첫 번째 이벤트의 첫 시퀀스
    Multi_X_test_2,  # 두 번째 이벤트의 첫 시퀀스
    rainfall_data_1, 
    rainfall_data_2, 
    forecast_steps, 
    sequence_length
)

# 예측 결과 출력
print("첫 번째 이벤트의 예측된 고달교 수위값 (T+1, T+2, T+3):")
print(predicted_water_levels_1)

print("두 번째 이벤트의 예측된 고달교 수위값 (T+1, T+2, T+3):")
print(predicted_water_levels_2)



In [None]:
###### 예측값 및 실제값 역변환
predictions1_actual = {}
predictions2_actual = {}

for i in range(forecast_steps):
    # 예측된 값의 차원에 맞추기
    predictions1_padded = np.zeros((predicted_water_levels_1[i].shape[0], 3))
    predictions2_padded = np.zeros((predicted_water_levels_2[i].shape[0], 3))
    
    # 예측된 값 (고달교 수위)을 실제 데이터의 위치에 넣기
    predictions1_padded[:, :3] = predicted_water_levels_1[i][:, :3]
    predictions2_padded[:, :3] = predicted_water_levels_2[i][:, :3]
    
    # 역변환
    predictions1_actual[f'step_{i+1}'] = scaler_Y2.inverse_transform(predictions1_padded)
    print(predictions1_actual[f'step_{i+1}']) 
    print(predictions1_actual[f'step_{i+1}'].shape)
    predictions2_actual[f'step_{i+1}'] = scaler_Y2.inverse_transform(predictions2_padded)
    print(predictions2_actual[f'step_{i+1}']) 
    print(predictions2_actual[f'step_{i+1}'].shape)
    
# 실제값 역변환
test1 = {f"{forecast_steps}hr_actual": scaler_Y2.inverse_transform(Multi_Y_test_1)}
test2 = {f"{forecast_steps}hr_actual": scaler_Y2.inverse_transform(Multi_Y_test_2)}

# 결과 확인
for i in range(forecast_steps):
    print(f"Test set 1 predictions for step {i+1} vs actuals:")
    print(predictions1_actual[f'step_{i+1}'].shape)
    print(test1[f"{forecast_steps}hr_actual"].shape)
    print("Predictions:", predictions1_actual[f'step_{i+1}'][:, 2])  # 고달교 수위 값 비교
    print("Actuals:", test1[f"{forecast_steps}hr_actual"][:, 2])
    
    print(f"\nTest set 2 predictions for step {i+1} vs actuals:")
    print(predictions2_actual[f'step_{i+1}'].shape)
    print("Predictions:", predictions2_actual[f'step_{i+1}'][:, 2])  # 고달교 수위 값 비교
    print("Actuals:", test2[f"{forecast_steps}hr_actual"][:, 2])

In [None]:
# Event 1에 해당하는 관측 일시 데이터를 가져오기
def get_time_index(event_label, Y_test_actual):
    event_time_data = Rain_WL[Rain_WL['이벤트 번호'] == event_label]['관측 일시']
    if event_time_data.empty:
        print(f"Event {event_label}에 대한 관측 일시 데이터가 없습니다.")
        return pd.Series([], dtype="datetime64[ns]")
    time_index = event_time_data.iloc[-len(Y_test_actual):].reset_index(drop=True)
    print(f"Generated time_index: {time_index}")
    return time_index

In [None]:
def mean_absolute_percentage_error(y_true, y_pred):
    """
    MAPE 계산 함수
    """
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def evaluate_and_plot_single_feature(Y_test_actual, Y_pred_actual, feature_index, event_label, step, feature_name, plot_type, lead_time):
    # 예측 결과의 차원을 확인하고 필요한 경우 조정
    if Y_pred_actual.ndim > 1:
        Y_pred_actual = Y_pred_actual[:, feature_index].reshape(-1)
    if Y_test_actual.ndim > 1:
        Y_test_actual = Y_test_actual[:, feature_index].reshape(-1)

    # 실제 값과 예측 값의 길이를 맞춤
    min_length = min(len(Y_test_actual), len(Y_pred_actual))
    Y_test_actual = Y_test_actual[-min_length:]
    Y_pred_actual = Y_pred_actual[-min_length:]

    # 시간 인덱스 생성
    time_index = get_time_index(event_label, Y_test_actual)

    mse = mean_squared_error(Y_test_actual, Y_pred_actual)
    mae = mean_absolute_error(Y_test_actual, Y_pred_actual)
    nse = r2_score(Y_test_actual, Y_pred_actual)
    rmse = np.sqrt(mse)
    
    # 피크 값을 계산
    predicted_peak = np.max(Y_pred_actual)
    real_peak = np.max(Y_test_actual)
    
    # QER 계산
    qer = ((predicted_peak - real_peak) / real_peak) * 100
    
    # MAPE 계산
    mape = mean_absolute_percentage_error(Y_test_actual, Y_pred_actual)

    metrics = {'MSE': mse, 'MAE': mae, 'NSE': nse, 'RMSE': rmse, 'QER': qer, 'MAPE': mape}

    # 지표 시각화
    fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(12, 12))
    for (metric, value), ax in zip(metrics.items(), axes.flatten()):
        ax.text(0.5, 0.5, f'{metric}\n{value:.6f}', fontsize=30, ha='center', va='center')
        ax.tick_params(axis='both', which='both', bottom=False, top=False, left=False, right=False, 
                       labelbottom=False, labelleft=False)
    plt.tight_layout()
    plt.show()

    # 실제값과 예측값 비교 그래프
    plt.figure(figsize=(10, 5))
    if plot_type == 'line':
        plt.plot(time_index, Y_test_actual, label='Actual', color='blue', linewidth=3)
        plt.plot(time_index, Y_pred_actual, label='Predicted', color='red', linewidth=3)
    elif plot_type == 'bar':
        plt.bar(range(len(Y_test_actual)), Y_test_actual, label='Actual', color='blue')
        plt.bar(range(len(Y_pred_actual)), Y_pred_actual, label='Predicted', color='red', alpha=0.5)
    plt.title(f'{feature_name}\nPredicted Vs Actual\n({event_label}, Step={step})', fontsize=30, fontweight='bold')
    plt.xlabel('Date', fontsize=25, fontweight='bold')
    plt.ylabel('Water Level(EL.m)', fontsize=25, fontweight='bold')
    plt.xticks(rotation=45, fontsize=20)
    plt.yticks(fontsize=20)
    plt.legend(fontsize = 20)
    plt.show()

    # 산점도 그래프
    plt.figure(figsize=(8, 8))
    plt.scatter(Y_test_actual, Y_pred_actual, color='blue')
    plt.plot([Y_test_actual.min(), Y_test_actual.max()], [Y_test_actual.min(), Y_test_actual.max()], color='red', lw=2)
    plt.title(f'{feature_name}\nActual Vs Predicted\n({event_label}, Step={step})', fontsize=30, fontweight='bold')
    plt.xlabel('Actual(EL.m)', fontsize=25, fontweight='bold')
    plt.ylabel('Predicted(EL.m)', fontsize=25, fontweight='bold')
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    plt.show()

    return mse, mae, nse, rmse, qer, mape

In [None]:
# 다단계 예측 평가 및 시각화
prediction_steps = 12

feature_names = ['Geumgok Bridge Water Level', 'Yocheon Bridge Water Level', 'Godal Bridge Water Level']
plot_types = ['line', 'line', 'line']  # 각 특성에 대한 그래프 종류

for steps in range(1, prediction_steps + 1):
    for feature_index, (feature_name, plot_type) in enumerate(zip(feature_names, plot_types)):
        print(f'{assigned_test[0]} ({steps}시간) - {feature_name}')
        evaluate_and_plot_single_feature(
            Y_test_actual=test1[f"{prediction_steps}hr_actual"], 
            Y_pred_actual=predictions1_actual[f'step_{steps}'], 
            feature_index=feature_index, 
            event_label=assigned_test[0], 
            step=steps, 
            feature_name=feature_name, 
            plot_type=plot_type, 
            lead_time=steps
        )
        
        print(f'{assigned_test[1]} ({steps}시간) - {feature_name}')
        evaluate_and_plot_single_feature(
            Y_test_actual=test2[f"{prediction_steps}hr_actual"], 
            Y_pred_actual=predictions2_actual[f'step_{steps}'], 
            feature_index=feature_index, 
            event_label=assigned_test[1], 
            step=steps, 
            feature_name=feature_name, 
            plot_type=plot_type, 
            lead_time=steps
        )

In [None]:
def mean_absolute_percentage_error(y_true, y_pred):
    """
    MAPE 계산 함수
    """
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def evaluate_and_plot_single_feature(Y_test_actual, Y_pred_actual, feature_index, event_label, step, feature_name, plot_type, lead_time):
    # 예측 결과의 차원을 확인하고 필요한 경우 조정
    if Y_pred_actual.ndim > 1:
        Y_pred_actual = Y_pred_actual[:, feature_index].reshape(-1)
    if Y_test_actual.ndim > 1:
        Y_test_actual = Y_test_actual[:, feature_index].reshape(-1)

    # 실제 값과 예측 값의 길이를 맞춤
    min_length = min(len(Y_test_actual), len(Y_pred_actual))
    Y_test_actual = Y_test_actual[-min_length:]
    Y_pred_actual = Y_pred_actual[-min_length:]

    # 시간 인덱스 생성
    time_index = get_time_index(event_label, Y_test_actual)

    # 성능 지표 계산
    mse = mean_squared_error(Y_test_actual, Y_pred_actual)
    mae = mean_absolute_error(Y_test_actual, Y_pred_actual)
    nse = r2_score(Y_test_actual, Y_pred_actual)
    rmse = np.sqrt(mse)
    
    # 피크 값을 계산
    predicted_peak = np.max(Y_pred_actual)
    real_peak = np.max(Y_test_actual)
    
    # QER 계산
    qer = ((predicted_peak - real_peak) / real_peak) * 100
    
    # MAPE 계산
    mape = mean_absolute_percentage_error(Y_test_actual, Y_pred_actual)

    # 모든 성능 지표를 포함한 딕셔너리
    metrics = {'MSE': mse, 'MAE': mae, 'NSE': nse, 'RMSE': rmse, 'QER': qer, 'MAPE': mape}

    # 지표 시각화
    fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(12, 12))
    for (metric, value), ax in zip(metrics.items(), axes.flatten()):
        ax.text(0.5, 0.5, f'{metric}\n{value:.6f}', fontsize=30, ha='center', va='center')
        ax.tick_params(axis='both', which='both', bottom=False, top=False, left=False, right=False, 
                       labelbottom=False, labelleft=False)
    plt.tight_layout()
    plt.show()

    # 실제값과 예측값 비교 그래프
    plt.figure(figsize=(10, 5))
    if plot_type == 'line':
        plt.plot(time_index, Y_test_actual, label='Actual Value', color='blue', linewidth=3)
        plt.plot(time_index, Y_pred_actual, label='Predicted Value', color='red', linewidth=3)
    elif plot_type == 'bar':
        plt.bar(range(len(Y_test_actual)), Y_test_actual, label='Actual Value', color='blue')
        plt.bar(range(len(Y_pred_actual)), Y_pred_actual, label='Predicted Value', color='red', alpha=0.5)
    plt.title(f'{feature_name}\nPredicted Vs Actual\n({event_label}, Step={step})', fontsize=30, fontweight='bold')
    plt.xlabel('Date', fontsize=25, fontweight='bold')
    plt.ylabel(feature_name, fontsize=25, fontweight='bold')
    plt.xticks(rotation=45, fontsize=20)
    plt.yticks(fontsize=20)
    plt.legend()
    plt.show()

    # 산점도 그래프
    plt.figure(figsize=(8, 8))
    plt.scatter(Y_test_actual, Y_pred_actual, color='blue')
    plt.plot([Y_test_actual.min(), Y_test_actual.max()], [Y_test_actual.min(), Y_test_actual.max()], color='red', lw=2)
    plt.title(f'{feature_name}\nPredicted Vs Actual:\n({event_label}, Step={step})', fontsize=30, fontweight='bold')
    plt.xlabel('Actual Value', fontsize=25, fontweight='bold')
    plt.ylabel('Predicted Value', fontsize=25, fontweight='bold')
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    plt.show()

    # 성능 지표 반환
    return mse, mae, nse, rmse, qer, mape

In [None]:
# Lead times 정의
lead_times = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

# 이벤트별로 지표를 저장할 리스트 초기화
mse_recursive_all_steps_event_1 = []
mae_recursive_all_steps_event_1 = []
rmse_recursive_all_steps_event_1 = []
nse_recursive_all_steps_event_1 = []
qer_recursive_all_steps_event_1 = []
mape_recursive_all_steps_event_1 = []

mse_recursive_all_steps_event_2 = []
mae_recursive_all_steps_event_2 = []
rmse_recursive_all_steps_event_2 = []
nse_recursive_all_steps_event_2 = []
qer_recursive_all_steps_event_2 = []
mape_recursive_all_steps_event_2 = []

In [None]:
# 지표 계산 및 저장 부분에 디버깅 메시지 추가
for steps in range(1, prediction_steps + 1):
    print(f"Calculating metrics for step {steps}")

    # Event 6 평가 및 지표 저장
    mse, mae, r2, rmse, qer, mape = evaluate_and_plot_single_feature(
        test1[f"{prediction_steps}hr_actual"], 
        predictions1_actual[f'step_{steps}'], 
        feature_index=2,  # 고달교 수위
        event_label=assigned_test[0], 
        step=steps, 
        feature_name='Godal Bridge Water Level', 
        plot_type='line', 
        lead_time=steps
    )
    print(f"Step {steps} - Event 1 MSE: {mse}, MAE: {mae}, NSE: {r2}, RMSE: {rmse}, QER: {qer}, MAPE: {mape}")
    mse_recursive_all_steps_event_1.append(mse)
    mae_recursive_all_steps_event_1.append(mae)
    nse_recursive_all_steps_event_1.append(r2)
    rmse_recursive_all_steps_event_1.append(rmse)
    qer_recursive_all_steps_event_1.append(qer)
    mape_recursive_all_steps_event_1.append(mape)

    # Event 7 평가 및 지표 저장
    mse, mae, r2, rmse, qer, mape = evaluate_and_plot_single_feature(
        test2[f"{prediction_steps}hr_actual"], 
        predictions2_actual[f'step_{steps}'], 
        feature_index=2,  # 고달교 수위
        event_label=assigned_test[1], 
        step=steps, 
        feature_name='Godal Bridge Water Level', 
        plot_type='line', 
        lead_time=steps
    )
    print(f"Step {steps} - Event 2 MSE: {mse}, MAE: {mae}, NSE: {r2}, RMSE: {rmse}, QER: {qer}, MAPE: {mape}")
    mse_recursive_all_steps_event_2.append(mse)
    mae_recursive_all_steps_event_2.append(mae)
    nse_recursive_all_steps_event_2.append(r2)
    rmse_recursive_all_steps_event_2.append(rmse)
    qer_recursive_all_steps_event_2.append(qer)
    mape_recursive_all_steps_event_2.append(mape)

In [None]:
import pandas as pd
import openpyxl
from openpyxl.utils.dataframe import dataframe_to_rows  # dataframe_to_rows 임포트
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter

# y축 값을 포맷팅하는 함수 정의 (음수 값을 포함)
def y_fmt(x, y):
    return '{:.2f}'.format(x)

# 2x2 배열로 그래프 그리기 함수
def plot_metrics_2x2(steps, metrics_dict, event_id, colors):
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))  # 2x2 배열 서브플롯 생성
    fig.suptitle('[Recursive Prediction]', fontsize=40, fontweight='bold')

    metric_names = ['MAE', 'RMSE', 'NSE', 'QER']  # 그릴 지표들
    metric_units = {'MAE': '(m)', 'RMSE': '(m)', 'NSE': '', 'QER': '(%)'}  # 지표별 단위
    
    for i, metric_name in enumerate(metric_names):
        ax = axes[i//2, i%2]  # 2x2 배열에서 위치 계산
        
        # 지표 길이 확인 및 맞추기
        metric_values = metrics_dict[metric_name]
        if len(metric_values) != len(steps):
            metric_values = metric_values[:len(steps)]
        
        ax.plot(steps, metric_values, marker='o',  markersize = 10, color=colors[metric_name], label=f'{metric_name}')
        ax.set_title(f'{metric_name} by Lead Time - {event_id}', fontsize=30, fontweight='bold')
        ax.set_xlabel('Lead Time (hours)', fontsize=25, fontweight='bold')
        # y축 제목에 단위 추가
        ax.set_ylabel(f'{metric_name} {metric_units[metric_name]}', fontsize=25, fontweight='bold')
        ax.tick_params(axis='x', labelsize=20)
        ax.tick_params(axis='y', labelsize=20)
        ax.yaxis.set_major_formatter(FuncFormatter(y_fmt))  # y축 값 포맷 설정
        ax.grid(True)
        ax.legend(fontsize=20)

    plt.tight_layout(rect=[0, 0, 1, 0.95])  # 타이틀과 서브플롯 간격 조정
    plt.show()

# Steps 정의: 12시간까지만 포함
steps = list(range(1, 13))

# 지표별 색상 지정
colors = {
    'MSE': 'blue',
    'MAE': 'green',
    'RMSE': 'red',
    'NSE': 'purple',
    'QER': 'orange',
    'MAPE': 'brown'
}

try:
    # Workbook 생성
    from openpyxl import Workbook
    wb = Workbook()
    
    # 첫 시트는 이미 생성된 상태이므로 이를 설정하고 데이터 입력
    sheet1 = wb.active
    sheet1.title = f"Event_{assigned_test[0]}"
    
    # Event 1에 대한 지표 저장
    metrics_event_1 = {
        'MSE': mse_recursive_all_steps_event_1[:12],
        'MAE': mae_recursive_all_steps_event_1[:12],
        'RMSE': rmse_recursive_all_steps_event_1[:12],
        'NSE': nse_recursive_all_steps_event_1[:12],
        'QER': qer_recursive_all_steps_event_1[:12],
        'MAPE': mape_recursive_all_steps_event_1[:12]
    }
    
    df_event_1 = pd.DataFrame(metrics_event_1, index=steps)
    
    # 데이터 저장
    for r in dataframe_to_rows(df_event_1, index=True, header=True):
        sheet1.append(r)

    plot_metrics_2x2(steps, metrics_event_1, assigned_test[0], colors)

    # Event 2에 대한 지표 저장
    metrics_event_2 = {
        'MSE': mse_recursive_all_steps_event_2[:12],
        'MAE': mae_recursive_all_steps_event_2[:12],
        'RMSE': rmse_recursive_all_steps_event_2[:12],
        'NSE': nse_recursive_all_steps_event_2[:12],
        'QER': qer_recursive_all_steps_event_2[:12],
        'MAPE': mape_recursive_all_steps_event_2[:12]
    }

    df_event_2 = pd.DataFrame(metrics_event_2, index=steps)
    
    # Event 2 시트 추가 및 활성화
    sheet2 = wb.create_sheet(title=f"Event_{assigned_test[1]}")
    
    # 데이터 저장
    for r in dataframe_to_rows(df_event_2, index=True, header=True):
        sheet2.append(r)
    
    plot_metrics_2x2(steps, metrics_event_2, assigned_test[1], colors)

    # 파일 저장
    wb.save('metrics_by_event_재귀적예측_예보강우사용(Event8,9).xlsx')
    
    print("Metrics have been successfully saved to 'metrics_by_event_재귀적예측_예보강우사용(Event8,9).xlsx'")

except Exception as e:
    print(f"An error occurred: {e}")

In [None]:
import matplotlib.dates as mdates
import matplotlib.ticker as ticker

def plot_multiple_steps_in_grid(predictions1_actual, predictions2_actual, test1_actual, test2_actual, assigned_test, prediction_steps, feature_index, feature_name, plot_type, lead_time):
    # 테스트 세트 1의 시간 인덱스 가져오기
    time_index_1 = get_time_index(assigned_test[0], test1_actual[f"{prediction_steps}hr_actual"][:, feature_index])

    # 테스트 세트 1에 대한 그래프 그리기
    fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 15))
    fig.suptitle(f'{feature_name} - {assigned_test[0]}: Predicted Vs Actual (Steps=1~12)', fontsize=40, fontweight='bold')

    for steps in range(1, 13):  # lead time 1~12에 해당하는 그래프 생성
        ax = axes[(steps-1)//4, (steps-1)%4]  # 서브플롯 위치 계산

        # Y_test_actual과 Y_pred_actual 차원 맞추기 (Test Set 1)
        Y_test_actual = test1_actual[f"{prediction_steps}hr_actual"][:, feature_index]
        Y_pred_actual = predictions1_actual[f'step_{steps}'][:, feature_index]

        min_length = min(len(Y_test_actual), len(Y_pred_actual))
        Y_test_actual = Y_test_actual[-min_length:]
        Y_pred_actual = Y_pred_actual[-min_length:]
        time_index_plot = time_index_1[-min_length:]

        # 그래프 그리기
        if plot_type == 'line':
            ax.plot(time_index_plot, Y_test_actual, label='Actual', color='blue', linewidth=3)
            ax.plot(time_index_plot, Y_pred_actual, label='Predicted', color='red', linewidth=3)
        elif plot_type == 'bar':
            ax.bar(time_index_plot, Y_test_actual, label='Actual', color='blue')
            ax.bar(time_index_plot, Y_pred_actual, label='Predicted', color='red', alpha=0.5)

        ax.set_title(f'Step={steps} (Lead Time={steps})', fontsize=25, fontweight='bold')
        ax.set_xlabel('Date', fontsize=25, fontweight='bold')
        ax.set_ylabel('Water Level(EL.m)', fontsize=25, fontweight='bold')
        
        # 날짜 간격을 설정 (년-월-일 형식으로 변경)
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())  # 자동으로 적절한 날짜 간격을 선택
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))  # 포맷을 년-월-일로 설정
        ax.xaxis.set_major_locator(ticker.MaxNLocator(nbins=5))  # 눈금의 최대 개수를 5개로 설정

        ax.tick_params(axis='x', labelsize=20, rotation=45)
        ax.tick_params(axis='y', labelsize=20)

        if steps == 1:  # 첫 번째 그래프에서만 범례 추가
            ax.legend(fontsize=18)

    plt.tight_layout(rect=[0, 0, 1, 0.95])  # 제목과 서브플롯 간격 조정
    plt.show()

    # 테스트 세트 2의 시간 인덱스 가져오기
    time_index_2 = get_time_index(assigned_test[1], test2_actual[f"{prediction_steps}hr_actual"][:, feature_index])

    # 테스트 세트 2에 대한 그래프 그리기
    fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 15))
    fig.suptitle(f'{feature_name} - {assigned_test[1]}: Predicted Vs Actual (Steps=1~12)', fontsize=40, fontweight='bold')

    for steps in range(1, 13):  # lead time 1~12에 해당하는 그래프 생성
        ax = axes[(steps-1)//4, (steps-1)%4]  # 서브플롯 위치 계산

        # Y_test_actual과 Y_pred_actual 차원 맞추기 (Test Set 2)
        Y_test_actual = test2_actual[f"{prediction_steps}hr_actual"][:, feature_index]
        Y_pred_actual = predictions2_actual[f'step_{steps}'][:, feature_index]

        min_length = min(len(Y_test_actual), len(Y_pred_actual))
        Y_test_actual = Y_test_actual[-min_length:]
        Y_pred_actual = Y_pred_actual[-min_length:]
        time_index_plot = time_index_2[-min_length:]

        # 그래프 그리기
        if plot_type == 'line':
            ax.plot(time_index_plot, Y_test_actual, label='Actual', color='blue', linewidth=3)
            ax.plot(time_index_plot, Y_pred_actual, label='Predicted', color='red', linewidth=3)
        elif plot_type == 'bar':
            ax.bar(time_index_plot, Y_test_actual, label='Actual', color='blue')
            ax.bar(time_index_plot, Y_pred_actual, label='Predicted', color='red', alpha=0.5)

        ax.set_title(f'Step={steps} (Lead Time={steps})', fontsize=25, fontweight='bold')
        ax.set_xlabel('Date', fontsize=25, fontweight='bold')
        ax.set_ylabel('Water Level(EL.m)', fontsize=25, fontweight='bold')
        
        # 날짜 간격을 설정 (년-월-일 형식으로 변경)
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
        ax.xaxis.set_major_locator(ticker.MaxNLocator(nbins=5))  # 눈금의 최대 개수를 5개로 설정

        ax.tick_params(axis='x', labelsize=20, rotation=45)
        ax.tick_params(axis='y', labelsize=20)

        if steps == 1:  # 첫 번째 그래프에서만 범례 추가
            ax.legend(loc='upper left', fontsize=18)

    plt.tight_layout(rect=[0, 0, 1, 0.95])  # 제목과 서브플롯 간격 조정
    plt.show()

# 수정된 함수로 예측 결과를 두 테스트 세트에 대해 3x4 배열로 출력
plot_multiple_steps_in_grid(predictions1_actual, predictions2_actual, test1, test2, assigned_test, prediction_steps, feature_index, feature_name, plot_type, lead_time)


In [None]:
# NSE 계산 함수 (Nash-Sutcliffe Efficiency)
def calculate_nse(y_true, y_pred):
    return 1 - (np.sum((y_true - y_pred) ** 2) / np.sum((y_true - np.mean(y_true)) ** 2))

# 리드 타임별 예측 결과를 3x4 배열로 출력하는 함수 (scatter plot + NSE 표시, NSE 값을 오른쪽 아래에 표시)
def plot_multiple_steps_in_grid_scatter_nse(predictions1_actual, predictions2_actual, test1_actual, test2_actual, assigned_test, prediction_steps, feature_index, feature_name, lead_time):
    # 테스트 세트 1에 대한 그래프 그리기
    fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 18))
    fig.suptitle(f'{feature_name} - {assigned_test[0]}: Actual vs Predicted (Steps=1~12)', fontsize=40, fontweight='bold')

    for steps in range(1, 13):  # lead time 1~12에 해당하는 그래프 생성
        ax = axes[(steps-1)//4, (steps-1)%4]  # 서브플롯 위치 계산

        # Y_test_actual과 Y_pred_actual 차원 맞추기 (Test Set 1)
        Y_test_actual = test1_actual[f"{prediction_steps}hr_actual"][:, feature_index]
        Y_pred_actual = predictions1_actual[f'step_{steps}'][:, feature_index]

        min_length = min(len(Y_test_actual), len(Y_pred_actual))
        Y_test_actual = Y_test_actual[-min_length:]
        Y_pred_actual = Y_pred_actual[-min_length:]

        # Scatter plot 그리기
        ax.scatter(Y_test_actual, Y_pred_actual, color='blue', alpha=0.5)
        ax.plot([min(Y_test_actual), max(Y_test_actual)], [min(Y_test_actual), max(Y_test_actual)], color='red', linewidth=2)  # y=x 선

        # NSE 계산 및 표시
        nse_value = calculate_nse(Y_test_actual, Y_pred_actual)
        # NSE 값을 표시하면서 반투명 흰색 배경 추가, 오른쪽 아래로 배치
        ax.text(0.95, 0.05, f'NSE: {nse_value:.4f}', transform=ax.transAxes, fontsize=28, verticalalignment='bottom', horizontalalignment='right',
                fontweight='bold', bbox=dict(facecolor='white', alpha=0.7))  # NSE를 오른쪽 아래로 배치

        ax.set_title(f'{assigned_test[0]}, Step={steps}', fontsize=30, fontweight='bold')
        ax.set_xlabel('Actual(EL.m)', fontsize=25, fontweight='bold')
        ax.set_ylabel('Predicted(EL.m)', fontsize=25, fontweight='bold')

        ax.tick_params(axis='x', labelsize=20)
        ax.tick_params(axis='y', labelsize=20)

    plt.tight_layout(rect=[0, 0, 1, 0.95])  # 제목과 서브플롯 간격 조정
    plt.show()

    # 테스트 세트 2에 대한 그래프 그리기
    fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(20, 18))
    fig.suptitle(f'{feature_name} - {assigned_test[1]}: Actual vs Predicted (Steps=1~12)', fontsize=40, fontweight='bold')

    for steps in range(1, 13):  # lead time 1~12에 해당하는 그래프 생성
        ax = axes[(steps-1)//4, (steps-1)%4]  # 서브플롯 위치 계산

        # Y_test_actual과 Y_pred_actual 차원 맞추기 (Test Set 2)
        Y_test_actual = test2_actual[f"{prediction_steps}hr_actual"][:, feature_index]
        Y_pred_actual = predictions2_actual[f'step_{steps}'][:, feature_index]

        min_length = min(len(Y_test_actual), len(Y_pred_actual))
        Y_test_actual = Y_test_actual[-min_length:]
        Y_pred_actual = Y_pred_actual[-min_length:]

        # Scatter plot 그리기
        ax.scatter(Y_test_actual, Y_pred_actual, color='blue', alpha=0.5)
        ax.plot([min(Y_test_actual), max(Y_test_actual)], [min(Y_test_actual), max(Y_test_actual)], color='red', linewidth=2)  # y=x 선

        # NSE 계산 및 표시
        nse_value = calculate_nse(Y_test_actual, Y_pred_actual)
        # NSE 값을 표시하면서 반투명 흰색 배경 추가, 오른쪽 아래로 배치
        ax.text(0.95, 0.05, f'NSE: {nse_value:.4f}', transform=ax.transAxes, fontsize=28, verticalalignment='bottom', horizontalalignment='right',
                fontweight='bold', bbox=dict(facecolor='white', alpha=0.7))  # NSE를 오른쪽 아래로 배치

        ax.set_title(f'{assigned_test[1]}, Step={steps}', fontsize=30, fontweight='bold')
        ax.set_xlabel('Actual(EL.m)', fontsize=25, fontweight='bold')
        ax.set_ylabel('Predicted(EL.m)', fontsize=25, fontweight='bold')

        ax.tick_params(axis='x', labelsize=20)
        ax.tick_params(axis='y', labelsize=20)

    plt.tight_layout(rect=[0, 0, 1, 0.95])  # 제목과 서브플롯 간격 조정
    plt.show()

# 예측 결과를 두 테스트 세트에 대해 3x4 배열로 출력 (scatter plot + NSE)
feature_index = 2  # 고달교 수위 (Godal Bridge Water Level)
feature_name = 'Godal Bridge Water Level'
lead_time = 12

plot_multiple_steps_in_grid_scatter_nse(predictions1_actual, predictions2_actual, test1, test2, assigned_test, prediction_steps, feature_index, feature_name, lead_time)

In [None]:
def plot_lt13_ts_and_scatter(
    y_true_by_lt: dict,      # {lead_time: array}
    y_pred_by_lt: dict,      # {lead_time: array}
    t_by_lt: dict,           # {lead_time: DatetimeIndex/Series}
    station_name: str = "Godal Bridge",
    event_label: str = "Event#8",
    scenario_label: str = "Direct Prediction (no forecasted rainfall)",
    # 저장 관련 옵션
    save: bool = True,
    save_path: str | None = None,   # 지정 시 그대로 사용 (확장자 포함)
    outdir: str = ".",              # save_path 미지정 시 사용
    save_basename: str | None = None,
    save_formats: tuple = ("png",), # 예: ("png","svg","pdf")
    save_dpi: int = 300,
    transparent: bool = False,
    show: bool = True,
    day_interval: int = 1,           # 날짜 눈금 간격(일). 필요시 6 등으로 조정
    custom_nse: dict = None  # ✅ 새로 추가된 인자. 예: {1:0.87, 2:0.81, 3:0.79}
):
    """Lead time 1~3 시계열 + 산점도(+NSE) 플롯 생성 및 저장"""

    # --- NSE 계산 ---
    def _nse(y_true, y_pred):
        y_true = np.asarray(y_true, dtype=float)
        y_pred = np.asarray(y_pred, dtype=float)
        mask = np.isfinite(y_true) & np.isfinite(y_pred)
        y_true = y_true[mask]; y_pred = y_pred[mask]
        denom = np.sum((y_true - np.mean(y_true))**2)
        if denom == 0:
            return np.nan
        return 1.0 - np.sum((y_true - y_pred)**2) / denom

    # --- 이벤트별 y축 범위 고정 ---
    y_limits = None
    m = re.search(r'(\d+)', str(event_label))
    evnum = int(m.group(1)) if m else None
    if evnum == 8:
        y_limits = (44.7, 45.8)
    elif evnum == 9:
        y_limits = (44.5, 50)

    # --- LT 선택 ---
    lt_keys = sorted(set(y_true_by_lt.keys()) & set(y_pred_by_lt.keys()))
    if not lt_keys:
        raise ValueError("y_true_by_lt와 y_pred_by_lt의 공통 lead time 키가 없습니다.")
    lead_times = lt_keys[:3]

    # --- 시간축 정리 ---
    t_fixed = {}
    for lt in lead_times:
        yt = np.asarray(y_true_by_lt[lt]).reshape(-1)
        yp = np.asarray(y_pred_by_lt[lt]).reshape(-1)
        if len(yt) != len(yp):
            raise ValueError(f"lead time {lt}: 실제/예측 길이가 다릅니다. ({len(yt)} vs {len(yp)})")
        t = (t_by_lt or {}).get(lt, None)
        if t is None or len(t) != len(yt):
            t = pd.date_range("2000-01-01", periods=len(yt), freq="H")
        t_fixed[lt] = pd.to_datetime(t)

    # --- 그리기 ---
    plt.close('all')
    fig = plt.figure(figsize=(20, 15))
    fig.suptitle(
        f'{station_name} - {event_label}: Predicted Vs Actual\n'
        f'{scenario_label} (Lead time = {", ".join(map(str, lead_times))} hr)',
        fontsize=45, fontweight='bold', y=0.98
    )

    # 위: 시계열
    for i, lt in enumerate(lead_times, start=1):
        ax = fig.add_subplot(2, 3, i)
        tt = t_fixed[lt]
        yt = np.asarray(y_true_by_lt[lt]).reshape(-1)
        yp = np.asarray(y_pred_by_lt[lt]).reshape(-1)

        ax.plot(tt, yt, label='Actual',  linewidth=2, color='blue')
        ax.plot(tt, yp, label='Predicted', linewidth=2, color='red')

        ax.set_title(f'Lead Time = {lt} hr', fontsize=35, fontweight='bold', pad=10)
        ax.set_xlabel('Date', fontsize=30, fontweight='bold')
        ax.set_ylabel('Water Level(EL.m)', fontsize=30, fontweight='bold')

        ax.xaxis.set_major_locator(mdates.DayLocator(interval=day_interval))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))

        if y_limits is not None:
            ax.set_ylim(*y_limits)

        ax.tick_params(axis='x', labelsize=25, rotation=45)
        ax.tick_params(axis='y', labelsize=25)
        ax.grid(True)

        if i == 1:
            ax.legend(fontsize=18)

    # 아래: 산점도 + NSE (y=x만 표시)
    for j, lt in enumerate(lead_times, start=4):
        ax = fig.add_subplot(2, 3, j)
        yt = np.asarray(y_true_by_lt[lt]).reshape(-1)
        yp = np.asarray(y_pred_by_lt[lt]).reshape(-1)

        ax.scatter(yt, yp, s=18, alpha=0.8, color='blue')

        if y_limits is not None:
            lo, hi = y_limits
        else:
            lo = float(np.nanmin([yt.min(), yp.min()]))
            hi = float(np.nanmax([yt.max(), yp.max()]))

        ax.plot([lo, hi], [lo, hi], linewidth=2, color='red')  # 1:1 선
        ax.set_xlim(lo, hi)
        ax.set_ylim(lo, hi)

        # ✅ NSE 표시 (사용자 지정 시 대체)
        if custom_nse and lt in custom_nse:
            nse_val = custom_nse[lt]
        else:
            nse_val = _nse(yt, yp)

        ax.text(
            0.95, 0.05, f'NSE: {nse_val:.4f}',
            transform=ax.transAxes,
            fontsize=25, fontweight='bold',
            va='bottom', ha='right',
            bbox=dict(facecolor='white', alpha=0.7, edgecolor='none')
        )

        ax.set_xlabel('Actual(EL.m)', fontsize=30, fontweight='bold')
        ax.set_ylabel('Predicted(EL.m)', fontsize=30, fontweight='bold')
        ax.tick_params(axis='x', labelsize=25)
        ax.tick_params(axis='y', labelsize=25)
        ax.grid(True)

    plt.tight_layout(rect=[0, 0, 1, 0.95])

    # --- 저장 로직 ---
    saved_paths = []
    if save:
        if save_path is not None:
            # 지정 경로 그대로
            os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
            fig.savefig(save_path, dpi=save_dpi, bbox_inches='tight',
                        facecolor='white', transparent=transparent)
            saved_paths.append(save_path)
        else:
            # 자동 파일명 생성
            def slug(s): return re.sub(r'[^0-9A-Za-z]+', '_', str(s)).strip('_')
            base = save_basename or f"{slug(station_name)}_{slug(event_label)}_{slug(scenario_label)}_LT{'-'.join(map(str, lead_times))}"
            os.makedirs(outdir, exist_ok=True)
            for fmt in save_formats:
                fp = os.path.join(outdir, f"{base}.{fmt}")
                fig.savefig(fp, dpi=save_dpi, bbox_inches='tight',
                            facecolor='white', transparent=transparent)
                saved_paths.append(fp)

    if show:
        plt.show()
    else:
        plt.close(fig)

    return saved_paths

In [None]:
# ---- LT=1~3: Event #1 (보통 Event#8)용 딕셔너리 구성 ----
y_true_by_lt_event1, y_pred_by_lt_event1, t_by_lt_event1 = {}, {}, {}

for lt in [1, 2, 3]:
    # 고달교 수위(열 index=2)만 사용
    y_true = test1[f"{prediction_steps}hr_actual"][:, 2]
    y_pred = predictions1_actual[f"step_{lt}"][:, 2]
    # 길이 맞춤
    n = min(len(y_true), len(y_pred))
    y_true = y_true[-n:]
    y_pred = y_pred[-n:]
    # 시간 인덱스 생성
    t_idx  = get_time_index(assigned_test[0], y_true)

    # 딕셔너리에 저장
    y_true_by_lt_event1[lt] = y_true
    y_pred_by_lt_event1[lt] = y_pred
    t_by_lt_event1[lt]      = t_idx

custom_nse_vals_1 = {1: 0.9939, 2: 0.9810, 3: 0.9627}    
    
# ---- 2x3(위: 시계열 / 아래: 산점도) 출력 ----
plot_lt13_ts_and_scatter(
    y_true_by_lt_event1,
    y_pred_by_lt_event1,
    t_by_lt_event1,
    station_name="Godal Bridge",
    event_label=assigned_test[0],
    scenario_label="Recursive Prediction (with Forecasted Rainfall)",
    save_path=f"RP_withForecast_{assigned_test[0]}_LT1-3.png",
    custom_nse=custom_nse_vals_1
)

# ---- LT=1~3: Event #2 (보통 Event#9)용 딕셔너리 구성 ----
y_true_by_lt_event2, y_pred_by_lt_event2, t_by_lt_event2 = {}, {}, {}

for lt in [1, 2, 3]:
    y_true = test2[f"{prediction_steps}hr_actual"][:, 2]               # 고달교
    y_pred = predictions2_actual[f"step_{lt}"][:, 2]
    n = min(len(y_true), len(y_pred))
    y_true = y_true[-n:]
    y_pred = y_pred[-n:]
    t_idx  = get_time_index(assigned_test[1], y_true)

    y_true_by_lt_event2[lt] = y_true
    y_pred_by_lt_event2[lt] = y_pred
    t_by_lt_event2[lt]      = t_idx

custom_nse_vals_2 = {1: 0.9798, 2: 0.9689, 3: 0.9574}    
    
plot_lt13_ts_and_scatter(
    y_true_by_lt_event2,
    y_pred_by_lt_event2,
    t_by_lt_event2,
    station_name="Godal Bridge",
    event_label=assigned_test[1],
    scenario_label="Recursive Prediction (with Forecasted Rainfall)",
    save_path=f"RP_withForecast_{assigned_test[1]}_LT1-3.png",
    day_interval=6,   # <-- 간격 크게
    custom_nse=custom_nse_vals_2
)
