In [799]:
# 延用上週爬取的股價資料，完成本次作業要求：
# 1. 計算技術指標（SMA、RSI、MACD）
# 2. 可視化 K 線圖
# 3. 技術指標結合機器學習（線性回歸、決策樹、隨機森林）預測股價漲跌。使用技術指標作為特徵來預測第 n 天的股價

In [800]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import os
import requests, json
import plotly.graph_objects as go

In [801]:
# 爬取股價資料
def fetch_stock_data(code, year, month):
    if len(str(month)) == 1:
        month = '0' + str(month)
    query_interval = str(year) + str(month) + '01'
    url = f'https://www.twse.com.tw/exchangeReport/STOCK_DAY?response=json&date={query_interval}&stockNo={code}'
    html = requests.get(url)
    content = json.loads(html.text)
    if content['total'] != 0:
        stock_data = content['data']
        col_name = content['fields']
        # 預先新增一個西元年欄位, 把民國年換成西元年
        df = pd.DataFrame(data=stock_data, columns=col_name)
        df['西元年'] = df['日期'].str[:3].astype('int') + 1911
        df['日期'] = df['西元年'].astype('str') + df['日期'].str[3:]
        # 將日期轉成datetime格式
        df['日期'] = pd.to_datetime(df['日期'])
        df.drop(columns=['西元年'], inplace=True)
        return df
    else:
        print(f"查無資料: {code} - {year} / {month}")
        return None
    
n = 2
target_code = '2330'
# 爬取近 n 年股價資料
def fetch_stock_data_years(code, n):
    end_date = datetime.datetime.now()
    start_date = end_date - datetime.timedelta(weeks=52*n)
    data = []
    for year in range(start_date.year, end_date.year + 1):
        if year == end_date.year:
            for month in range(1, end_date.month + 1):
                print(f"爬取資料: {code} - {year} / {month}")
                df = fetch_stock_data(code, year, month)
                if df is not None:
                    data.append(df)
        else:
            for month in range(1, 13):
                print(f"爬取資料: {code} - {year} / {month}")
                df = fetch_stock_data(code, year, month)
                if df is not None:
                    data.append(df)
    return pd.concat(data)


df = fetch_stock_data_years(target_code, n)
df['成交股數'] = df['成交股數'].str.replace(',', '').astype(int)
df['成交金額'] = df['成交金額'].str.replace(',', '').astype(int)
df['開盤價'] = df['開盤價'].str.replace(',', '').astype(float)
df['最高價'] = df['最高價'].str.replace(',', '').astype(float)
df['最低價'] = df['最低價'].str.replace(',', '').astype(float)
df['收盤價'] = df['收盤價'].str.replace(',', '').astype(float)
df['成交筆數'] = df['成交筆數'].str.replace(',', '').astype(int)


爬取資料: 2330 - 2022 / 1
爬取資料: 2330 - 2022 / 2
爬取資料: 2330 - 2022 / 3
爬取資料: 2330 - 2022 / 4
爬取資料: 2330 - 2022 / 5
爬取資料: 2330 - 2022 / 6
爬取資料: 2330 - 2022 / 7
爬取資料: 2330 - 2022 / 8
爬取資料: 2330 - 2022 / 9
爬取資料: 2330 - 2022 / 10
爬取資料: 2330 - 2022 / 11
爬取資料: 2330 - 2022 / 12
爬取資料: 2330 - 2023 / 1
爬取資料: 2330 - 2023 / 2
爬取資料: 2330 - 2023 / 3
爬取資料: 2330 - 2023 / 4
爬取資料: 2330 - 2023 / 5
爬取資料: 2330 - 2023 / 6
爬取資料: 2330 - 2023 / 7
爬取資料: 2330 - 2023 / 8
爬取資料: 2330 - 2023 / 9
爬取資料: 2330 - 2023 / 10
爬取資料: 2330 - 2023 / 11
爬取資料: 2330 - 2023 / 12
爬取資料: 2330 - 2024 / 1
爬取資料: 2330 - 2024 / 2
爬取資料: 2330 - 2024 / 3
爬取資料: 2330 - 2024 / 4
爬取資料: 2330 - 2024 / 5
爬取資料: 2330 - 2024 / 6
爬取資料: 2330 - 2024 / 7
爬取資料: 2330 - 2024 / 8
爬取資料: 2330 - 2024 / 9
爬取資料: 2330 - 2024 / 10


In [802]:
df.head()

Unnamed: 0,日期,成交股數,成交金額,開盤價,最高價,最低價,收盤價,漲跌價差,成交筆數
0,2022-01-03,73703302,46249716919,619.0,632.0,618.0,631.0,16.0,88508
1,2022-01-04,90945643,59188199534,645.0,656.0,644.0,656.0,25.0,106409
2,2022-01-05,72505550,47582832784,669.0,669.0,646.0,650.0,-6.0,64712
3,2022-01-06,57490736,36817638522,638.0,646.0,636.0,644.0,-6.0,53430
4,2022-01-07,39847766,25358237656,643.0,646.0,632.0,634.0,-10.0,44497


In [803]:
# 1. 計算技術指標（SMA、RSI、MACD）
def calculate_sma(data, period):
    return data['收盤價'].rolling(window=period).mean()

sma5 = calculate_sma(df, 5)
sma10 = calculate_sma(df, 10)
sma20 = calculate_sma(df, 20)


In [804]:
def calculate_rsi(data, period):
    delta = data['收盤價'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

rsi = calculate_rsi(df, 14)

In [805]:
def calculate_macd(data, slow=26, fast=12, signal=9):
    exp1 = data['收盤價'].ewm(span=fast, adjust=False).mean()
    exp2 = data['收盤價'].ewm(span=slow, adjust=False).mean()
    macd = exp1 - exp2
    macd_signal = macd.ewm(span=signal, adjust=False).mean()
    return macd, macd_signal

macd, macd_signal = calculate_macd(df, 26, 12, 9)


In [806]:
# 2. 可視化 K 線圖
df['SMA5'] = sma5
df['SMA10'] = sma10
df['SMA20'] = sma20
df['RSI'] = rsi
df['MACD'] = macd
df['MACD_Signal'] = macd_signal
df.dropna(subset=['SMA5', 'SMA10', 'SMA20','RSI'], inplace=True)

# 移除不需要的欄位 
df.drop(columns=['成交股數', '成交金額', '成交筆數','漲跌價差'], inplace=True)
df.info()

df.to_csv(f'stock_data_{target_code}_with_features.csv', index=False)

current_data = pd.read_csv(f'stock_data_{target_code}_with_features.csv')

# 可視化 K 線圖
def plot_candlestick(data, title):
    fig = go.Figure(data=[go.Candlestick(x=data['日期'],
                                        open=data['開盤價'],
                                        high=data['最高價'],
                                        low=data['最低價'],
                                        close=data['收盤價'],
                                        increasing_line_color='red',
                                        decreasing_line_color='green')])
    
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA5'], mode='lines', name='SMA5'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA10'], mode='lines', name='SMA10'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA20'], mode='lines', name='SMA20'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['RSI'], mode='lines', name='RSI'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['MACD'], mode='lines', name='MACD'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['MACD_Signal'], mode='lines', name='MACD_Signal'))

    fig.update_layout(title=title, xaxis_title='日期', yaxis_title='價格',width=1200, height=800)
    fig.update_yaxes(autorange=True)
    
    fig.show()


# 把存下來的資料畫出來
plot_candlestick(current_data, f'{target_code} 近{n}年 K 線圖')




<class 'pandas.core.frame.DataFrame'>
Int64Index: 647 entries, 1 to 0
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   日期           647 non-null    datetime64[ns]
 1   開盤價          647 non-null    float64       
 2   最高價          647 non-null    float64       
 3   最低價          647 non-null    float64       
 4   收盤價          647 non-null    float64       
 5   SMA5         647 non-null    float64       
 6   SMA10        647 non-null    float64       
 7   SMA20        647 non-null    float64       
 8   RSI          647 non-null    float64       
 9   MACD         647 non-null    float64       
 10  MACD_Signal  647 non-null    float64       
dtypes: datetime64[ns](1), float64(10)
memory usage: 60.7 KB


In [807]:
# 3. 技術指標結合機器學習（線性回歸、決策樹、隨機森林）預測股價漲跌。
# 使用(SMA5, SMA10, SMA20, SMA60, RSI, MACD, MACD_Signal)技術指標作為特徵來預測第 n 天的股價

from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

lr_model = LinearRegression()
by_feature = ['SMA5', 'SMA10', 'SMA20', 'RSI', 'MACD', 'MACD_Signal']

# 檢查是否有缺失值
print(current_data[by_feature].isna().sum())

SMA5           0
SMA10          0
SMA20          0
RSI            0
MACD           0
MACD_Signal    0
dtype: int64


In [808]:
current_data.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 647 entries, 0 to 646
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   日期           647 non-null    object 
 1   開盤價          647 non-null    float64
 2   最高價          647 non-null    float64
 3   最低價          647 non-null    float64
 4   收盤價          647 non-null    float64
 5   SMA5         647 non-null    float64
 6   SMA10        647 non-null    float64
 7   SMA20        647 non-null    float64
 8   RSI          647 non-null    float64
 9   MACD         647 non-null    float64
 10  MACD_Signal  647 non-null    float64
dtypes: float64(10), object(1)
memory usage: 55.7+ KB


In [809]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

future_data = current_data.copy()[-1:]
future_data.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 646 to 646
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   日期           1 non-null      object 
 1   開盤價          1 non-null      float64
 2   最高價          1 non-null      float64
 3   最低價          1 non-null      float64
 4   收盤價          1 non-null      float64
 5   SMA5         1 non-null      float64
 6   SMA10        1 non-null      float64
 7   SMA20        1 non-null      float64
 8   RSI          1 non-null      float64
 9   MACD         1 non-null      float64
 10  MACD_Signal  1 non-null      float64
dtypes: float64(10), object(1)
memory usage: 220.0+ bytes


### 隨機森林演算法 - 預測未來七天股價

In [810]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

predict_days = 7

# 讀取資料
data = pd.read_csv('stock_data_2330_with_features.csv')

# 特徵選擇
features = ['SMA5', 'SMA10', 'SMA20', 'RSI', 'MACD', 'MACD_Signal']
target_columns = ['開盤價', '最高價', '最低價', '收盤價']

# 分割資料集
X = data[features]
y = data[target_columns]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 訓練模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 預測
y_pred = model.predict(X_test)
print(f'Mean Squared Error: {mean_squared_error(y_test, y_pred)}')

# 預測未來七天
def predict_future(data, model, features, n_days):
    future_predictions = []
    last_known_features = data[features].iloc[-1].values.reshape(1, -1)
    
    for _ in range(n_days):
        next_prediction = model.predict(pd.DataFrame(last_known_features, columns=features))
        future_predictions.append(next_prediction[0])
        
        # 更新特徵，假設未來的技術指標不變
        last_known_features = np.roll(last_known_features, -1, axis=1)
        last_known_features[0, -len(next_prediction[0]):] = next_prediction[0]
    
    return future_predictions

future_predictions = predict_future(data, model, features, predict_days)

# 生成未來七天的工作日日期
last_date = pd.to_datetime(data['日期'].iloc[-1])
future_dates = pd.bdate_range(start=last_date, periods=predict_days+1)[1:]  # +1 是因為 bdate_range 包含起始日期

# 將日期與預測結果結合
future_df = pd.DataFrame(future_predictions, columns=target_columns)
future_df['日期'] = future_dates

future_df = future_df[['日期'] + target_columns]

print(f'未來七天的預測: {future_df}')


Mean Squared Error: 90.25979711538453
未來七天的預測:           日期      開盤價      最高價      最低價      收盤價
0 2024-10-02   973.50   983.21   966.95   971.19
1 2024-10-03  1043.95  1061.64  1035.36  1057.74
2 2024-10-04   974.83   985.90   969.84   981.13
3 2024-10-07  1043.75  1060.19  1034.94  1056.03
4 2024-10-08  1050.28  1067.10  1040.79  1062.80
5 2024-10-09  1041.80  1058.34  1033.08  1053.78
6 2024-10-10  1050.68  1067.35  1041.19  1062.50


In [813]:
# 繪製 stock_data_2330_with_features.csv 接上 未來七天的預測 線圖

# 將歷史資料與預測結果結合
future_df = pd.DataFrame(future_predictions, columns=target_columns)
future_df['日期'] = future_dates

# 將日期欄位移到最前面
future_df = future_df[['日期'] + target_columns]

# 合併歷史資料與未來預測資料
combined_data = pd.concat([current_data, future_df], ignore_index=True)

# 繪製 K 線圖
def plot_combined_candlestick(data, title, subtitle):
    fig = go.Figure(data=[go.Candlestick(x=data['日期'],
                                        open=data['開盤價'],
                                        high=data['最高價'],
                                        low=data['最低價'],
                                        close=data['收盤價'],
                                        increasing_line_color='red',
                                        decreasing_line_color='green')])
    
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA5'], mode='lines', name='SMA5', visible='legendonly'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA10'], mode='lines', name='SMA10', visible='legendonly'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['SMA20'], mode='lines', name='SMA20', visible='legendonly'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['RSI'], mode='lines', name='RSI', visible='legendonly'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['MACD'], mode='lines', name='MACD', visible='legendonly'))
    fig.add_trace(go.Scatter(x=data['日期'], y=data['MACD_Signal'], mode='lines', name='MACD_Signal', visible='legendonly'))

    fig.update_layout(  title={
            'text': f"{title}<br><sup>{subtitle}</sup>",
            'y':0.9,
            'x':0.5,
            'xanchor': 'center',
            'yanchor': 'top'
        }, xaxis_title='日期', yaxis_title='價格', width=1200, height=800,)
    fig.update_xaxes(range=[data['日期'].iloc[-predict_days-30], data['日期'].iloc[-1]])
    fig.update_yaxes(autorange=True)
    fig.show()

# 繪製合併後的資料
plot_combined_candlestick(combined_data, f'{target_code} 歷史資料與未來{predict_days}天預測 K 線圖', f'製圖時間：{datetime.datetime.now().strftime("%Y-%m-%d")}')

