In [None]:
# Print the installed metadata (version, location, dependencies) of the core packages.
%pip show numpy pandas matplotlib yfinance


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install exact, pinned versions of the core packages into the kernel's environment.
%pip install numpy==1.25.2 pandas==2.0.3 matplotlib==3.7.1 yfinance==0.2.38

需要套件

In [None]:
# Install Yahoo Finance package
%pip install yfinance

# Standard-library imports
import os
from datetime import datetime, timedelta, date

# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import dateutil.parser as psr
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
# Chinese Font Settings for Chart Plotting (Colab)
from matplotlib.font_manager import FontProperties
from tensorflow.keras.callbacks import TensorBoard, LearningRateScheduler, ReduceLROnPlateau
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.optimizers import RMSprop


if not os.path.isdir("HappyML"):
  os.system("git clone https://github.com/cnchi/HappyML.git")

txt_path = "/content/drive/MyDrive/output.txt"

!fc-list :lang=zh family
!wget -O taipei_sans_tc_beta.ttf https://drive.google.com/uc?id=1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_&export=download
!mv taipei_sans_tc_beta.ttf /usr/local/lib/python3.10/dist-packages/matplotlib/mpl-data/fonts/ttf
myfont = FontProperties(fname=r'/usr/local/lib/python3.10/dist-packages/matplotlib/mpl-data/fonts/ttf/taipei_sans_tc_beta.ttf')


決定預測股票與時間

In [None]:
# Download the Stock Data
# Surrounding whitespace is stripped so a stray space does not corrupt the ticker.
stockID = input("請輸入台灣股票名稱、或代號：").strip()
# Example start date: 2017-01-01
startDate = psr.parse(input("請輸入查詢起始日期："))
# Example end date: 2024-05-23
endDate = psr.parse(input("請輸入查詢截止日期："))

# Fail fast on an inverted or empty date range instead of training on nothing.
if startDate.date() >= endDate.date():
    raise ValueError(
        "Start date {} must be earlier than end date {}".format(
            startDate.date(), endDate.date()))

# The ".TW" suffix is appended to whatever the user typed to form the query symbol.
stockQuery = "{}.TW".format(stockID)
dataset = yf.download(stockQuery, start=startDate.strftime("%Y-%m-%d"), end=endDate.strftime("%Y-%m-%d"))

# An unknown ticker or a range without trading days yields an empty frame;
# stop here with a clear message rather than failing later inside the model code.
if dataset.empty:
    raise ValueError(
        "No price data returned for {} between {} and {}".format(
            stockQuery, startDate.date(), endDate.date()))




模型訓練

In [None]:
def _fetch_next_real_close(ticker, first_candidate_day):
    """Find the first real closing price on or after ``first_candidate_day``.

    Candidate days are tried one calendar day at a time. Only days strictly
    before today are queried, so the search always terminates even when no
    quote has been published yet.

    Args:
        ticker: symbol passed to ``yf.download``.
        first_candidate_day: ``datetime`` of the first day to try.

    Returns:
        ``(trade_day, close)``. ``close`` is ``None`` (and ``trade_day`` is
        ``first_candidate_day``) when no quote could be found.
    """
    day = first_candidate_day
    while day.date() < date.today():
        quotes = yf.download(ticker,
                             start=day.date().strftime("%Y-%m-%d"),
                             end=(day + timedelta(days=1)).date().strftime("%Y-%m-%d"))
        if not quotes.empty:
            return day, float(quotes.iloc[0]["Close"])
        day += timedelta(days=1)
    return first_candidate_day, None


def lstm(train_size, val_size, win_size, sample_step, win_moving, data_batch, train_epoch, cor_train_epoch, lr, unit1, unit2, unit3, dropout1, dropout2, dropout3, n):
    """Train a 3-layer LSTM on the global ``dataset`` and log the result.

    Reads the module-level ``dataset``, ``stockQuery`` and ``txt_path``.
    Prints a summary of the run and appends it to ``txt_path``.

    Args:
        train_size: fraction of rows used for training.
        val_size: fraction of rows used for validation (the rest is the test set).
        win_size: number of time steps per input window.
        sample_step: ``sampling_rate`` passed to ``TimeseriesGenerator``.
        win_moving: ``stride`` passed to ``TimeseriesGenerator``.
        data_batch: batch size of the generators.
        train_epoch: epochs of the first ``fit`` call (logged to TensorBoard).
        cor_train_epoch: epochs of the second ``fit`` call (no callbacks).
        lr: learning rate of the RMSprop optimizer.
        unit1, unit2, unit3: units of the three LSTM layers.
        dropout1, dropout2, dropout3: dropout rate after each LSTM layer.
        n: attempt number, only used in the written summary.

    Returns:
        ``(model, scaler, train_set, val_set, test_set, X_scale, Y_scale)``
        where ``scaler`` is the scaler fitted on the target column, i.e. the
        one to use for ``inverse_transform`` on model predictions.
    """
    # Features: every row except the last. Target: the column at position 3,
    # shifted one row forward relative to X.
    # NOTE(review): position 3 is presumably "Close" in the downloaded frame — confirm.
    X = dataset.iloc[:-1, :]
    Y = dataset.iloc[1:, 3:4]

    # Preprocessing: Feature Scaling (Normalization) with MinMaxScaler.
    # Separate scalers for features and target: refitting one shared scaler
    # silently discards the feature fit and hides which fit is returned.
    # NOTE(review): both are fitted on the full series (train + val + test).
    x_scaler = MinMaxScaler(feature_range=(0, 1))
    y_scaler = MinMaxScaler(feature_range=(0, 1))
    X_scale = x_scaler.fit_transform(X)
    Y_scale = y_scaler.fit_transform(Y)

    # Preprocessing: Split Training & Testing Data (chronological, no shuffling)
    X_train, X_val, X_test = np.split(
        X_scale,
        [int(train_size * len(X_scale)), int((train_size + val_size) * len(X_scale))])
    Y_train, Y_val, Y_test = np.split(
        Y_scale,
        [int(train_size * len(Y_scale)), int((train_size + val_size) * len(Y_scale))])

    # Preprocessing: Generate Recurrent Data
    def make_windows(data, targets):
        """Build a windowed generator with this run's window settings."""
        return TimeseriesGenerator(
            data=data,
            targets=targets,
            length=win_size,
            sampling_rate=sample_step,
            stride=win_moving,
            batch_size=data_batch)

    train_set = make_windows(X_train, Y_train)
    val_set = make_windows(X_val, Y_val)
    test_set = make_windows(X_test, Y_test)

    # Create Model
    model = Sequential()
    model.add(layers.LSTM(units=unit1, return_sequences=True, input_shape=(win_size, X.shape[1])))
    model.add(layers.Dropout(dropout1))
    model.add(layers.LSTM(units=unit2, return_sequences=True))
    model.add(layers.Dropout(dropout2))
    model.add(layers.LSTM(units=unit3))
    model.add(layers.Dropout(dropout3))
    model.add(layers.Dense(Y.shape[1], activation="linear"))
    model.compile(optimizer=RMSprop(learning_rate=lr), loss="mse", metrics=["mse"])

    # Learning Rate Scheduler: multiply the rate by 0.85 every 50 epochs.
    def scheduler(epoch, lr):
        if epoch % 50 == 0 and epoch != 0:
            return lr * 0.85
        return lr

    # NOTE(review): this callback is created but not passed to either fit()
    # call below, so it currently has no effect on training.
    lr_scheduler = LearningRateScheduler(scheduler)

    # TensorBoard logging for the first training phase.
    logdir = os.path.join("logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = TensorBoard(logdir, histogram_freq=1)

    # Load and start the TensorBoard notebook extension. The magics are issued
    # through the IPython API so this function body is plain Python.
    try:
        from IPython import get_ipython
        shell = get_ipython()
    except ImportError:
        shell = None
    if shell is not None:
        shell.run_line_magic("load_ext", "tensorboard")
        shell.run_line_magic("tensorboard", "--logdir logs")

    # Train Model
    model.fit(train_set, validation_data=val_set, epochs=train_epoch, callbacks=[tensorboard_callback])
    print("first train")
    # Train Model (After Correction)
    model.fit(train_set, validation_data=val_set, epochs=cor_train_epoch)

    # Model Evaluation
    test_loss, test_mse = model.evaluate(test_set)

    # Predicted price of the last test window, back in the original price scale.
    # NOTE(review): the summary labels this with the last date of ``dataset``;
    # with win_moving > 1 the last emitted window may end earlier — confirm.
    Y_pred = model.predict(test_set)
    Y_pred_price = y_scaler.inverse_transform(Y_pred)
    last_pred_close = float(Y_pred_price[-1, 0])

    # Get Trade Day: the calendar day after the last row, moved to Monday
    # when it falls on a Saturday (6) or Sunday (7).
    this_trade_day = dataset.index[-1].to_pydatetime()
    next_trade_day = this_trade_day + timedelta(days=1)
    if next_trade_day.isoweekday() in (6, 7):
        next_trade_day += timedelta(days=8 - next_trade_day.isoweekday())

    # Predict the next close from the most recent ``win_size`` scaled rows.
    lookback_data = np.reshape(X_scale[-win_size:], (1, win_size, X_scale.shape[1]))
    tomorrow_pred = y_scaler.inverse_transform(model.predict(lookback_data))

    # Look up the real close when it is already in the past. It stays None when
    # it is not available yet, so the summary below never hits an undefined name.
    next_trade_day, tomorrow_real_close = _fetch_next_real_close(stockQuery, next_trade_day)
    if tomorrow_real_close is None:
        real_close_text = "N/A"
        real_change_text = "N/A"
    else:
        real_close_text = "{:.2f}".format(tomorrow_real_close)
        real_change_text = "{:+.2f}".format(tomorrow_real_close - dataset.iloc[-1]["Close"])

    content_to_write = """
        __________________
        第{}次嘗試
        超參數：
        train_size = {},
        val_size = {},
        win_size = {},
        sample_step = {},
        win_moving = {},
        data_batch = {},
        train_epoch = {},
        cor_train_epoch = {},
        lr = {},
        unit1 = {},
        unit2 = {},
        unit3 = {},
        dropout1 = {},
        dropout2 = {},
        dropout3 = {}
        結果：
        Loss of Test Set:, {}
        MSE of Test Set:, {}
        預測收盤價 ----------
        最後一日（{}）：{:.2f}
        次交易日（{}）：{:.2f}
        預測漲跌：{:+.2f}
        次交易日（{}）：{}
        真實漲跌：{}
        __________________
        """.format(n, train_size, val_size, win_size, sample_step, win_moving, data_batch, train_epoch, cor_train_epoch, lr, unit1, unit2, unit3, dropout1, dropout2, dropout3, test_loss, test_mse,
          this_trade_day.strftime("%Y/%m/%d"), last_pred_close, next_trade_day.strftime("%Y/%m/%d"), tomorrow_pred[0][0],
          (tomorrow_pred[0][0] - last_pred_close), next_trade_day.strftime("%Y/%m/%d"), real_close_text,
          real_change_text
                   )
    print(content_to_write)
    # Explicit UTF-8: the summary contains Chinese text and must not depend on
    # the platform's default encoding.
    with open(txt_path, 'a', encoding="utf-8") as f:
        f.write(content_to_write + "\n")

    return model, y_scaler, train_set, val_set, test_set, X_scale, Y_scale

複製下方這段程式碼，並調整各個超參數的範圍，就可以一次連續執行多組設定。

In [None]:
# Hyper-parameters that stay the same for every run of the sweep.
sweep_constants = dict(
    train_size=0.6,
    val_size=0.2,
    sample_step=1,
    win_moving=10,
    cor_train_epoch=250,
    lr=0.01,
    unit1=64,
    unit2=64,
    unit3=64,
    dropout1=0.2,
    dropout2=0.2,
    dropout3=0.2,
)

# Sweep over window size, first-phase epochs and batch size.
# NOTE(review): the `b` and `c` loops are not passed to lstm(); as written
# they only repeat each configuration (3 x 1 times) — confirm the intent.
n = 0  # attempt counter written into each run's summary
for window_length in range(16, 32, 8):
    for b in range(1, 4, 1):
        for c in range(5, 10, 5):
            for first_phase_epochs in range(300, 500, 100):
                for batch_size in range(20, 32, 12):
                    model, scaler, train_set, val_set, test_set, X_scale, Y_scale = lstm(
                        win_size=window_length,
                        data_batch=batch_size,
                        train_epoch=first_phase_epochs,
                        n=n,
                        **sweep_constants
                    )
                    n += 1

In [None]:
            # Single training run: 32-step windows moved 5 rows at a time.
            lstm_kwargs = dict(
                train_size=0.6,
                val_size=0.2,
                win_size=32,
                sample_step=1,
                win_moving=5,
                data_batch=20,
                train_epoch=500,
                cor_train_epoch=250,
                lr=0.01,
                unit1=64,
                unit2=64,
                unit3=64,
                dropout1=0.2,
                dropout2=0.2,
                dropout3=0.2,
                n=1,  # attempt number shown in the written summary
            )
            model, scaler, train_set, val_set, test_set, X_scale, Y_scale = lstm(**lstm_kwargs)

In [None]:
# Get the Predict Price (with date)
Y_pred = model.predict(test_set)
Y_pred_price = pd.DataFrame(data=scaler.inverse_transform(Y_pred), columns=["收盤價"])

# The generator emits one target every `stride` rows between start_index and
# end_index, so predictions are NOT the last N consecutive days unless stride
# is 1. Rebuild the exact target rows so dates and real prices line up.
target_rows = np.arange(test_set.start_index, test_set.end_index + 1, test_set.stride)
if len(target_rows) != len(Y_pred_price):
    raise ValueError(
        "Expected {} predictions from the test generator, got {}".format(
            len(target_rows), len(Y_pred_price)))
# The test targets are the tail of `dataset`, so offset the rows accordingly.
dataset_rows = len(dataset) - len(test_set.targets) + target_rows
sample_dates = dataset.index[dataset_rows].strftime("%Y-%m-%d").tolist()
Y_pred_price.index = sample_dates

# Get the Real Price (with date) for exactly the same rows
Y_real_price = dataset["Close"].iloc[dataset_rows].to_frame()
Y_real_price.columns = ["收盤價"]
Y_real_price.index = sample_dates

# Plot the predict vs. real price
import matplotlib.ticker as ticker

fig, ax = plt.subplots(1, 1)

ax.plot(Y_pred_price, color="blue", label="Predict")
ax.plot(Y_real_price, color="red", label="Real")

# Show one date label every `tick_spacing` samples to keep the axis readable.
tick_spacing = 5
ax.xaxis.set_major_locator(ticker.MultipleLocator(tick_spacing))
fig.autofmt_xdate()

ax.set_title("{} 收盤價盲測結果".format(stockID), fontproperties=myfont)
ax.set_xlabel("日期", fontproperties=myfont)
ax.set_ylabel("收盤價", fontproperties=myfont)
ax.legend(loc="best")
plt.show()

In [None]:
            # Single training run: 30-step windows moved one row at a time.
            lstm_kwargs = dict(
                train_size=0.6,
                val_size=0.2,
                win_size=30,
                sample_step=1,
                win_moving=1,
                data_batch=16,
                train_epoch=300,
                cor_train_epoch=100,
                lr=0.0001,
                unit1=64,
                unit2=64,
                unit3=16,
                dropout1=0.2,
                dropout2=0.2,
                dropout3=0.2,
                n=1,  # attempt number shown in the written summary
            )
            model, scaler, train_set, val_set, test_set, X_scale, Y_scale = lstm(**lstm_kwargs)


畫出預測結果圖

In [None]:

# Get the Predict Price (with date)
Y_pred = model.predict(test_set)
Y_pred_price = pd.DataFrame(data=scaler.inverse_transform(Y_pred), columns=["收盤價"])

# Map every prediction to the dataset row of the target it was evaluated
# against. The generator emits targets from start_index to end_index in steps
# of `stride`, which equals "the last N rows" only when stride is 1.
target_rows = np.arange(test_set.start_index, test_set.end_index + 1, test_set.stride)
if len(target_rows) != len(Y_pred_price):
    raise ValueError(
        "Expected {} predictions from the test generator, got {}".format(
            len(target_rows), len(Y_pred_price)))
# The test targets are the tail of `dataset`, so offset the rows accordingly.
dataset_rows = len(dataset) - len(test_set.targets) + target_rows
sample_dates = dataset.index[dataset_rows].strftime("%Y-%m-%d").tolist()
Y_pred_price.index = sample_dates

# Get the Real Price (with date) for exactly the same rows
Y_real_price = dataset["Close"].iloc[dataset_rows].to_frame()
Y_real_price.columns = ["收盤價"]
Y_real_price.index = sample_dates

# Plot the predict vs. real price
import matplotlib.ticker as ticker

fig, ax = plt.subplots(1, 1)

ax.plot(Y_pred_price, color="blue", label="Predict")
ax.plot(Y_real_price, color="red", label="Real")

# Show one date label every `tick_spacing` samples to keep the axis readable.
tick_spacing = 5
ax.xaxis.set_major_locator(ticker.MultipleLocator(tick_spacing))
fig.autofmt_xdate()

ax.set_title("{} 收盤價盲測結果".format(stockID), fontproperties=myfont)
ax.set_xlabel("日期", fontproperties=myfont)
ax.set_ylabel("收盤價", fontproperties=myfont)
ax.legend(loc="best")
plt.show()

比較預測收盤價與真實收盤價