In [None]:
# Mount Google Drive at /content/drive; the data paths configured in this
# notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import io

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential

In [None]:
# File locations on the mounted Google Drive.
# train_folder holds the per-location training CSVs (L<n>_Train.csv),
# test_file is the CSV that predictions are made for, and output_file is
# where the predictions are written (inside train_folder).
train_folder = "/content/drive/MyDrive/太陽能競賽/training_data"
test_file = "/content/drive/MyDrive/太陽能競賽/official_test/upload(no answer).csv"
output_file = train_folder + "/upload(no answer)-temp.csv"

In [None]:
# Hyperparameters
# LOOKBACK is the number of consecutive rows in one input window. The original
# note describes it as "the past 60 minutes", which assumes one row per
# minute -- TODO confirm against the data.
LOOKBACK = 60
BATCH_SIZE = 32  # mini-batch size passed to model.fit
EPOCHS = 3       # training epochs per model
# Columns used both as model inputs and as prediction targets.
FEATURES = [
    "WindSpeed(m/s)",
    "Pressure(hpa)",
    "Temperature(°C)",
    "Humidity(%)",
    "Sunlight(Lux)",
]

In [None]:
# Data preprocessing functions
def preprocess_train_data(file_path, lookback=LOOKBACK):
    """Load a training CSV and turn it into scaled sliding-window samples.

    The rows are sorted by the "DateTime" column, reduced to the FEATURES
    columns and min-max scaled. Every run of `lookback` consecutive rows
    becomes one input window, and the scaled row right after it is the target.

    Args:
        file_path: Anything `pd.read_csv` accepts (a path or a file-like
            object).
        lookback: Number of rows per input window.

    Returns:
        A tuple `(X, y, scaler)`:
        X has shape (n_windows, lookback, n_features), y has shape
        (n_windows, n_features), with n_windows = max(n_rows - lookback, 0).
        `scaler` is the MinMaxScaler fitted on this file's feature columns.
        When the file has `lookback` rows or fewer, X and y are empty but
        keep these shapes, so `X.shape[1:]` is still a valid input shape.
    """
    frame = pd.read_csv(file_path)
    frame["DateTime"] = pd.to_datetime(frame["DateTime"])
    frame = frame.sort_values("DateTime").reset_index(drop=True)
    feature_frame = frame[FEATURES]

    # Normalise every feature to [0, 1]; the fitted scaler is returned so the
    # caller can scale inputs and un-scale predictions consistently.
    scaler = MinMaxScaler()
    scaled = np.asarray(scaler.fit_transform(feature_frame))

    # Build the LSTM input format. Pre-allocating keeps the arrays correctly
    # shaped even when there is not a single complete window.
    n_windows = max(len(scaled) - lookback, 0)
    X = np.empty((n_windows, lookback, scaled.shape[1]), dtype=scaled.dtype)
    for start in range(n_windows):
        X[start] = scaled[start:start + lookback]
    # Target of window k is row k + lookback, i.e. the rows from `lookback` on.
    y = scaled[lookback:lookback + n_windows].copy()
    return X, y, scaler

def preprocess_test_data(file_path, location_code):
    """Return the test rows of one location, time-ordered, FEATURES columns only.

    The "DATE" column is read as a string: its first 12 characters are parsed
    as a timestamp (format %Y%m%d%H%M) and its last two characters are taken
    as the integer location code.

    Args:
        file_path: CSV to read; must contain "DATE" and every column in FEATURES.
        location_code: Integer location code whose rows are kept.

    Returns:
        DataFrame with exactly the FEATURES columns, sorted by the parsed
        timestamp, with a fresh 0..n-1 index. The "DATE" column and the
        timestamp are not part of the result.
    """
    raw = pd.read_csv(file_path)
    date_key = raw["DATE"].astype(str)
    raw["DateTime"] = pd.to_datetime(date_key.str[:12], format="%Y%m%d%H%M")
    raw["LocationCode"] = date_key.str[-2:].astype(int)

    # Keep only this location; carry DateTime along just long enough to sort.
    at_location = raw.loc[raw["LocationCode"] == location_code, ["DateTime"] + FEATURES]
    ordered = at_location.sort_values("DateTime")
    return ordered[FEATURES].reset_index(drop=True)

# Model construction function
def build_model(input_shape):
    """Create and compile the stacked-LSTM regressor.

    Args:
        input_shape: Shape of a single input sample, forwarded to the first
            LSTM layer (the caller passes `X_train.shape[1:]`).

    Returns:
        A compiled Sequential model: LSTM(64) returning full sequences,
        then LSTM(32), then a Dense layer with one output per entry in
        FEATURES. It is compiled with the Adam optimizer and MSE loss.
    """
    network = Sequential()
    network.add(LSTM(64, input_shape=input_shape, return_sequences=True))
    network.add(LSTM(32))
    network.add(Dense(len(FEATURES)))
    network.compile(optimizer="adam", loss="mse")
    return network

In [None]:
# Main loop: for every location and each half of the year, train a model on
# that half's training rows only and use it to predict the test rows whose
# timestamp falls in the same half.
results = []

# Parse the test file once. The DATE key is needed in the output and the
# timestamp decides which half-year model predicts a row; preprocess_test_data()
# returns neither, so both are derived here with the same slicing it uses
# (first 12 characters = timestamp, last 2 characters = location code).
test_raw = pd.read_csv(test_file)
test_keys = test_raw["DATE"].astype(str)
test_raw["DateTime"] = pd.to_datetime(test_keys.str[:12], format="%Y%m%d%H%M")
test_raw["LocationCode"] = test_keys.str[-2:].astype(int)

for location in range(1, 18):
    # Read this location's training file once for both halves.
    file_path = f"{train_folder}/L{location}_Train.csv"
    train_df = pd.read_csv(file_path)
    train_month = train_df["DateTime"].str[:7]  # "YYYY-MM" prefix of the timestamp string

    test_loc = (
        test_raw[test_raw["LocationCode"] == location]
        .sort_values("DateTime")
        .reset_index(drop=True)
    )
    test_dates = test_loc["DATE"].to_numpy()
    # Window k covers rows k .. k+LOOKBACK-1 and predicts row k+LOOKBACK, so
    # entry k of this mask says which half the target of window k belongs to.
    # NOTE(review): test rows are routed by calendar month (Jan-Jun vs Jul-Dec),
    # while the training filters below are tied to 2024 -- confirm this matches
    # the intended split.
    target_in_first_half = (test_loc["DateTime"].iloc[LOOKBACK:].dt.month <= 6).to_numpy()

    for half in ["上半年", "下半年"]:
        # Select the training rows and the test windows of this half.
        if half == "上半年":
            half_df = train_df[train_month.between("2024-01", "2024-06")]
            window_starts = np.flatnonzero(target_in_first_half)
        else:
            half_df = train_df[train_month >= "2024-07"]
            window_starts = np.flatnonzero(~target_in_first_half)

        if window_starts.size == 0:
            print(f"L{location} {half}: no test rows to predict, skipping")
            continue
        if len(half_df) <= LOOKBACK:
            # Not even one complete (window, target) pair can be built.
            print(
                f"L{location} {half}: only {len(half_df)} training rows "
                f"(more than {LOOKBACK} needed), skipping"
            )
            continue

        # Preprocessing. preprocess_train_data() hands its argument straight to
        # pd.read_csv, so the filtered half is passed as an in-memory CSV;
        # passing file_path here would silently train on the whole file.
        X_train, y_train, scaler = preprocess_train_data(
            io.StringIO(half_df.to_csv(index=False))
        )

        # Build and train the model.
        model = build_model(X_train.shape[1:])
        model.fit(X_train, y_train, epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1)

        # Predict. The test inputs must be scaled with the scaler fitted on
        # the training data, because the model was trained in scaled space.
        test_scaled = scaler.transform(test_loc[FEATURES])
        X_test = np.stack([test_scaled[start:start + LOOKBACK] for start in window_starts])
        predictions = scaler.inverse_transform(model.predict(X_test))

        # Collect the results, keyed by the DATE of each predicted row.
        for date_key, pred in zip(test_dates[window_starts + LOOKBACK], predictions):
            results.append({"DATE": date_key, **dict(zip(FEATURES, pred))})

# Write the collected predictions to a new CSV file. Passing the column list
# keeps the header even when nothing was predicted.
results_df = pd.DataFrame(results, columns=["DATE"] + FEATURES)
results_df.to_csv(output_file, index=False)
print(f"預測結果已儲存至 {output_file}")