<a href="https://colab.research.google.com/github/fender8185/Stock-predict/blob/main/lstm3stock.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [92]:
pip install FinMind==1.5.4




In [93]:
import requests
import pandas as pd
from datetime import datetime, timedelta ,date
import numpy as np
from FinMind.data import DataLoader


In [94]:

url = 'https://api.finmindtrade.com/api/v4/taiwan_stock_trading_daily_report'
token="your token"
stock_id="2330"
bp = {"data_id": stock_id,"token": token}
# 設定開始和結束日期
start_date = datetime(2022, 8, 1)
current_date=start_date
end_date = datetime.today()- timedelta(days=1)
all_data_frames = []  # 儲存每天的數據
all_brokers = set()   # 儲存所有出現過的券商ID



In [95]:

while current_date <= end_date:
    # 根據當前日期設定API請求參數
    current_parameters = {**bp, "date": current_date.strftime('%Y-%m-%d')}
    print(current_date)
    # 進行API請求
    data = requests.get(url, params=current_parameters)
    data = data.json()
    df = pd.DataFrame(data['data'])

    # 如果當天沒有數據，則跳過並進入下一天
    if df.empty:
        current_date += timedelta(days=1)
        continue

    # 計算每家券商每天的平均買價、平均賣價、總買量和總賣量
    df['buy_weighted_price'] = df['price'] * df['buy']
    df['sell_weighted_price'] = df['price'] * df['sell']

    # 對日期和券商ID進行分組，並計算所需的統計值
    grouped = df.groupby(['date', 'securities_trader_id']).agg(
        avg_buy_price=('buy_weighted_price', 'sum'),
        avg_sell_price=('sell_weighted_price', 'sum'),
        total_volume=('buy', 'sum'),
        total_sell=('sell', 'sum')
    ).reset_index()

    # 計算平均買價和平均賣價
    grouped['avg_buy_price'] = grouped['avg_buy_price'] / grouped['total_volume']
    grouped['avg_sell_price'] = grouped['avg_sell_price'] / grouped['total_sell']

    # 更新所有出現過的券商ID
    all_brokers.update(df['securities_trader_id'].unique())

    # 將當天的數據添加到列表中
    all_data_frames.append(grouped)

    # 移至下一天
    current_date += timedelta(days=1)

# 將所有天的數據合併成一個DataFrame
final_data = pd.concat(all_data_frames)
print(final_data)
# 轉換數據為三維張量格式
tensor_data = []
for feature in ['avg_buy_price', 'avg_sell_price', 'total_volume', 'total_sell']:
    # 對日期和券商ID進行透視，以獲得特定特徵的數據
    pivot_data = final_data.pivot(index='date', columns='securities_trader_id', values=feature)

    # 確保所有券商都在列中，並填充缺失值為0
    pivot_data = pivot_data.reindex(columns=list(all_brokers)).fillna(0)

    # 將數據添加到列表中
    tensor_data.append(pivot_data.values)

# 將列表的數據堆疊成三維numpy數組
tensor_data = np.stack(tensor_data, axis=-1)

print(tensor_data.shape)  # 打印張量的形狀


# 初始化DataLoader
params = {
        'stock_id': stock_id,
        'start_date': start_date.date(),
        'end_date': end_date.date()
    }


dl = DataLoader()
dl.login_by_token(api_token=token)

# 使用DataLoader獲取股票的日常數據
stock_data = dl.taiwan_stock_daily(**params)

#print(stock_data)
# 提取close價格
close_prices = stock_data[['date', 'close']]
print(close_prices)
# 設置日期為索引
close_prices.set_index('date', inplace=True)

#print(close_prices)
# 轉換為numpy數組
y_values = close_prices['close'].values
print(len(y_values))
print(y_values)


2022-08-01 00:00:00
2022-08-02 00:00:00
2022-08-03 00:00:00
2022-08-04 00:00:00
2022-08-05 00:00:00
2022-08-06 00:00:00
2022-08-07 00:00:00
2022-08-08 00:00:00
2022-08-09 00:00:00
2022-08-10 00:00:00
2022-08-11 00:00:00
2022-08-12 00:00:00
2022-08-13 00:00:00
2022-08-14 00:00:00
2022-08-15 00:00:00
2022-08-16 00:00:00
2022-08-17 00:00:00
2022-08-18 00:00:00
2022-08-19 00:00:00
2022-08-20 00:00:00
2022-08-21 00:00:00
2022-08-22 00:00:00
2022-08-23 00:00:00
2022-08-24 00:00:00
2022-08-25 00:00:00
2022-08-26 00:00:00
2022-08-27 00:00:00
2022-08-28 00:00:00
2022-08-29 00:00:00
2022-08-30 00:00:00
2022-08-31 00:00:00
2022-09-01 00:00:00
2022-09-02 00:00:00
2022-09-03 00:00:00
2022-09-04 00:00:00
2022-09-05 00:00:00
2022-09-06 00:00:00
2022-09-07 00:00:00
2022-09-08 00:00:00
2022-09-09 00:00:00
2022-09-10 00:00:00
2022-09-11 00:00:00
2022-09-12 00:00:00
2022-09-13 00:00:00
2022-09-14 00:00:00
2022-09-15 00:00:00
2022-09-16 00:00:00
2022-09-17 00:00:00
2022-09-18 00:00:00
2022-09-19 00:00:00


In [96]:

# 從final_data中提取日期
dates_in_final_data = final_data['date'].unique()

# 使用這些日期來過濾close_prices
filtered_close_prices = close_prices[close_prices.index.isin(dates_in_final_data)]

# 轉換過濾後的close_prices為numpy數組
y_values_filtered = filtered_close_prices['close'].values

# 確保X和Y的形狀一致
assert tensor_data.shape[0] == len(y_values_filtered)

print(tensor_data.shape)  # 打印X的形狀
print(len(y_values_filtered))  # 打印Y的長度


(266, 895, 4)
266


In [97]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# 將y_values_filtered向前移動一天
y_shifted = np.roll(y_values_filtered, -1)

# 移除y_shifted的最後一筆數據
y_shifted = y_shifted[:-1]

# 移除tensor_data的最後一筆數據
X = tensor_data[:-1]

# 確保X和y_shifted的長度一致
assert X.shape[0] == len(y_shifted)

# 初始化scaler
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

# 將X和y_shifted重塑為2D，以便可以使用scaler
X_2D = X.reshape(-1, X.shape[-1])
y_2D = y_shifted.reshape(-1, 1)

# 擬合scaler到X和y_shifted並轉換
X_scaled = scaler_X.fit_transform(X_2D)
y_scaled = scaler_y.fit_transform(y_2D)

# 將X和y_shifted重塑回原始形狀
X = X_scaled.reshape(X.shape[0], X.shape[1], X.shape[2])
y_shifted = y_scaled.flatten()

# 分割資料
X_train, X_test, y_train, y_test = train_test_split(X, y_shifted, test_size=0.2, shuffle=False)

print("Training data shape:", X_train.shape)
print("Test data shape:", X_test.shape)
print("Training labels shape:", y_train.shape)
print("Test labels shape:", y_test.shape)


Training data shape: (212, 895, 4)
Test data shape: (53, 895, 4)
Training labels shape: (212,)
Test labels shape: (53,)


In [98]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.optimizers import Adam

# 設定學習率和時間步長
learning_rate = 0.0001  # 這裡可以調整學習率
#timesteps = 932  # 這裡可以調整時間步長
timesteps=X.shape[1]

# 重塑資料以匹配新的時間步長
X_train_reshaped = X_train.reshape((X_train.shape[0], timesteps, -1))
X_test_reshaped = X_test.reshape((X_test.shape[0], timesteps, -1))

print(X_train_reshaped.shape)
print(X_test_reshaped.shape)

# 資料維度
input_shape = (timesteps, X_train_reshaped.shape[2])

# 建立模型
model = Sequential()
model.add(LSTM(128, input_shape=input_shape, return_sequences=True))  # 增加到100個單元
#model.add(Dropout(0.1))
model.add(LSTM(256, return_sequences=False))
model.add(Dropout(0.1))
model.add(Dense(128, activation='relu'))  # 添加一個新的Dense層
model.add(Dense(1))

# 定義優化器並設定學習率
optimizer = Adam(learning_rate=learning_rate)

# 編譯模型
model.compile(optimizer=optimizer, loss='mean_squared_error')


from keras.callbacks import EarlyStopping
# 定義早停回調，並設定監控的指標為驗證集上的loss ('val_loss')，並設定耐心值
early_stopping = EarlyStopping(monitor='loss', patience=300)
# 在模型call早停
model.fit(X_train_reshaped, y_train, epochs=10000, batch_size=32, validation_data=(X_test_reshaped, y_test), verbose=1, callbacks=[early_stopping])

# 訓練模型
#model.fit(X_train_reshaped, y_train, epochs=10000, batch_size=32, validation_data=(X_test_reshaped, y_test), verbose=1)

# 評估模型
train_loss = model.evaluate(X_train_reshaped, y_train, verbose=0)
print(f"Training Loss: {train_loss:.4f}")

test_loss = model.evaluate(X_test_reshaped, y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f}")


[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
Epoch 980/10000
Epoch 981/10000
Epoch 982/10000
Epoch 983/10000
Epoch 984/10000
Epoch 985/10000
Epoch 986/10000
Epoch 987/10000
Epoch 988/10000
Epoch 989/10000
Epoch 990/10000
Epoch 991/10000
Epoch 992/10000
Epoch 993/10000
Epoch 994/10000
Epoch 995/10000
Epoch 996/10000
Epoch 997/10000
Epoch 998/10000
Epoch 999/10000
Epoch 1000/10000
Epoch 1001/10000
Epoch 1002/10000
Epoch 1003/10000
Epoch 1004/10000
Epoch 1005/10000
Epoch 1006/10000
Epoch 1007/10000
Epoch 1008/10000
Epoch 1009/10000
Epoch 1010/10000
Epoch 1011/10000
Epoch 1012/10000
Epoch 1013/10000
Epoch 1014/10000
Epoch 1015/10000
Epoch 1016/10000
Epoch 1017/10000
Epoch 1018/10000
Epoch 1019/10000
Epoch 1020/10000
Epoch 1021/10000
Epoch 1022/10000
Epoch 1023/10000
Epoch 1024/10000
Epoch 1025/10000
Epoch 1026/10000
Epoch 1027/10000
Epoch 1028/10000
Epoch 1029/10000
Epoch 1030/10000
Epoch 1031/10000
Epoch 1032/10000
Epoch 1033/10000
Epoch 1034/10000
Epoch 1035/10000
Epoch 1036/10000
Epoch 1037/10000

In [99]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# 使用模型預測訓練集和測試集的結果
y_pred_train = model.predict(X_train_reshaped)
y_pred_test = model.predict(X_test_reshaped)

# 從final_data中提取日期
all_dates = final_data['date'].unique()

# 根據訓練和測試數據的分割來調整日期
train_dates = all_dates[:len(X_train)]
test_dates = all_dates[len(X_train):]
timesteps=1
# 調整日期以匹配預測值
adjusted_train_dates = train_dates[timesteps:]
adjusted_test_dates = test_dates[timesteps:]

fig = make_subplots(rows=1, cols=2, subplot_titles=('Training Data', 'Testing Data with Predictions'))

# 添加訓練數據到第一個子圖
fig.add_trace(go.Scatter(x=adjusted_train_dates, y=y_train[timesteps:], mode='lines', name='Training Data'), row=1, col=1)
fig.add_trace(go.Scatter(x=adjusted_train_dates, y=y_pred_train.flatten(), mode='lines', name='Predicted Training Data', marker=dict(symbol='circle-open')), row=1, col=1)

# 添加測試數據的真實值到第二個子圖
fig.add_trace(go.Scatter(x=adjusted_test_dates, y=y_test[timesteps:], mode='lines', name='Real Test Values'), row=1, col=2)
fig.add_trace(go.Scatter(x=adjusted_test_dates, y=y_pred_test.flatten(), mode='lines', name='Predicted Test Values', marker=dict(symbol='circle-open')), row=1, col=2)

fig.update_layout(hovermode='x unified')

# 顯示圖形
fig.show()




In [103]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# 使用模型預測訓練集和測試集的結果
y_pred_train = model.predict(X_train_reshaped)
y_pred_test = model.predict(X_test_reshaped)

# 使用scaler_y的inverse_transform方法還原數據
y_train_original = scaler_y.inverse_transform(y_train.reshape(-1, 1)).flatten()
y_test_original = scaler_y.inverse_transform(y_test.reshape(-1, 1)).flatten()
y_pred_train_original = scaler_y.inverse_transform(y_pred_train).flatten()
y_pred_test_original = scaler_y.inverse_transform(y_pred_test).flatten()

# 從final_data中提取日期
all_dates = final_data['date'].unique()

# 根據訓練和測試數據的分割來調整日期
train_dates = all_dates[:len(X_train)]
test_dates = all_dates[len(X_train):]

# 調整日期以匹配預測值
adjusted_train_dates = train_dates[timesteps:]
adjusted_test_dates = test_dates[timesteps:]

fig = make_subplots(rows=1, cols=2, subplot_titles=('Training Data', 'Testing Data with Predictions'))

# 添加訓練數據到第一個子圖
fig.add_trace(go.Scatter(x=adjusted_train_dates, y=y_train_original[timesteps:], mode='lines', name='Training Data'), row=1, col=1)
fig.add_trace(go.Scatter(x=adjusted_train_dates, y=y_pred_train_original, mode='lines', name='Predicted Training Data', marker=dict(symbol='circle-open')), row=1, col=1)

# 添加測試數據的真實值到第二個子圖
fig.add_trace(go.Scatter(x=adjusted_test_dates, y=y_test_original[timesteps:], mode='lines', name='Real Test Values'), row=1, col=2)
fig.add_trace(go.Scatter(x=adjusted_test_dates, y=y_pred_test_original, mode='lines', name='Predicted Test Values', marker=dict(symbol='circle-open')), row=1, col=2)

fig.update_layout(hovermode='x unified')

# 顯示圖形
fig.show()




In [104]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# 計算實際和預測的日漲跌
real_train_price_diff = np.diff(y_train_original)
real_test_price_diff = np.diff(y_test_original)
pred_train_price_diff = np.diff(y_pred_train_original)
pred_test_price_diff = np.diff(y_pred_test_original)

fig = make_subplots(rows=1, cols=2, subplot_titles=('Training Data Daily Price Change', 'Testing Data Daily Price Change'))

# 添加訓練數據的實際和預測日漲跌到第一個子圖
fig.add_trace(go.Scatter(x=adjusted_train_dates[1:], y=real_train_price_diff, mode='lines', name='Real Daily Change in Training Data'), row=1, col=1)
fig.add_trace(go.Scatter(x=adjusted_train_dates[1:], y=pred_train_price_diff, mode='lines', name='Predicted Daily Change in Training Data', marker=dict(symbol='circle-open')), row=1, col=1)

# 添加測試數據的實際和預測日漲跌到第二個子圖
fig.add_trace(go.Scatter(x=adjusted_test_dates[1:], y=real_test_price_diff, mode='lines', name='Real Daily Change in Testing Data'), row=1, col=2)
fig.add_trace(go.Scatter(x=adjusted_test_dates[1:], y=pred_test_price_diff, mode='lines', name='Predicted Daily Change in Testing Data', marker=dict(symbol='circle-open')), row=1, col=2)

fig.update_layout(hovermode='x unified')

# 顯示圖形
fig.show()


In [105]:
# 計算實際和預測的日漲跌
real_train_price_diff = np.diff(y_train_original)
real_test_price_diff = np.diff(y_test_original)
predicted_train_price_diff = np.diff(y_pred_train_original)
predicted_test_price_diff = np.diff(y_pred_test_original)

# 創建一個布爾陣列來表示預測的漲跌方向是否與實際的漲跌方向一致
correct_train_predictions = np.sign(real_train_price_diff) == np.sign(predicted_train_price_diff)
correct_test_predictions = np.sign(real_test_price_diff) == np.sign(predicted_test_price_diff)

# 計算正確預測的百分比
train_accuracy = np.mean(correct_train_predictions) * 100
test_accuracy = np.mean(correct_test_predictions) * 100

print(f"Training data accuracy: {train_accuracy:.2f}%")
print(f"Test data accuracy: {test_accuracy:.2f}%")


Training data accuracy: 90.52%
Test data accuracy: 44.23%
