In [None]:
import json
import shioaji as sj

# Load credentials from API.json and log in to Shioaji in simulation mode.
# API.json must provide the keys read below: api_key, secret_key, ca_path,
# ca_passwd, person_id.
with open('API.json', 'r', encoding='utf-8') as f:
    s = json.load(f)
api = sj.Shioaji(simulation=True)
# `accounts` holds whatever login() returns (presumably the account list — confirm).
accounts = api.login(api_key=s['api_key'], secret_key=s['secret_key'])
# Activate the CA certificate using the path, password and person id from API.json.
accounts_ca = api.activate_ca(s['ca_path'], s['ca_passwd'], s['person_id'])

In [None]:
import os
import time
import threading
from collections import deque
from datetime import datetime, timedelta, time as dt_time
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from shioaji import Shioaji

# ===== Data-fetch configuration =====
FETCH_TYPE = "TICKS"          # which data to fetch: "KBARS", "TICKS" or "BOTH"
FETCH_FUTURES = True          # whether to fetch data for `contract` below
START_DATE = "2024-03-12"     # first date to fetch (inclusive)
END_DATE = "2024-03-29"       # last date to fetch (inclusive)
contract = api.Contracts.Futures.TXF.TXFR1  # TXF contract to fetch (TXFR1; presumably the near-month continuous contract)
STOCKS_TO_FETCH = [
    "2330", "2454", "2317", "2881", "2412",
    "2382", "2308", "2882", "2891", "3711",
]
SAVE_DIR = r"C:\Users\USER\Desktop\雲端同步\Shioaji\Backtesting"  # where parquet files are read from / written to
# ===== Rate limiter and global counters =====
class RateLimiter:
    """Thread-safe sliding-window rate limiter.

    Allows at most ``limit`` calls to :meth:`acquire` within any ``window``
    seconds. A caller that would exceed the budget sleeps until the oldest
    recorded call leaves the window.
    """

    def __init__(self, limit, window):
        """
        Args:
            limit: maximum number of calls per window; must be >= 1.
            window: window length in seconds; must be > 0.

        Raises:
            ValueError: if ``limit`` < 1 or ``window`` <= 0 (such values would
                otherwise fail later inside ``acquire``).
        """
        if limit < 1:
            raise ValueError(f"limit must be >= 1, got {limit!r}")
        if window <= 0:
            raise ValueError(f"window must be > 0, got {window!r}")
        self.limit = limit
        self.window = window
        self.calls = deque()  # time.monotonic() stamps of recent calls, oldest first
        self.lock = threading.Lock()

    def _evict(self, now):
        """Drop recorded calls that are more than ``window`` seconds older than ``now``."""
        while self.calls and now - self.calls[0] > self.window:
            self.calls.popleft()

    def acquire(self):
        """Block until a call slot is free, then record this call.

        The lock stays held while sleeping, so concurrent callers queue behind
        the waiting thread instead of racing for the freed slot.
        """
        with self.lock:
            # monotonic clock: interval math must not be affected by wall-clock adjustments
            now = time.monotonic()
            self._evict(now)
            if len(self.calls) >= self.limit:
                # +0.01 s so the oldest call is strictly outside the window on wake-up
                time.sleep(self.window - (now - self.calls[0]) + 0.01)
                now = time.monotonic()
                self._evict(now)
            self.calls.append(now)

# One sliding-window limiter per endpoint group: at most `limit` calls per `window` seconds.
limiters = {
    'market': RateLimiter(limit=50, window=5),
    'account': RateLimiter(limit=25, window=5),
    'order': RateLimiter(limit=250, window=10),
}

# Caps on ticks/kbars calls made during trading hours, and the running counts the
# API wrappers compare against them.
market_limits = {'ticks': 10, 'kbars': 50}
market_counts = dict.fromkeys(market_limits, 0)

# NOTE(review): daily_limits / daily_counts are not read or updated anywhere in the visible notebook.
daily_limits = {'login': 1000, 'kbars': 1000}
daily_counts = dict.fromkeys(daily_limits, 0)

# NOTE(review): connection/subscription bookkeeping below is also unused in the visible notebook.
max_connections, current_connections = 5, 0
conn_lock = threading.Lock()

max_subscriptions, subscription_count = 200, 0
sub_lock = threading.Lock()

def is_trading_hours(now=None):
    """Return True if ``now`` is on a weekday between 09:00 and 13:30 (inclusive).

    The API wrappers use this to decide whether a ticks/kbars call counts
    against the intraday quota in ``market_limits``.

    Args:
        now: datetime to test; defaults to the current local time.

    Note:
        Exchange holidays that fall on weekdays are not detected and still
        count as trading hours.
    """
    if now is None:
        now = datetime.now()
    # Saturday (5) and Sunday (6) have no regular session, so no intraday quota applies.
    if now.weekday() >= 5:
        return False
    return dt_time(9, 0) <= now.time() <= dt_time(13, 30)

# API method name -> key into `limiters`. Declared group by group; the resulting
# dict keeps the same key order as listed here.
ENDPOINT_GROUP = {
    endpoint: group
    for group, endpoints in (
        ('market', ('credit_enquires', 'short_stock_sources', 'snapshots', 'ticks', 'kbars')),
        ('account', ('list_profit_loss_detail', 'account_balance', 'list_settlements',
                     'list_profit_loss', 'list_positions', 'margin')),
        ('order', ('place_order', 'update_status', 'cancel_order')),
    )
    for endpoint in endpoints
}

# Wrap the API methods listed in ENDPOINT_GROUP (when present on `api`) so each call
# passes through its group's RateLimiter, and ticks/kbars calls made during trading
# hours are counted against market_limits.
import functools

_market_count_lock = threading.Lock()  # guards market_counts; wrappers run in worker threads


def make_wrapper(func, endpoint, group):
    """Return `func` wrapped with the `group` rate limiter and the intraday quota.

    Raises:
        RuntimeError: when `endpoint` is 'ticks' or 'kbars', it is trading hours,
            and market_counts[endpoint] has reached market_limits[endpoint].
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if endpoint in ('ticks', 'kbars') and is_trading_hours():
            # Atomic check-and-increment: ThreadPoolExecutor workers call this concurrently.
            # Checked before acquiring so a rejected call does not consume a rate-limit slot.
            with _market_count_lock:
                used = market_counts.setdefault(endpoint, 0)
                if used >= market_limits[endpoint]:
                    raise RuntimeError(f"{endpoint} 盤中次數已達上限")
                market_counts[endpoint] = used + 1
        limiters[group].acquire()
        return func(*args, **kwargs)

    wrapper._rate_limited = True  # marker so a re-run can recover the unwrapped method
    return wrapper


for name, grp in ENDPOINT_GROUP.items():
    if not hasattr(api, name):
        print(f"方法 {name} 不存在於 API 中")
        continue
    orig = getattr(api, name)
    # Re-running this cell must not stack wrappers (each layer would acquire a limiter again).
    while getattr(orig, '_rate_limited', False):
        orig = orig.__wrapped__
    setattr(api, name, make_wrapper(orig, name, grp))

# ===== Existing-data check =====
def load_existing(path, date_col):
    """Read a previously saved parquet file, if there is one.

    Args:
        path: parquet file to read.
        date_col: datetime column whose calendar dates mark what is already stored.

    Returns:
        ``(frame, dates)``: the stored DataFrame and the set of 'YYYY-MM-DD'
        strings present in ``date_col``. When ``path`` does not exist, an empty
        DataFrame and an empty set are returned.
    """
    if not os.path.exists(path):
        return pd.DataFrame(), set()
    stored = pd.read_parquet(path)
    stored_dates = set(stored[date_col].dt.date.astype(str))
    return stored, stored_dates

# Load what is already on disk so only dates not yet stored get fetched.
kbars_df, existing_kbar_dates = load_existing(os.path.join(SAVE_DIR, "txf_kbars.parquet"), 'ts')
ticks_df, existing_tick_dates = load_existing(os.path.join(SAVE_DIR, "txf_ticks.parquet"), 'ts')

# For stocks only the set of stored dates is needed here (frames are reloaded when saving).
stock_exist = {
    sid: load_existing(os.path.join(SAVE_DIR, f"stock_kbars_{sid}.parquet"), 'ts')[1]
    for sid in STOCKS_TO_FETCH
}

# Build every calendar date from START_DATE to END_DATE (both inclusive) as
# 'YYYY-MM-DD' strings; weekends and holidays are included.
# No temporary `s`/`e` variables, so the credentials dict `s` loaded in the
# first cell is not overwritten.
dates = pd.date_range(START_DATE, END_DATE, freq="D").strftime("%Y-%m-%d").tolist()

# ===== Fetch functions =====
def _to_frame(data):
    """Convert a Shioaji kbars/ticks result into a DataFrame with a datetime `ts` column.

    Returns None when the API result is falsy or yields no rows, so callers
    treat "no rows" the same as "no data" instead of storing empty frames.
    """
    if not data:
        return None
    df = pd.DataFrame({**data})  # the result object is unpacked as a column mapping
    if df.empty:
        return None
    df['ts'] = pd.to_datetime(df['ts'])
    return df


def fetch_future(d):
    """Fetch `contract` data for one date string `d` ('YYYY-MM-DD').

    Only the data types selected by FETCH_FUTURES / FETCH_TYPE and not already
    present in existing_kbar_dates / existing_tick_dates are requested.

    Returns:
        dict with 'date' (= d), 'kbar' and 'tick'; the latter two are a
        DataFrame, or None when skipped or when the API returned no rows.
    """
    res = {'date': d, 'kbar': None, 'tick': None}
    if not FETCH_FUTURES:
        return res
    if FETCH_TYPE in ("KBARS", "BOTH") and d not in existing_kbar_dates:
        res['kbar'] = _to_frame(api.kbars(contract=contract, start=d, end=d))
    if FETCH_TYPE in ("TICKS", "BOTH") and d not in existing_tick_dates:
        res['tick'] = _to_frame(api.ticks(contract=contract, date=d))
    return res


def fetch_stock(sid, d):
    """Fetch kbars for TSE stock `sid` on date `d`.

    Returns a DataFrame, or None if the date is already stored or the API
    returned no rows.
    """
    if d in stock_exist[sid]:
        return None
    c = api.Contracts.Stocks.TSE[sid]
    return _to_frame(api.kbars(c, start=d, end=d))

# ===== Run the fetch & save =====
if FETCH_FUTURES:
    new_kbars, new_ticks = [], []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(fetch_future, d): d for d in dates}
        for fut in as_completed(futures):
            d = futures[fut]
            try:
                res = fut.result()
            except Exception as exc:
                # Report and continue: otherwise one failing date (e.g. the intraday
                # quota RuntimeError raised by the API wrapper) aborts the loop and
                # loses every date already downloaded, since saving happens below.
                print(f"[TXF] {d} 抓取失敗: {exc!r}")
                continue
            if res['kbar'] is not None:
                new_kbars.append(res['kbar'])
                print(f"[TXF] {d} kbars: {len(res['kbar'])}")
            else:
                print(f"[TXF] {d} kbars 已存在或無資料")
            if res['tick'] is not None:
                new_ticks.append(res['tick'])
                print(f"[TXF] {d} ticks: {len(res['tick'])}")
            else:
                print(f"[TXF] {d} ticks 已存在或無資料")
    # Save futures data: merge with what was on disk, drop repeated ts, order by time.
    if new_kbars or new_ticks:
        os.makedirs(SAVE_DIR, exist_ok=True)
    if new_kbars:
        df = pd.concat([kbars_df] + new_kbars).drop_duplicates('ts').sort_values('ts')
        df.to_parquet(os.path.join(SAVE_DIR, "txf_kbars.parquet"), index=False)
    if new_ticks:
        # NOTE(review): dedup on 'ts' alone also drops distinct ticks that share a
        # timestamp, if the feed produces such ticks — confirm before relying on it.
        df = pd.concat([ticks_df] + new_ticks).drop_duplicates('ts').sort_values('ts')
        df.to_parquet(os.path.join(SAVE_DIR, "txf_ticks.parquet"), index=False)
else:
    print("已設定不抓取台指期資料 (FETCH_FUTURES=False)")

# Per-stock kbars: fetch missing dates, then merge into each stock's parquet file.
if FETCH_TYPE in ("KBARS", "BOTH"):
    for sid in STOCKS_TO_FETCH:
        stock_path = os.path.join(SAVE_DIR, f"stock_kbars_{sid}.parquet")
        stock_df, _ = load_existing(stock_path, 'ts')
        new_stock_kbars = []
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = {executor.submit(fetch_stock, sid, d): d for d in dates}
            for fut in as_completed(futures):
                d = futures[fut]
                try:
                    df = fut.result()
                except Exception as exc:
                    # Keep the dates that did succeed for this stock instead of aborting.
                    print(f"[{sid}] {d} 抓取失敗: {exc!r}")
                    continue
                if df is not None:
                    new_stock_kbars.append(df)
                    print(f"[{sid}] {d} kbars: {len(df)}")
                else:
                    print(f"[{sid}] {d} 已存在或無資料")
        if new_stock_kbars:
            os.makedirs(SAVE_DIR, exist_ok=True)
            merged = pd.concat([stock_df] + new_stock_kbars).drop_duplicates('ts').sort_values('ts')
            merged.to_parquet(stock_path, index=False)


In [27]:
import polars as pl
from datetime import datetime

import os

# TXF day-session window used for counting rows; it spans 300 minutes, and
# EXPECTED_KBARS is the per-day kbar count a complete day must have.
EXPECTED_KBARS = 300
TXF_SESSION_START = datetime.strptime("08:45", "%H:%M").time()
TXF_SESSION_END = datetime.strptime("13:45", "%H:%M").time()
SAVE_DIR = r"C:\Users\USER\Desktop\雲端同步\Shioaji\Backtesting"


def _count_rows_per_day(path, count_col):
    """Count rows of the parquet file at `path` per calendar date.

    Only rows whose `ts` time of day lies in [TXF_SESSION_START, TXF_SESSION_END]
    are counted. Returns a polars DataFrame with columns `date` and `count_col`;
    it is empty when `path` does not exist (e.g. kbars were never fetched).
    """
    if not os.path.exists(path):
        return pl.DataFrame(schema={"date": pl.Date, count_col: pl.UInt32})
    return (
        pl.read_parquet(path)
        .filter(
            (pl.col("ts").dt.time() >= TXF_SESSION_START) &
            (pl.col("ts").dt.time() <= TXF_SESSION_END)
        )
        .with_columns(pl.col("ts").dt.date().alias("date"))
        .group_by("date")
        .agg(pl.len().alias(count_col))
    )


# Distinct names so the pandas frames `kbars_df` / `ticks_df` used by the fetch
# cell's save step are not replaced with polars frames.
kbar_counts = _count_rows_per_day(os.path.join(SAVE_DIR, "txf_kbars.parquet"), "kbar_count")
tick_counts = _count_rows_per_day(os.path.join(SAVE_DIR, "txf_ticks.parquet"), "tick_count")

# ===== Merge and compare =====
# coalesce=True keeps a single `date` key; without it a full join adds `date_right`
# and leaves `date` null for days that only have ticks.
summary_df = (
    kbar_counts.join(tick_counts, on="date", how="full", coalesce=True)
    .with_columns(
        (pl.col("kbar_count") == EXPECTED_KBARS).fill_null(False).alias("kbar_ok"),
        (pl.col("tick_count") > 0).fill_null(False).alias("tick_ok"),
    )
    .with_columns((pl.col("kbar_ok") & pl.col("tick_ok")).alias("both_ok"))
    .sort("date")
)

# Write the report next to the data files, like the stock check does.
summary_df.write_csv(os.path.join(SAVE_DIR, "kbars_ticks_check_2024.csv"))


In [39]:
import os
import polars as pl
from datetime import datetime

STOCKS_TO_FETCH = ["2330", "2454", "2317", "2881", "2412", "2382", "2308", "2882", "2891", "3711"]
SAVE_DIR = r"C:\Users\USER\Desktop\雲端同步\Shioaji\Backtesting"
# NOTE(review): the inclusive 09:00–13:30 window below holds at most 271 distinct
# minute timestamps, so with one bar per minute no day can reach 300 and every
# day will be reported. Confirm the real per-day bar count and adjust.
EXPECTED_COUNT = 300
STOCK_SESSION_START = datetime.strptime("09:00", "%H:%M").time()
STOCK_SESSION_END = datetime.strptime("13:30", "%H:%M").time()
all_missing = []    # per-stock polars frames (date, count, stock) with count != EXPECTED_COUNT
missing_files = []  # stocks that have no parquet file at all

# Read each stock's file and keep the days whose in-session bar count is off.
# Days with no rows at all do not appear in the group_by output, so they are not listed.
for stock in STOCKS_TO_FETCH:
    file_path = os.path.join(SAVE_DIR, f"stock_kbars_{stock}.parquet")
    if not os.path.exists(file_path):
        # Record it so the cell never claims completeness for a stock with no data file.
        missing_files.append(stock)
        continue

    df = (
        pl.read_parquet(file_path)
        .filter(
            (pl.col("ts").dt.time() >= STOCK_SESSION_START) &
            (pl.col("ts").dt.time() <= STOCK_SESSION_END)
        )
        .with_columns(pl.col("ts").dt.date().alias("date"))
        .group_by("date")
        .agg(pl.len().alias("count"))
        .filter(pl.col("count") != EXPECTED_COUNT)
        .with_columns(pl.lit(stock).alias("stock"))
    )
    if df.height > 0:
        all_missing.append(df)

if missing_files:
    print(f"找不到檔案: {', '.join(missing_files)}")

# Merge every stock's discrepancies into one report.
if all_missing:
    summary = pl.concat(all_missing).sort(["date", "stock"])
    summary.write_csv(os.path.join(SAVE_DIR, "缺漏報表.csv"))
elif not missing_files:
    print(" 所有檔案資料完整無缺漏")


In [None]:
# Show API usage, then log out. Jupyter echoes only a cell's last expression,
# so usage() must be printed explicitly or its result is silently discarded.
print(api.usage())
api.logout()