<a href="https://colab.research.google.com/github/banshee0716/Financial-Big-Data-Analysis/blob/master/%E9%87%91%E8%9E%8D%E6%95%B8%E6%93%9A%E5%88%86%E6%9E%90W7_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
!pip install mplfinance

In [2]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import yfinance as yf
from datetime import datetime
import matplotlib.pyplot as plt
import mplfinance as mpf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.dates as mdates
from mplfinance.original_flavor import candlestick_ohlc

In [3]:
# 股票參數設定
stock_id = "2330.TW"
start_date = "2024-01-01"
end_date = datetime.today().strftime("%Y-%m-%d")

# 下載股票資料
df = yf.download(stock_id, start=start_date, end=end_date)

# 確保所有價格數據為float類型
price_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
df[price_cols] = df[price_cols].astype(float)

# 計算技術指標
df['MA5'] = df['Close'].rolling(window=5).mean()
df['MA10'] = df['Close'].rolling(window=10).mean()
df['Label'] = (df['MA5'] > df['MA10']).astype(int)

# 移除NaN值
df = df.dropna()

# 分割訓練集和測試集 (8:2)
train_size = int(len(df) * 0.8)
train_data = df[:train_size]
test_data = df[train_size:]

[*********************100%***********************]  1 of 1 completed


In [4]:
def create_sequences(data, sequence_length):
  sequences = []
  labels = []

  # 使用開高低收作為特徵
  price_data = data[['Open', 'High', 'Low', 'Close']].values

  for i in range(len(data) - sequence_length):
    seq = price_data[i:(i + sequence_length)]
    label = data['Label'].values[i + sequence_length]
    sequences.append(seq)
    labels.append(label)

  return np.array(sequences), np.array(labels)

# 設定序列長度
sequence_length = 10

# 創建訓練集和測試集序列
X_train, y_train = create_sequences(train_data, sequence_length)
X_test, y_test = create_sequences(test_data, sequence_length)

# 標準化數據
scaler = StandardScaler()
X_train_reshaped = X_train.reshape(-1, X_train.shape[-1])
X_test_reshaped = X_test.reshape(-1, X_test.shape[-1])

X_train_scaled = scaler.fit_transform(X_train_reshaped)
X_test_scaled = scaler.transform(X_test_reshaped)

X_train_scaled = X_train_scaled.reshape(X_train.shape)
X_test_scaled = X_test_scaled.reshape(X_test.shape)

# 轉換為PyTorch張量
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_test_tensor = torch.LongTensor(y_test)

In [5]:
class StockHybridNet(nn.Module):
  def __init__(self, sequence_length, n_features):
    super(StockHybridNet, self).__init__()

      # LSTM層
    self.lstm_hidden = 64
    self.lstm = nn.LSTM(n_features, self.lstm_hidden,
                           num_layers=2, batch_first=True,
                           dropout=0.2, bidirectional=True)

      # CNN層
    self.conv1 = nn.Conv1d(self.lstm_hidden*2, 128, kernel_size=3)
    self.conv2 = nn.Conv1d(128, 64, kernel_size=3)
    self.conv3 = nn.Conv1d(64, 32, kernel_size=3)

      # 其他層
    self.relu = nn.LeakyReLU(0.1)
    self.dropout = nn.Dropout(0.3)
    self.batch_norm1 = nn.BatchNorm1d(128)
    self.batch_norm2 = nn.BatchNorm1d(64)

      # 計算全連接層輸入維度
    L_out = sequence_length - 6  # 3個CNN層的kernel_size=3
    self.fc_input = 32 * L_out

      # 全連接層
    self.fc1 = nn.Linear(self.fc_input, 128)
    self.fc2 = nn.Linear(128, 32)
    self.fc3 = nn.Linear(32, 2)

  def forward(self, x):
      # LSTM
    lstm_out, _ = self.lstm(x)

      # CNN
    x = lstm_out.permute(0, 2, 1)
    x = self.relu(self.conv1(x))
    x = self.batch_norm1(x)
    x = self.dropout(x)

    x = self.relu(self.conv2(x))
    x = self.batch_norm2(x)
    x = self.dropout(x)

    x = self.relu(self.conv3(x))
    x = self.dropout(x)

      # 全連接
    x = x.flatten(1)
    x = self.relu(self.fc1(x))
    x = self.dropout(x)
    x = self.relu(self.fc2(x))
    x = self.fc3(x)

    return x

In [6]:
model = StockHybridNet(sequence_length=sequence_length, n_features=4)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# 學習率調整器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, verbose=True
)

# 訓練設置
num_epochs = 50
batch_size = 32
train_loader = DataLoader(list(zip(X_train_tensor, y_train_tensor)),
                         batch_size=batch_size, shuffle=True)

# 用於存儲訓練過程
train_losses = []
train_accuracies = []

# 早停設置
best_loss = float('inf')
patience = 10
patience_counter = 0

# 訓練迴圈
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch_X, batch_y in train_loader:
        # 前向傳播
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)

        # 反向傳播
        loss.backward()
        # 梯度裁剪
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # 計算損失和準確率
        total_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    # 計算平均損失和準確率
    avg_loss = total_loss / len(train_loader)
    accuracy = 100 * correct / total

    train_losses.append(avg_loss)
    train_accuracies.append(accuracy)

    # 學習率調整
    scheduler.step(avg_loss)

    # 早停檢查
    if avg_loss < best_loss:
        best_loss = avg_loss
        patience_counter = 0
        # 保存最佳模型
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

    # 每10個epoch輸出一次結果
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
        print(f'Current learning rate: {optimizer.param_groups[0]["lr"]:.6f}')



Epoch [10/50], Loss: 0.3567, Accuracy: 85.00%
Current learning rate: 0.001000
Epoch [20/50], Loss: 0.2399, Accuracy: 89.29%
Current learning rate: 0.001000
Epoch [30/50], Loss: 0.1771, Accuracy: 93.57%
Current learning rate: 0.000500
Epoch [40/50], Loss: 0.0985, Accuracy: 96.43%
Current learning rate: 0.000250
Epoch [50/50], Loss: 0.1118, Accuracy: 95.71%
Current learning rate: 0.000125


In [7]:
# 準備測試數據的K線圖
test_start_idx = train_size + sequence_length
test_plot_data = df[test_start_idx:].copy()

# 重設數據結構
test_plot_data = test_plot_data.reset_index()
test_plot_data.columns = test_plot_data.columns.get_level_values(0)
test_plot_data.set_index('Date', inplace=True)
import plotly.graph_objects as go

# 進行預測（保持原樣）
model.eval()  # 設置模型為評估模式
with torch.no_grad():
    outputs = model(X_test_tensor)
    predicted = torch.argmax(outputs.data, 1)

# 準備實際數據和預測數據
actual_data = test_plot_data[['Open', 'High', 'Low', 'Close']].copy()
pred_data = actual_data.copy()

import numpy as np

# 計算最近N天的平均漲跌幅
N = 5  # 您可以根據需要調整這個值
price_changes = actual_data['Close'].pct_change()
average_change = price_changes.rolling(window=N).mean().shift(-1)  # 向前移動一個單位，與當前行對齊

# 根據預測結果修改預測數據
for i, pred in enumerate(predicted[:-1]):
    current_close = float(pred_data['Close'].iloc[i])
    avg_change = average_change.iloc[i]
    if pd.isna(avg_change):
        avg_change = 0  # 如果平均變化為NaN，則設為0

    if pred == 1:  # 預測漲
        # 使用近期平均漲幅，確保至少有0.5%的增長
        change_factor = 1 + max(avg_change, 0.005)
    else:  # 預測跌
        # 使用近期平均跌幅，確保至少有0.5%的下降
        change_factor = 1 - max(-avg_change, 0.005)

    # 限制變化因子在合理範圍內，防止過大波動
    change_factor = max(min(change_factor, 1.10), 0.90)

    # 計算新的開盤價和收盤價
    open_price = current_close
    close_price = current_close * change_factor

    # 隨機生成High和Low在Open和Close之間的值，增加動態性
    high_price = max(open_price, close_price) * (1 + np.random.uniform(0, 0.01))
    low_price = min(open_price, close_price) * (1 - np.random.uniform(0, 0.01))

    pred_data.iloc[i+1, pred_data.columns.get_loc('Open')] = open_price
    pred_data.iloc[i+1, pred_data.columns.get_loc('Close')] = close_price
    pred_data.iloc[i+1, pred_data.columns.get_loc('High')] = high_price
    pred_data.iloc[i+1, pred_data.columns.get_loc('Low')] = low_price

# 確保所有數據為float類型
for col in ['Open', 'High', 'Low', 'Close']:
    actual_data[col] = actual_data[col].astype(float)
    pred_data[col] = pred_data[col].astype(float)

# 重置索引並將日期轉換為 datetime 格式
actual_data = actual_data.reset_index()
pred_data = pred_data.reset_index()
actual_data['Date'] = pd.to_datetime(actual_data['Date'])
pred_data['Date'] = pd.to_datetime(pred_data['Date'])

# 創建圖表
fig = go.Figure()

# 添加實際數據的 K 線圖
fig.add_trace(go.Candlestick(
    x=actual_data['Date'],
    open=actual_data['Open'],
    high=actual_data['High'],
    low=actual_data['Low'],
    close=actual_data['Close'],
    name='實際數據',
    increasing_line_color='lightgreen',
    decreasing_line_color='lightcoral',
    showlegend=True
))

# 添加預測數據的 K 線圖
fig.add_trace(go.Candlestick(
    x=pred_data['Date'],
    open=pred_data['Open'],
    high=pred_data['High'],
    low=pred_data['Low'],
    close=pred_data['Close'],
    name='預測數據',
    increasing_line_color='green',
    decreasing_line_color='red',
    opacity=0.5,
    showlegend=True
))

# 更新圖表布局
fig.update_layout(
    title=f'{stock_id} 股價走勢圖',
    xaxis_title='日期',
    yaxis_title='價格',
    xaxis_rangeslider_visible=False,
    width=1000,
    height=600
)

# 顯示圖表
fig.show()

# 顯示預測準確度（保持原樣）
accuracy = (predicted == y_test_tensor).float().mean().item()
print(f'\n預測準確度: {accuracy*100:.2f}%')

# 顯示最後一天的預測結果（保持原樣）
last_actual = "漲" if y_test_tensor[-1] == 1 else "跌"
last_pred = "漲" if predicted[-1] == 1 else "跌"
print(f'\n最後一天預測:')
print(f'實際結果: {last_actual}')
print(f'預測結果: {last_pred}')


預測準確度: 92.86%

最後一天預測:
實際結果: 跌
預測結果: 跌
