# **Download and Convert Stock Price**
- 使用 shioaji API 來下載台股的分 K 資料到 local 資料夾，並存儲為 .csv 檔案
  * 路徑為 config.DATA_DIR ，預設為 data
- 若已經有舊的資料，會根據 local 端的資料新舊程度進行更新
- 若每日流量已經用完，也會登出結束程式
- 每個有登入 shioaji API 的 block 都要記得登出，否則帳號容易被暫時停用，詳情請見 https://sinotrade.github.io/zh_TW/tutor/login/


### Import package

In [None]:
import shioaji as sj
import pandas as pd
import os
import datetime
import re
import json5
from config import config

## **Download**

### 取得股票種類之對照資訊
* 路徑為 config.STOCK_CATEGORY ，預設為 data/stock_category.json5

In [None]:
with open(config.STOCK_CATEGORY, "r", encoding="utf-8") as f:
    stock_category = json5.load(f)

### 取得 API 金鑰資訊

In [None]:
# 建立 Shioaji api 物件
api = sj.Shioaji()

# API key 以檔案形式存在，預設為 api.key
with open(config.API_KEY, "r") as f:
    api_key = f.readline().strip()
    secret_key = f.readline().strip()

### 取得台股股號名稱對照表
* 路徑為 config.STOCK_SYMBOL_MAPPING ，預設為 data/stock_symbol_mapping.json5

In [None]:
accounts = api.login(api_key=api_key, secret_key=secret_key)

all_contracts = []

# 上市股票
all_contracts.extend(api.Contracts.Stocks.TSE)

# 上櫃股票
all_contracts.extend(api.Contracts.Stocks.OTC)

# 大盤指數
all_contracts.append(api.Contracts.Indexs.TSE["001"])

# 建立股號股名對照表
stock_symbol_mapping = {item.code: item.name for item in all_contracts}
stock_symbol_mapping = dict(sorted(stock_symbol_mapping.items()))

# 將對照表存儲為 JSON5 檔案
with open(config.STOCK_SYMBOL_MAPPING, "w", encoding="utf-8") as f:
    json5.dump(stock_symbol_mapping, f, ensure_ascii=False, indent=4)
    print(f"對照表已經儲存至 {config.STOCK_SYMBOL_MAPPING}")

api.logout()

### 自訂台股下載列表
* 表列上市櫃股票和大盤指數，避免下載期權和乙特等特殊股票

In [None]:
accounts = api.login(api_key=api_key, secret_key=secret_key)

TSE_contract_list = api.Contracts.Stocks.TSE
OTC_contract_list = api.Contracts.Stocks.OTC

# 取得代下載的列表
contract_list = []

# 上市股票
option_code = [key for key, value in stock_category["TSE"].items() if value == "期權"]
for contract in TSE_contract_list:
    if contract.category not in option_code and re.match("^[0-9]+$", contract.code):
        contract_list.append(contract)

# 上櫃股票
option_code = [key for key, value in stock_category["OTC"].items() if value == "期權"]
for contract in OTC_contract_list:
    if contract.category not in option_code and re.match("^[0-9]+$", contract.code):
        contract_list.append(contract)

# 大盤指數
contract_list.append(api.Contracts.Indexs.TSE["001"])

# 排序
contract_list = sorted(contract_list, key=lambda x: x.code)

api.logout()

### 下載分 K 資料
* 自動比較更新時間
* 超過每日流量上限自動結束並登出

In [None]:
start_time = datetime.datetime.now()
today_date = start_time
today_str = today_date.strftime("%Y-%m-%d")

accounts = api.login(api_key=api_key, secret_key=secret_key)
downloadable = True

for contract in contract_list:
    # 股票資料檔案名稱
    stock_file = f"{config.DATA_DIR}/{contract.code}_min.csv"

    start_date_str = config.SHIOAJI_START_DATE
    start_date = datetime.datetime.strptime(start_date_str, "%Y-%m-%d")
    if os.path.isfile(stock_file):
        # 讀取本地股票資料
        print(f"讀取{contract.name} ({contract.code}) 的資料")
        try:
            df = pd.read_csv(stock_file)
            df.ts = pd.to_datetime(df.ts)
            if not df.empty:
                start_date_str = (
                    df.iloc[-1]["ts"] + datetime.timedelta(days=1)
                ).strftime("%Y-%m-%d")
                start_date = datetime.datetime.strptime(
                    f"{start_date_str} 08:00", "%Y-%m-%d %H:%M"
                )
        except Exception:
            print(f"{stock_file}讀取失敗")
            api.logout()
            raise Exception

    # 若本地股票資料已經更新過就不再更新
    if start_date >= today_date:
        print(
            f"跳過{contract.name} ({contract.code}) 因為已經擁有 {start_date_str} 之前的資料"
        )
        continue

    success = False
    while not success:
        print(
            f"嘗試取得{contract.name} ({contract.code}) 從 {start_date_str} 到 {today_str} 的資料"
        )

        # 取得 K 棒資料
        stock_get_start = datetime.datetime.now()
        kbars = api.kbars(contract=contract, start=start_date_str, end=today_str)
        stock_get_end = datetime.datetime.now()
        print(f"共花費: {(stock_get_end - stock_get_start).total_seconds()} 秒")
        # 將資料轉換成 DataFrame
        df = pd.DataFrame({**kbars})
        if df.empty:
            usage_bytes = api.usage().bytes
            if usage_bytes < 524288000:
                print(f"今日已使用 {usage_bytes} B")
                break
            else:
                print(f"今日已使用 {usage_bytes} B")
                downloadable = False
                api.logout()
        else:
            success = True

    if not downloadable:
        break

    if not success:
        print(
            f"跳過{contract.name} ({contract.code}) 因為從 {start_date_str} 到 {today_str} 的資料取得失敗"
        )
        continue

    if os.path.isfile(stock_file):
        # 讀取本地股票資料
        print(f"再次讀取{contract.name} ({contract.code}) 的資料")
        try:
            df_old = pd.read_csv(stock_file)
            print("合併資料")
            merged_df = pd.concat([df_old, df], axis=0)
            df = merged_df
        except Exception:
            print(f"{stock_file}讀取失敗")
            api.logout()
            raise Exception

    # 將資料儲存到本地股票資料
    try:
        print("將資料存儲到本地股票資料")
        df.to_csv(stock_file, index=False)
    except Exception:
        print(f"{stock_file} 儲存失敗")
        try:
            os.remove(stock_file)
        except OSError:
            pass

api.logout()
end_time = datetime.datetime.now()
print(f"結束: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")

total_time = (end_time - start_time).total_seconds()
print(f"程式共花費: {total_time} 秒")

## **Convert**

### 以分 K 資料生成日 K
* 因為 multiple process 搭配 Jupyter Notebook 有一些 OS 不支援，因此把相關程式碼寫在 convert_stock_price.py ，再去執行他

In [None]:
import subprocess

# 開始執行時間
start_time = datetime.datetime.now()
print(f"開始: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

print("執行: python convert_stock_price.py")
os.system("python convert_stock_price.py")
# result = subprocess.run(
#     ["python", "convert_stock_price.py"], capture_output=True, text=True
# )

# # 輸出命令的標準輸出和標準錯誤
# print("標準輸出:", result.stdout)
# print("標準錯誤:", result.stderr)

# 結束執行時間
end_time = datetime.datetime.now()
print(f"結束: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")

# 計算執行時間
total_time = (end_time - start_time).total_seconds()
print(f"程式共花費: {total_time} 秒")