In [None]:
import pandas as pd
import numpy as np
import glob
import cv2
from datetime import datetime
import os
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

In [None]:
#import time

#while True:
#    print("세션을 유지하는 중...")
#    time.sleep(60)  # 60초마다 메시지 출력

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 이미지와 정답 데이터 경로 설정(colab)
image_folder = '/content/drive/MyDrive/Colab Notebooks/시계열/sun_spot_images/train_images'  # 흑점 이미지 데이터 폴더
test_image_folder = '/content/drive/MyDrive/Colab Notebooks/시계열/sun_spot_images/test_images'  # 흑점 이미지 데이터 폴더 _ TEST
csv_file = '/content/drive/MyDrive/Colab Notebooks/시계열/daily_sunspots_time_series_1850-01_2024-05.csv'  # 정답 데이터 CSV 파일



In [None]:
# 이미지와 정답 데이터 경로 설정(local)
image_folder = r'C:\Users\JinSeong\jupyter_notebook\files\train_images'  # 흑점 이미지 데이터 폴더
test_image_folder = r'C:\Users\JinSeong\jupyter_notebook\files\test_images'  # 흑점 이미지 데이터 폴더 _ TEST
csv_file = r'C:\Users\JinSeong\jupyter_notebook\files\daily_sunspots_time_series_1850-01_2024-05.csv'  # 정답 데이터 CSV 파일



In [None]:
# 정답 데이터 로드 및 필터링
sunspot_data = pd.read_csv(csv_file)
sunspot_data.head()

In [None]:
train_data = sunspot_data[(sunspot_data['date'] >= '2010-08-12') & (sunspot_data['date'] <= '2024-03-31')]
test_data = sunspot_data[(sunspot_data['date'] >= '2024-04-01') & (sunspot_data['date'] <= '2024-04-30')]
print(train_data.head())
print()
print(test_data.head())

In [None]:
def add_gaussian_noise(image, mean=0, std=25):
    gaussian_noise = np.random.normal(mean, std, image.shape).astype(np.float32)
    noisy_image = cv2.add(image.astype(np.float32), gaussian_noise)
    return np.clip(noisy_image, 0, 255).astype(np.uint8)

In [None]:
# 이미지 파일 리스트 로드
image_files = sorted(glob.glob(f"{image_folder}/*.jpg"))
image_data = []

for file in image_files:
    # 이미지 파일명에서 날짜 추출 (예: YYYY-MM-DD 형식일 경우)
    date_str = os.path.basename(file).split('.')[0]  # 파일명에서 확장자 제거

    if date_str in train_data['date'].values:
        # 이미지를 로드 (전처리된 이미지 사용)
        img = cv2.imread(file, cv2.IMREAD_GRAYSCALE)  # 필요에 따라 색상 형식 수정
        noisy_image = add_gaussian_noise(img) #가우시안 노이즈 추가
        reg_img = noisy_image / 255.0  # 이미지 크기 조정 및 정규화
        image_data.append((date_str, reg_img))

# DataFrame으로 변환하여 날짜 매칭
image_df = pd.DataFrame(image_data, columns=['date', 'image'])
train_data = pd.merge(train_data, image_df, on='date')


In [None]:
# 테스트 이미지 파일 리스트 로드
test_image_files = sorted(glob.glob(f"{test_image_folder}/*.jpg"))
test_image_data = []

for file in test_image_files:
    # 이미지 파일명에서 날짜 추출 (예: YYYY-MM-DD 형식일 경우)
    date_str = os.path.basename(file).split('.')[0]  # 파일명에서 확장자 제거

    if date_str in test_data['date'].values:
        # 이미지를 로드 (전처리된 이미지 사용)
        img = cv2.imread(file, cv2.IMREAD_GRAYSCALE)  # 필요에 따라 색상 형식 수정
        noisy_image = add_gaussian_noise(img) #가우시안 노이즈 추가
        reg_img = noisy_image / 255.0  # 이미지 크기 조정 및 정규화
        test_image_data.append((date_str, reg_img))

# DataFrame으로 변환하여 날짜 매칭
test_image_df = pd.DataFrame(test_image_data, columns=['date', 'image'])
test_data = pd.merge(test_data, test_image_df, on='date')

In [None]:
print(len(image_data))
print(test_data.head())

In [None]:
# 시계열 분해 (추세, 계절성, 잔차로 분해)
decomposition = seasonal_decompose(train_data['counts'], model='additive', period=3)
trend = decomposition.trend
seasonal = decomposition.seasonal
residual = decomposition.resid

# 시각화
plt.figure(figsize=(10, 8))
plt.subplot(4, 1, 1)
plt.plot(train_data['counts'], label='Original')
plt.legend(loc='upper left')
plt.title('Original Time Series')

plt.subplot(4, 1, 2)
plt.plot(trend, label='Trend')
plt.legend(loc='upper left')
plt.title('Trend')

plt.subplot(4, 1, 3)
plt.plot(seasonal, label='Seasonality')
plt.legend(loc='upper left')
plt.title('Seasonality')

plt.subplot(4, 1, 4)
plt.plot(residual, label='Residuals')
plt.legend(loc='upper left')
plt.title('Residuals')

plt.tight_layout()
plt.show()

# 정상성 검정
print("ADF Test for Original Data:")
adf_result = adfuller(train_data['counts'].dropna())
print(f"ADF Statistic: {adf_result[0]}")
print(f"p-value: {adf_result[1]}")
print("Critical Values:")
for key, value in adf_result[4].items():
    print(f"   {key}: {value}")

In [None]:
train_data.to_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\train_data_pre.pkl')
test_data.to_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\test_data_pre.pkl')

In [None]:
train_data = pd.read_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\train_data_pre.pkl')
test_data = pd.read_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\test_data_pre.pkl')

In [None]:
# CNN 모델 정의
cnn_model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(256, 256, 1)),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.2),

    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.2),

    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.2),

    layers.Conv2D(256, (3, 3), activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.2),

    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(1)  # 회귀 문제인 경우, 마지막 출력층의 유닛 수는 1
])

early_stopping = EarlyStopping(
    monitor='val_loss',  # 검증 손실을 모니터링
    patience=5,          # 5 에포크 동안 개선이 없으면 중단
    restore_best_weights=True  # 가장 좋은 가중치 복원
) # EarlyStopping 콜백 정의

image_data = np.stack(train_data['image'].values)  # Stack images into a 4D array
image_data = image_data.reshape(image_data.shape[0], 256, 256, 1) # Reshape to (num_samples, height, width, channels)

# # 옵티마이저 설정 (Adam + 스케줄링)
# initial_lr = 0.001
# lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
#     initial_learning_rate=initial_lr, decay_steps=10000, decay_rate=0.9)
# optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

cnn_model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mean_absolute_error'])  # 회귀용 손실 함수와 지표
cnn_model.fit(
    image_data,
    train_data['counts'],
    epochs=100,           # 충분히 많은 에포크로 설정
    batch_size=32,
    validation_split=0.2, # 훈련 데이터의 20%를 검증 데이터로 사용
    callbacks=[early_stopping]  # EarlyStopping 콜백 추가
)

feature_extractor = models.Sequential(cnn_model.layers[:-2])  # 마지막 레이어 제외

In [None]:
# CNN 특징 추출
features = []
for img in train_data['image']:
    img = img.reshape(1, 256, 256, 1)
    feature = feature_extractor.predict(img)
    features.append(feature[0])

# 추출한 특징을 DataFrame에 추가
train_data['features'] = features

In [None]:
train_data.head()

In [None]:
# TEST Data CNN 특징 추출
test_features = []

for img in test_data['image']:
    img = img.reshape(1, 256, 256, 1)
    feature = feature_extractor.predict(img)
    test_features.append(feature[0])

# 추출한 특징을 DataFrame에 추가
test_data['features'] = test_features

In [None]:
test_data.head()

In [None]:
train_data.to_pickle('/content/drive/MyDrive/Colab Notebooks/시계열/train_data.pkl')

In [None]:
train_data = pd.read_pickle('/content/drive/MyDrive/Colab Notebooks/시계열/sun_spot_images/pki/train_data.pkl')

In [None]:
train_data.to_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\train_data.pkl')

In [None]:
train_data = pd.read_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\train_data.pkl')

In [None]:
train_data.tail()

In [None]:
test_data.to_pickle('/content/drive/MyDrive/Colab Notebooks/이기언_강사님_자료/프로젝트/시계열/test_data.pkl')

In [None]:
test_data = pd.read_pickle('/content/drive/MyDrive/Colab Notebooks/시계열/sun_spot_images/pki/test_data.pkl')

In [None]:
test_data.to_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\test_data.pkl')

In [None]:
test_data = pd.read_pickle(r'C:\Users\JinSeong\jupyter_notebook\files\test_data.pkl')

In [None]:
test_data.tail()

In [None]:
features = train_data['features']

In [None]:
# 시계열 데이터를 위한 입력 생성
time_step = 5  # time_step 5 10 30
learning_rate = 0.01 # 0.01 0.001 0.0001
batch_size = 64 # 16 32 64
node = 128 #32 64 128
X_train, y_train = [], []

for i in range(len(features) - time_step):
    X_train.append(np.stack(features[i:i+time_step]))
    y_train.append(train_data['counts'].iloc[i+time_step])

X_train, y_train = np.array(X_train), np.array(y_train)

print(X_train.shape)

In [None]:
# RNN 모델
rnn_model = models.Sequential([
    layers.SimpleRNN(node, activation='relu', return_sequences=False, input_shape=(time_step, X_train.shape[2])),
    # layers.Dropout(0.2),
    # layers.BatchNormalization(),
    # layers.SimpleRNN(64, activation='relu', return_sequences=True),
    # layers.Dropout(0.2),
    # layers.BatchNormalization(),
    # layers.SimpleRNN(32, activation='relu'),
    # layers.Dropout(0.2),
    layers.Dense(1)
])

# LSTM 모델
# lstm_model = models.Sequential([
#     layers.LSTM(256, activation='relu', return_sequences=True, input_shape=(time_step, X_train.shape[2])),
#     # layers.Dropout(0.2),
#     layers.BatchNormalization(),
#     layers.LSTM(128, activation='relu', return_sequences=True),
#     # layers.Dropout(0.2),
#     layers.BatchNormalization(),
#     layers.LSTM(64, activation='relu', return_sequences=True),
#     # layers.Dropout(0.2),
#     layers.BatchNormalization(),
#     layers.LSTM(32, activation='relu'),
#     # layers.Dropout(0.2),
#     layers.Dense(1)
# ])

rnn_model.summary()
# lstm_model.summary()

In [None]:
# 학습과 검증 데이터의 비율을 정합니다 (예: 80%는 학습, 20%는 검증).
train_size = int(len(X_train) * 0.8)

# 시계열 데이터 순서를 유지한 채 학습 및 검증 데이터로 분할합니다.
X_tra, X_val = X_train[:train_size], X_train[train_size:]
y_tra, y_val = y_train[:train_size], y_train[train_size:]

# 옵티마이저 설정 (Adam + 스케줄링)
initial_lr = learning_rate
# lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
#     initial_learning_rate=initial_lr, decay_steps=10000, decay_rate=0.9)
optimizer_rnn = tf.keras.optimizers.Adam(learning_rate=initial_lr)
# optimizer_lstm = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# 모델 컴파일
rnn_model.compile(optimizer=optimizer_rnn, loss='mse')
# lstm_model.compile(optimizer='adam', loss='mse')

# 조기 종료 설정
early_stopping_rnn = EarlyStopping(
    monitor='val_loss',      # 모니터할 지표 (검증 손실)
    patience=5,             # 10 에포크 동안 개선되지 않으면 종료
    min_delta=0.001,         # 0.001 이상의 개선만 인정
    restore_best_weights=True  # 가장 좋았던 가중치 복원)
)
# early_stopping_lstm = EarlyStopping(
#     monitor='val_loss',      # 모니터할 지표 (검증 손실)
#     patience=5,             # 10 에포크 동안 개선되지 않으면 종료
#     min_delta=0.001,         # 0.001 이상의 개선만 인정
#     restore_best_weights=True  # 가장 좋았던 가중치 복원)
# )

# 모델 학습
rnn_history = rnn_model.fit(X_tra, y_tra, epochs=100, batch_size=batch_size, validation_data=(X_val, y_val), callbacks=[early_stopping_rnn])
# lstm_history = lstm_model.fit(X_tra, y_tra, epochs=100, batch_size=batch_size, validation_data=(X_val, y_val), callbacks=[early_stopping_lstm])

In [None]:
# 학습 및 검증 손실 그래프
plt.figure(figsize=(10, 5))
plt.plot(rnn_history.history['loss'], label='Train Loss')
plt.plot(rnn_history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('RNN Train vs Validation Loss')
plt.legend()
plt.show()

In [None]:
# # 학습 및 검증 손실 그래프
# plt.figure(figsize=(10, 5))
# plt.plot(lstm_history.history['loss'], label='Train Loss')
# plt.plot(lstm_history.history['val_loss'], label='Validation Loss')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.title('LSTM Train vs Validation Loss')
# plt.legend()
# plt.show()

In [None]:
test_features = test_data['features']

In [None]:
def predict_future(model, train, test_features, day, time_step):
    future_predictions = []
    use_predict_data = X_train[-1].reshape(1, time_step, train.shape[2])
    next_pred = model.predict(use_predict_data)
    future_predictions.append(next_pred)

    for i in range(day - 1):
        use_predict_data = np.append(use_predict_data[-1,1:], test_features[i])
        use_predict_data = use_predict_data.reshape(1, time_step, train.shape[2])
        next_pred = model.predict(use_predict_data)
        future_predictions.append(next_pred)

    return future_predictions

future_predictions_rnn = predict_future(rnn_model, X_train, test_features, 30, time_step)
# future_predictions_lstm = predict_future(lstm_model, X_train, test_features, 30, time_step)


In [None]:
# 모든 배열을 하나의 NumPy 배열로 결합
combined_array = np.concatenate(future_predictions_rnn, axis=0)
flat_list_rnn = combined_array.flatten().tolist()
print(flat_list_rnn)
# # 모든 배열을 하나의 NumPy 배열로 결합
# combined_array = np.concatenate(future_predictions_lstm, axis=0)
# flat_list_lstm = combined_array.flatten().tolist()
# print(flat_list_lstm)

In [None]:
print(test_data['counts'])

In [None]:
# 시각화
plt.figure(figsize=(15, 6))
plt.plot(test_data['counts'], label='Actual Sunspot Counts')
plt.plot(flat_list_rnn, label='RNN Predicted Sunspot Counts')
# plt.plot(flat_list_lstm, label='LSTM Predicted Sunspot Counts')
plt.xlabel('Days')
plt.ylabel('Sunspot Counts')
plt.title('Actual vs. Predicted Sunspot Counts (RNN and LSTM)')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def calculate_metrics(y_pred):
    metrics = {
        'MAE': "{:.4f}".format(mean_absolute_error(test_data['counts'], y_pred)),
        'MSE': "{:.4f}".format(mean_squared_error(test_data['counts'], y_pred)),
        'R2': "{:.4f}".format(r2_score(test_data['counts'], y_pred))
    }
    return metrics


In [None]:
print(calculate_metrics(flat_list_rnn))
# print(calculate_metrics(flat_list_lstm))