## 環境初始化

In [None]:
# 下載資料與上課套件
!git clone https://github.com/leeivan1007/NTU-PGPT-training-program.git
!cp NTU-PGPT-training-program/NLP/* .

In [None]:
# torch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# scikit-learn
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# 其他套件
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

## 1. 記憶模塊介紹：RNN, LSTM, Transformer

In [None]:
#@title 1.1 建立模型參數與測試資料

# Network design
input_dim = 8   # input width (number of features per time step)
output_dim = 8  # output width (used as the hidden size of the recurrent layers below)

# Test data
batch_size = 16 # 16 samples
seq_length = 7 # 7 days

# Uniform random values in [0, 1), shaped (batch_size, seq_length, input_dim).
data_input = torch.rand(batch_size, seq_length, input_dim)

print(f'資料維度：{data_input.shape}')

In [None]:
#@title 1.2 RNN 模塊建立
# Single RNN layer; batch_first=True means tensors are laid out as (batch, seq, feature).
rnn = nn.RNN(input_dim, output_dim, batch_first=True)

# The second return value (the final hidden state) is discarded.
rnn_output, _ = rnn(data_input)
print(f'輸出維度：{rnn_output.shape}')

In [None]:
#@title 1.3 LSTM 模塊建立
# Single LSTM layer with the same input/hidden widths and layout as the RNN above.
lstm = nn.LSTM(input_dim, output_dim, batch_first=True)

# The second return value (the final hidden and cell states) is discarded.
lstm_output, _ = lstm(data_input)
print(f'輸出維度：{lstm_output.shape}')

In [None]:
#@title 1.4 Transformer 模塊建立

# Number of attention heads; d_model (input_dim = 8) is split evenly across them.
num_heads = 8
transformer = nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads,batch_first=True)

# Unlike the recurrent layers, the encoder layer returns a single tensor.
transformer_output = transformer(data_input)
print(f'輸出維度：{transformer_output.shape}')

## 2. 股票的收盤價預測

In [None]:
#@title 2.1 單筆測試1~7天的資料
# One week of toy data: the integers 1 through 7.
a_week = np.arange(1,8)

print(a_week)

In [None]:
# The values that will be pushed one slot to the right (everything except the last day)
print(a_week[0:-1])

In [None]:
# The slots that will be overwritten (everything except the first day)
print(a_week[1:])

In [None]:
# Work on a copy so the original week stays intact
new_week = a_week.copy()
print(new_week)

In [None]:
# Shift by one day: slot i now holds the value that was in slot i-1
new_week[1:] = a_week[:-1]
print(new_week)

In [None]:
# The first slot has no predecessor, so fill it with 0
new_week[:1] = 0
print(new_week)

In [None]:
# Compare the original series with its one-day-shifted copy
print(f'原始的資料:{a_week}')
print(f'偏移的資料:{new_week}')

In [None]:
#@title 2.2 使用迴圈做資料擴增 (時間軸)
window_size = 3

# Stack window_size copies of the week as rows: shape (window_size, n_days).
a_week = np.tile(np.expand_dims(a_week, axis=0), (window_size, 1))

# Row k becomes the series delayed by k days, zero-filled at the front.
# Row 0 is the undelayed series, so the loop starts at 1.
for window in range(1, window_size):
  a_week[window, window:] = a_week[window, :-window]
  a_week[window, :window] = 0

print(a_week)

In [None]:
#@title 2.3 資料擴增模組化
def expand_window(dataset, window_size):
    """Add a look-back axis to a 2-D array of per-row feature vectors.

    For an input of shape (rows, features) the result has shape
    (rows, window_size, features); slot ``k`` of row ``t`` holds the features
    of row ``t - k``, or zeros when that row does not exist. The input array
    itself is not modified.
    """
    # np.tile allocates a new array, so the writes below never touch the caller's data.
    expanded = np.tile(np.expand_dims(dataset, axis=1), (1, window_size, 1))

    # Slot 0 is the row itself; every further slot is delayed by its own index.
    for shift in range(1, window_size):
        expanded[shift:, shift, :] = expanded[:-shift, shift, :]
        expanded[:shift, shift, :] = 0

    return expanded

In [None]:
#@title 2.4 資料載入與前處理
# Load the price table; the first CSV column becomes the row index.
data = pd.read_csv('20230817_0050.csv', index_col=0)

# Preview the first rows (.loc slices by label and includes the end label).
data.loc[:3]

In [None]:
#@title 2.5 時間資料處理
# Parse 'date' into pandas datetimes (this adds an hour:minute:second component)
data['date'] = pd.to_datetime(data['date'])
data.loc[0]

In [None]:
#@title 2.5.1 新增時間戳
# Numeric feature derived from each date via Timestamp.timestamp().
data['timestamp'] = data['date'].apply(lambda x: x.timestamp())

# Drop the original date column; every other column (including 'close') stays in X
X = data.drop(['date'], axis=1)

In [None]:
#@title 2.5.2 設定預測值 (標籤)
# The closing price is the value to predict
y = data['close']

# Number of feature columns in X.
n_features = X.shape[1]

In [None]:
#@title 2.6 特徵正規化 (Feature Normalization)
# Pair each day's features with the NEXT day's close. Using close[t-1] as the label
# for day t would leak the answer: X still contains the 'close' column, so that value
# is already inside the input window. The last day has no next-day close, so it is
# dropped from both the features and the labels.
X_features = X.iloc[:-1]
y_next = y.values.reshape(-1, 1)[1:]

# Fit the scalers on the chronological training portion only, so test-period
# minima/maxima never influence the normalisation. Features and target get separate
# scalers; `scaler` stays the target scaler because later cells call
# scaler.inverse_transform on the predictions.
X_fit, _, y_fit, _ = train_test_split(X_features, y_next, test_size=0.2, shuffle=False)
feature_scaler = MinMaxScaler().fit(X_fit)
scaler = MinMaxScaler().fit(y_fit)

X_scaled = feature_scaler.transform(X_features)
y_scaled = scaler.transform(y_next)

# Train/test split; shuffle=False keeps time order, so the last 20% is the test set
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, shuffle=False)

In [None]:
#@title 2.7 資料天數擴增(時間軸)
window_size = 7

# Give every sample a look-back window of window_size days, then flip the window
# axis so each window runs from the oldest day to the current day. The trailing
# .copy() turns the flipped view into a regular array.
X_train = np.flip(expand_window(X_train, window_size), axis=1).copy()
X_test = np.flip(expand_window(X_test, window_size), axis=1).copy()

In [None]:
#@title 2.8 numpy -> torch

# Copy the NumPy arrays into float32 tensors for the model.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

In [None]:
#@title 2.9 創建資料集與迭代器
# Build the Datasets: each item is a (window, label) pair
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Build the DataLoaders: training batches are shuffled, test batches keep their order
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [None]:
#@title 2.10 建立模型
# 超參數設置

# 定義模型
class StockModel(nn.Module):
    """Three stacked RNN layers followed by a linear head that predicts one value.

    Args:
        input_size: Number of features per time step. Defaults to 10, the width
            that was previously hard-coded, so ``StockModel()`` builds the same
            network as before.
        dropout: Drop probability applied to the recurrent output. Defaults to 0.2.

    The layers use ``batch_first=True``: input is ``(batch, seq_len, input_size)``
    and the output is ``(batch, 1)``.
    """

    def __init__(self, input_size=10, dropout=0.2):
        super().__init__()

        self.memory_1 = nn.RNN(input_size, 128, batch_first=True)
        self.memory_2 = nn.RNN(128, 256, batch_first=True)
        self.memory_3 = nn.RNN(256, 256, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(256, 1)

    def forward(self, x):
        """Run the sequence through the RNN stack and regress from the last step."""
        # Each RNN returns (outputs, final_hidden); only the per-step outputs are chained.
        x, _ = self.memory_1(x)
        x, _ = self.memory_2(x)
        x, _ = self.memory_3(x)
        x = self.dropout(x)

        x = x[:, -1, :]  # keep only the last time step
        x = self.fc(x)
        return x

model = StockModel()

# Smoke test: push one random batch through the untrained model to check the output shape.
# n_features has to match the input width the model's first RNN layer expects (10).
batch_size = 32
test_data = torch.randn((batch_size, window_size, n_features))
test_output = model(test_data)
print(f'測試一筆隨機資料：{test_output.shape}')

In [None]:
#@title 2.11 定義超參數、損失函數和優化器
epochs = 15            # full passes over the training set
learn_rate = 0.001     # Adam step size
criterion = nn.MSELoss()  # regression loss between the prediction and the scaled label
optimizer = optim.Adam(model.parameters(), lr=learn_rate)

In [None]:
#@title 2.12 訓練模型
model.train()
for epoch in range(epochs):
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the epoch mean.
        running_loss += loss.item() * X_batch.size(0)
    # Report the mean loss over the whole epoch; the last mini-batch alone is a
    # noisy and unrepresentative number.
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.6f}")

# Evaluate: eval() disables dropout, no_grad() skips gradient bookkeeping
model.eval()
all_preds = []
all_targets = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        all_preds.append(outputs.numpy())
        all_targets.append(y_batch.numpy())

y_pred_scaled = np.concatenate(all_preds, axis=0)
y_test_original = np.concatenate(all_targets, axis=0)

# Map predictions and labels back from the scaled range to prices before scoring.
y_pred = scaler.inverse_transform(y_pred_scaled)
y_test_original = scaler.inverse_transform(y_test_original)

mse = mean_squared_error(y_test_original, y_pred)
print(f"Mean Squared Error: {mse:.6f}")

In [None]:
#@title 2.13 資料視覺化
# Overlay the true test-set closes (solid blue) and the predictions (dashed red).
plt.figure(figsize=(10, 6))
plt.plot(y_test_original, label='Original', color='blue', linestyle='-')
plt.plot(y_pred, label='Prediction', color='red', linestyle='--')

plt.title('Original vs Prediction')
plt.xlabel('Index')
plt.ylabel('Values')
plt.legend()

plt.show()

## 隨堂練習

In [None]:
#@title 練習 1：新增股市的特徵資料
### 老闆想新增一筆資料為，最高價(highprice)與最低價(lowprice)的平均數 -> 相加除2
### 請試著使用資料操作的方式，取得這個數字，並加回到原本的表格中，新增一個欄位為 average_price
### 參考：（2.5.1 新增時間戳）

data = pd.read_csv('20230817_0050.csv', index_col=0)

### 實作練習 ###

?

### 實作練習 ###


print(data.loc[:2, 'average_price'])
### 實作結果
# 0    56.350
# 1    56.525
# 2    57.125
# Name: average_price, dtype: float64

In [None]:
#@title 練習 2：Window size 手動切割
### 現在有一筆資料，為店面的(7/1~7/10)的營運獲利
### 假設目前已有一個訓練好的模型，處理 window size 為 3 的資料
### 請在預測這筆資料前，先對這筆資料做切割。
### 參考：（2.1 單筆測試 1~7天的資料）

#                  共 10 天的連續資料，單位為(萬元)
profit = np.array([5.3, 11.0, 7.8, 9.1, 3.2, 7.7, 1.8, 5.3, 2.3, 7.3])

# Independent copies that the exercise turns into the series delayed by one and two days.
profit_1 = profit.copy()
profit_2 = profit.copy()

### Exercise ###

### No loop or other logic is needed
### Slice the arrays by hand



### Exercise ###

print(f'第一維資料：{profit}')
print(f'第二維資料：{profit_1}')
print(f'第三維資料：{profit_2}')

### 實作結果
# 第一維資料：[ 5.3 11.   7.8  9.1  3.2  7.7  1.8  5.3  2.3  7.3]
# 第二維資料：[ 0.   5.3 11.   7.8  9.1  3.2  7.7  1.8  5.3  2.3]
# 第三維資料：[ 0.   0.   5.3 11.   7.8  9.1  3.2  7.7  1.8  5.3]

In [None]:
#@title 練習 3.1 ：收盤價預測優化 -> [文件](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html)
### 情境：對於股票收盤價的預測，老闆希望模型的效果更好
### 試著改為 LSTM，以及加入 bidirectional

# 定義模型
class StockModel(nn.Module):
    """Exercise scaffold: three stacked LSTM layers and a linear head with one output.

    As given, the layers are unidirectional; the marked lines are the ones to
    edit when switching to bidirectional LSTMs.
    """
    def __init__(self):
        super(StockModel, self).__init__()

        ### Exercise ###

        # modify the arguments ⭣
        self.memory_1 = nn.LSTM(10, 128, batch_first=True)

        # modify the arguments ⭣
        self.memory_2 = nn.LSTM(128, 256, batch_first=True)

        # modify the arguments ⭣
        self.memory_3 = nn.LSTM(256, 256, batch_first=True)

        self.dropout = nn.Dropout(0.2)

        # modify the arguments ⭣
        self.fc = nn.Linear(256, 1)

        ### Exercise ###

    def forward(self, x):
        # Each LSTM returns (outputs, (h_n, c_n)); only the per-step outputs are chained.

        x, _ = self.memory_1(x)
        x, _ = self.memory_2(x)
        x, _ = self.memory_3(x)
        x = self.dropout(x)

        x = x[:, -1, :] # keep only the last time step
        x = self.fc(x)
        return x

model = StockModel()

# Smoke test: 10 samples, 1 time step, 10 features each.
test_data = torch.randn((10, 1, 10))
test_output = model(test_data)
print(f'測試一筆隨機資料：{test_output.shape}')

### Expected output
# 測試一筆隨機資料：torch.Size([10, 1])

In [None]:
#@title 練習 3.2：訓練模型

epochs = 15
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

model.train()
for epoch in range(epochs):
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the epoch mean.
        running_loss += loss.item() * X_batch.size(0)
    # Report the mean loss over the whole epoch; the last mini-batch alone is a
    # noisy and unrepresentative number.
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.6f}")

# Evaluate: eval() disables dropout, no_grad() skips gradient bookkeeping
model.eval()
all_preds = []
all_targets = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        all_preds.append(outputs.numpy())
        all_targets.append(y_batch.numpy())

y_pred_scaled = np.concatenate(all_preds, axis=0)
y_test_original = np.concatenate(all_targets, axis=0)

# Map predictions and labels back from the scaled range to prices before scoring.
y_pred = scaler.inverse_transform(y_pred_scaled)
y_test_original = scaler.inverse_transform(y_test_original)

mse = mean_squared_error(y_test_original, y_pred)
print(f"Mean Squared Error: {mse:.6f}")

In [None]:
#@title 練習 3.3：資料視覺化
# Overlay the true test-set closes (solid blue) and the predictions (dashed red).
plt.figure(figsize=(10, 6))
plt.plot(y_test_original, label='Original', color='blue', linestyle='-')
plt.plot(y_pred, label='Prediction', color='red', linestyle='--')

plt.title('Original vs Prediction')
plt.xlabel('Index')
plt.ylabel('Values')
plt.legend()

plt.show()

## 補充：爬取股票套件 (twstock)

In [None]:
#@title 安裝套件 專案資訊 -> [網址](https://twstock.readthedocs.io/zh-tw/latest/)
!pip install twstock

import twstock

In [None]:
# Stock code to query
stock = twstock.Stock("1301")

# Fetch one month of daily records. The month argument is a calendar month (1-12);
# 0 is not a valid month, so January is requested here.
stocklist = stock.fetch(2024, 1)

In [None]:
# Inspect the first fetched daily record
stocklist[0]

In [None]:
# Walk through the fetched month; `li` ends up holding the last trading day's row.
# The loop variable is deliberately not named `stock`: that would overwrite the
# twstock.Stock object and break re-running the fetch cell.
li = None  # stays None when nothing was fetched
for day in stocklist:
  strdate = day.date.strftime("%Y-%m-%d")

  #    [date,    volume,       turnover,     open,     high,     low,     close,     change,     transactions]
  li = [strdate, day.capacity, day.turnover, day.open, day.high, day.low, day.close, day.change, day.transaction]

print(f'總共爬到的資料天數：{len(stocklist)}')
print(f'最後一天的資料：{li}')

In [None]:
#@title 設定路徑與超參數
import csv
import twstock
import os

filepath = "stock2024.csv"  # output CSV path
month_length = 4  # exclusive upper bound of the month loop below: range(1, month_length)

In [None]:
#@title 使用迴圈，把多個月份資料存放到 data
data = []

# Create the Stock object once rather than once per month. Constructing it may
# trigger network requests of its own (twstock's initial fetch), and the twstock
# documentation warns that TWSE rate-limits callers.
stock = twstock.Stock("1301")

# NOTE(review): range(1, month_length) stops before month_length, so
# month_length = 4 fetches months 1-3. Confirm whether month 4 was meant to be included.
for month in range(1, month_length):
    stocklist = stock.fetch(2024, month)

    # `day` instead of `stock`, so the Stock object above is not overwritten.
    for day in stocklist:
        strdate = day.date.strftime("%Y-%m-%d")
        li = [strdate, day.capacity, day.turnover, day.open, day.high, day.low, day.close, day.change, day.transaction]
        data.append(li)

In [None]:
#@title 資料儲存
# Header row, written before the data rows
title = ["Date", "Volume", "Transaction Amount", "Opening Price", "Highest Price", "Lowest Price", "Closing Price", "Price Change", "Number of Transactions"]

# The with-block closes the file even if a write raises, so the handle cannot leak.
# newline="" is what the csv module expects, to avoid doubled line endings.
with open(filepath, "w", newline = "", encoding = "big5") as outputfile:
    outputwriter = csv.writer(outputfile)
    outputwriter.writerow(title)
    outputwriter.writerows(data)