<a href="https://colab.research.google.com/github/adrianwu8516/PersonalQuantResearch/blob/main/TQQQLab_ver1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# First attempt: run the models on TQQQ data only (written via ChatGPT "vibe coding").


# Source: Google Drive file id of the TQQQ CSV (make sure it is shared as "anyone with the link can view").
SOURCE_DRIVE_FILE_ID = "1QrHvtRjFOY0Hq7EzK3LmWH47FL7U91ej"

# Destination: Google Drive folder id that the result CSV is uploaded to.
DESTINATION_FOLDER_ID = "1QhdQskcFWmODDxxtRjcXzUTK0Y8ckcfh"

# Whether to produce a "next-day crash" warning (alert issued at T-1). True = next day, False = same day.
# NOTE(review): no other cell visible in this notebook reads this flag — confirm whether it is still needed.
PREDICT_NEXT_DAY = True

# Threshold for an extreme drop, as a multiple of the standard deviation. 2.0 is a common setting.
# NOTE(review): the cell that runs the script passes --sigma 2.0 literally instead of reading this constant.
SIGMA = 2.0

# Train/test split ratio (chronological split).
# NOTE(review): the cell that runs the script passes --split 0.7 literally instead of reading this constant.
SPLIT_RATIO = 0.7

In [12]:
# Install the packages used by this notebook (-q keeps pip's output quiet).
# NOTE(review): yfinance is installed here but not imported by any visible cell — confirm it is needed.
!pip -q install pandas numpy scikit-learn gdown yfinance

In [27]:
%%writefile tqqq_warning_models.py
import argparse, os
from typing import Tuple
import numpy as np
import pandas as pd

def download_drive_csv(file_id: str, out_path: str) -> str:
    """Download a shared Google Drive file to ``out_path`` using gdown.

    Args:
        file_id: Google Drive file id. The file must be viewable by anyone
            with the link.
        out_path: Local path the downloaded file is written to.

    Returns:
        ``out_path`` once the download has been verified.

    Raises:
        RuntimeError: If gdown is not installed, or if the download did not
            produce a non-empty file at ``out_path``.
    """
    try:
        import gdown
    except ImportError as e:
        raise RuntimeError("請先安裝 gdown：!pip install gdown") from e
    url = f"https://drive.google.com/uc?id={file_id}"
    # gdown.download returns the output path on success; some versions return
    # None on failure instead of raising. Checking the return value keeps a
    # stale file left at out_path by an earlier run from passing as success.
    result = gdown.download(url, out_path, quiet=False)
    if result is None or not os.path.exists(out_path) or os.path.getsize(out_path) == 0:
        raise RuntimeError("下載失敗，請確認 Google Drive 檔案已設為『知道連結者可檢視』。")
    return out_path

def load_prices(csv_path: str) -> pd.DataFrame:
    """Read a price CSV and return it as a date-sorted, normalised frame.

    Column headers are matched after stripping whitespace and lower-casing.
    The result always has ``date`` and ``price`` columns, followed by
    ``high``, ``low`` and ``volume`` when the CSV provides them.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        A new DataFrame sorted by ``date`` with a fresh 0..n-1 index.

    Raises:
        ValueError: If no date/timestamp column or no close-price column
            is present.
    """
    raw = pd.read_csv(csv_path)
    raw.columns = [name.strip().lower() for name in raw.columns]

    # Date column: 'date' wins over 'timestamp' when both exist.
    date_source = next((c for c in ('date', 'timestamp') if c in raw.columns), None)
    if date_source is None:
        raise ValueError("需要 'Date' 或 'Timestamp' 欄位。")
    raw['date'] = pd.to_datetime(raw[date_source])

    # Price column: the adjusted close spellings are preferred over plain 'close'.
    price_candidates = ('adj close', 'adj_close', 'adjclose', 'close')
    price_col = next((c for c in price_candidates if c in raw.columns), None)
    if price_col is None:
        raise ValueError("需要 'Adj Close' 或 'Close' 欄位。")

    # Optional columns are kept under their (already lower-cased) names.
    optional = [c for c in ('high', 'low', 'volume') if c in raw.columns]

    selected = raw[['date', price_col] + optional].copy()
    selected = selected.sort_values('date').reset_index(drop=True)
    return selected.rename(columns={price_col: 'price'})

def build_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with return, moving-average and ratio features added.

    Added columns: ``ret`` (daily percent change of ``price``), ``ret_lag1..3``,
    ``sma{5,20,50}`` and ``vol{5,20,50}`` (rolling mean of price / rolling std
    of returns), ``px_vs_sma20`` / ``px_vs_sma50``, ``vol_ratio5`` and ``range``.
    ``vol_ratio5`` is all-NaN without a ``volume`` column; ``range`` is all-NaN
    unless both ``high`` and ``low`` are present. The input frame is not modified.

    NOTE(review): the rolling windows, ``vol_ratio5`` and ``range`` all include
    the current row's own values — confirm this is intended for the labels used.
    """
    feats = df.copy()
    price = feats['price']

    feats['ret'] = price.pct_change()
    daily_ret = feats['ret']

    for lag in (1, 2, 3):
        feats[f'ret_lag{lag}'] = daily_ret.shift(lag)

    for window in (5, 20, 50):
        feats[f'sma{window}'] = price.rolling(window).mean()
        feats[f'vol{window}'] = daily_ret.rolling(window).std()

    # Relative distance of the price from its 20- and 50-row moving averages.
    for window in (20, 50):
        feats[f'px_vs_sma{window}'] = price / feats[f'sma{window}'] - 1

    has_volume = 'volume' in feats.columns
    has_range = 'high' in feats.columns and 'low' in feats.columns

    feats['vol_ratio5'] = (
        feats['volume'] / feats['volume'].rolling(5).mean() if has_volume else np.nan
    )
    feats['range'] = (
        (feats['high'] - feats['low']) / price if has_range else np.nan
    )
    return feats

def make_labels(df_feat: pd.DataFrame, sigma=2.0):
    """Flag rows whose return is an extreme drop on that same row.

    The threshold is ``sigma`` times the standard deviation of the whole
    ``ret`` column (NaNs skipped). A row is labelled 1 when its return is at
    or below the negative threshold, else 0; a NaN return yields 0.

    Returns:
        ``(labels, threshold)`` — an integer Series aligned with ``df_feat``
        and the positive threshold value.
    """
    returns = df_feat['ret']
    thr = sigma * returns.std(skipna=True)
    # Same-row label only; no next-day shift is applied here.
    is_extreme = returns <= -thr
    return is_extreme.astype(int), thr

def prepare_xy(df_feat: pd.DataFrame, sigma=2.0):
    """Assemble the model matrix, labels and aligned metadata from a feature frame.

    Args:
        df_feat: Frame carrying the feature columns listed below plus
            ``date`` and ``ret``.
        sigma: Standard-deviation multiple forwarded to ``make_labels``.

    Returns:
        ``(X, y, dates, rets, thr)`` where ``X`` holds only rows with every
        retained feature present, ``y``/``dates``/``rets`` are aligned to those
        rows, and ``thr`` is the label threshold.
    """
    y, thr = make_labels(df_feat, sigma=sigma)
    feat_cols = [
        'ret_lag1','ret_lag2','ret_lag3',
        'sma5','sma20','sma50',
        'vol5','vol20','vol50',
        'px_vs_sma20','px_vs_sma50',
        'vol_ratio5','range'
    ]
    # Ratio features can become +/-inf on a zero denominator. Treat those as
    # missing so the affected rows are dropped instead of reaching the model.
    X = df_feat[feat_cols].replace([np.inf, -np.inf], np.nan)

    # 'vol_ratio5' and 'range' are entirely NaN when the source data has no
    # volume / high / low columns. Keeping such a column would make the row
    # mask below reject every row, so it is left out of the feature set.
    optional = ('vol_ratio5', 'range')
    keep = [c for c in feat_cols if not (c in optional and X[c].isna().all())]
    X = X[keep]

    m = X.notna().all(axis=1) & y.notna()
    X = X[m]; y = y[m]
    dates = df_feat.loc[m,'date']
    rets  = df_feat.loc[m,'ret']
    return X, y, dates, rets, thr

def train_models(X_train, y_train):
    """Fit the two RandomForest classifiers used for the warning CSV.

    Returns:
        ``(high, low)`` — both fitted on the same training data. ``high`` uses
        unrestricted trees with ``class_weight='balanced_subsample'``; ``low``
        uses shallow trees (``max_depth=4``, ``min_samples_leaf=5``) with
        ``class_weight='balanced'``. The author's intent is that ``high`` alerts
        less often with higher precision and ``low`` alerts more often with
        higher recall — NOTE(review): that trade-off is not verified here.
    """
    from sklearn.ensemble import RandomForestClassifier

    # Settings common to both forests.
    shared = dict(n_estimators=200, random_state=42, n_jobs=-1)

    high = RandomForestClassifier(class_weight='balanced_subsample', **shared)
    low = RandomForestClassifier(
        max_depth=4,
        min_samples_leaf=5,
        class_weight='balanced',
        **shared,
    )

    for model in (high, low):
        model.fit(X_train, y_train)
    return high, low

def generate_warnings(model_name, pred, y_true, dates, rets):
    """Build one warning row per positive prediction.

    Args:
        model_name: ``'high'`` selects the high-precision tag; any other value
            selects the low-precision tag.
        pred: Array-like of 0/1 predictions, positionally aligned with the
            other arguments.
        y_true: Series of actual 0/1 labels.
        dates: Series of timestamps.
        rets: Series of fractional returns.

    Returns:
        DataFrame with columns ``Date``, ``Model``, ``PredictedExtreme``,
        ``ActualExtreme`` and ``ReturnPct`` (return in percent). The columns
        are present even when there are no positive predictions, so callers
        can sort or concatenate the result unconditionally.
    """
    tag = '高精度警示' if model_name=='high' else '低精度警示'
    columns = ['Date', 'Model', 'PredictedExtreme', 'ActualExtreme', 'ReturnPct']
    # Accept plain lists as well as numpy arrays; `list == 1` would not compare element-wise.
    pred = np.asarray(pred)
    rows = [
        {
            'Date': dates.iloc[i].strftime('%Y-%m-%d'),
            'Model': tag,
            'PredictedExtreme': int(pred[i]),
            'ActualExtreme': int(y_true.iloc[i]),
            'ReturnPct': rets.iloc[i]*100.0,
        }
        for i in np.flatnonzero(pred == 1)
    ]
    # Passing `columns` keeps the schema stable for an empty result.
    return pd.DataFrame(rows, columns=columns)

def main():
    """Command-line entry point: load prices, train both models, write one warnings CSV.

    The input is either a local CSV (``--input``) or a Google Drive file id
    (``--drive-id``, which takes precedence when both are given). Rows are split
    chronologically at ``--split``; both models are trained on the first part
    and their positive predictions on the remainder are written to ``--output``.

    Raises:
        SystemExit: If neither input option is given, or if the split leaves
            the training or test portion empty.
    """
    # Function-scope import, in line with how gdown and sklearn are imported in this file.
    import tempfile

    parser = argparse.ArgumentParser(description='Detect same-day extreme down with two RandomForest models and output ONE CSV.')
    parser.add_argument('--input', help='本機 CSV 路徑')
    parser.add_argument('--drive-id', help='Google Drive 檔案 ID（需公開分享）')
    parser.add_argument('--sigma', type=float, default=2.0, help='標準差倍數門檻，預設 2.0')
    parser.add_argument('--split', type=float, default=0.7, help='時間切割比例，預設 0.7')
    parser.add_argument('--output', required=True, help='輸出警示 CSV 路徑（單一檔案）')
    args = parser.parse_args()

    if not args.input and not args.drive_id:
        raise SystemExit("請提供 --input 或 --drive-id 其一。")

    tmp_path = None
    try:
        if args.drive_id:
            # Use the platform temp directory rather than a Colab-only path.
            fd, tmp_path = tempfile.mkstemp(prefix='_drive_data_', suffix='.csv')
            os.close(fd)
            download_drive_csv(args.drive_id, tmp_path)
            csv_path = tmp_path
        else:
            csv_path = args.input
        df = load_prices(csv_path)
    finally:
        # The data is in memory after load_prices; remove the download even on failure.
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)

    df_feat = build_features(df)
    X, y, dates, rets, _thr = prepare_xy(df_feat, sigma=args.sigma)

    cut = int(len(X)*args.split)
    # Fail with a clear message instead of an error from deep inside model fitting/prediction.
    if cut <= 0 or cut >= len(X):
        raise SystemExit("資料不足：依 --split 切割後訓練集或測試集為空。")
    X_train, X_test = X.iloc[:cut], X.iloc[cut:]
    y_train, y_test = y.iloc[:cut], y.iloc[cut:]
    dates_test = dates.iloc[cut:]
    rets_test = rets.iloc[cut:]

    high, low = train_models(X_train, y_train)
    pred_high = high.predict(X_test)
    pred_low  = low.predict(X_test)

    warn_high = generate_warnings('high', pred_high, y_test, dates_test, rets_test)
    warn_low  = generate_warnings('low',  pred_low,  y_test, dates_test, rets_test)
    warnings_df = pd.concat([warn_high, warn_low], ignore_index=True)
    # With no warnings at all the frame may carry no 'Date' column to sort on.
    if 'Date' in warnings_df.columns:
        warnings_df = warnings_df.sort_values('Date').reset_index(drop=True)

    # dirname is '' for a bare file name, and os.makedirs('') raises.
    out_dir = os.path.dirname(args.output)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    warnings_df.to_csv(args.output, index=False)
    print("Saved CSV to", args.output)

if __name__ == '__main__':
    main()



In [29]:
SOURCE_DRIVE_FILE_ID = "1QrHvtRjFOY0Hq7EzK3LmWH47FL7U91ej"  # TQQQ CSV on Drive (must be shared as "anyone with the link can view")
OUTPUT_CSV = "/content/tqqq_warnings_same_day.csv"

# Run the generated script; the {NAME} placeholders are filled in from the
# Python variables defined in this cell.
!python tqqq_warning_models.py \
  --drive-id {SOURCE_DRIVE_FILE_ID} \
  --sigma 2.0 \
  --split 0.7 \
  --output {OUTPUT_CSV}

Downloading...
From: https://drive.google.com/uc?id=1QrHvtRjFOY0Hq7EzK3LmWH47FL7U91ej
To: /content/_drive_data.csv
  0% 0.00/846k [00:00<?, ?B/s]100% 846k/846k [00:00<00:00, 19.4MB/s]


In [30]:
# Authenticate the Colab user before any Google API client is created.
# NOTE(review): the Drive client built later in this cell presumably relies on
# the credentials this call sets up — confirm.
from google.colab import auth
auth.authenticate_user()

from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
import datetime as dt, os

def upload_to_drive_folder(file_path, folder_id, new_name=None, mime_type="text/csv"):
    """Upload a local file into a Google Drive folder through the Drive v3 API.

    Args:
        file_path: Path of the local file to upload.
        folder_id: Id of the destination Drive folder.
        new_name: Name for the uploaded file. When omitted, the local base name
            plus a ``YYYYmmdd_HHMM`` timestamp is used, keeping the local file's
            extension (``.csv`` if it has none).
        mime_type: MIME type sent with the upload.

    Returns:
        The Drive API response dict with the ``id`` and ``webViewLink`` fields.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Default name carries a timestamp so repeated uploads are distinguishable.
    if new_name is None:
        stamp = dt.datetime.now().strftime("%Y%m%d_%H%M")
        base, ext = os.path.splitext(os.path.basename(file_path))
        # Keep the real extension; a non-CSV upload should not be renamed to .csv.
        new_name = f"{base}_{stamp}{ext or '.csv'}"

    service = build("drive", "v3")
    body = {"name": new_name, "parents": [folder_id]}
    media = MediaFileUpload(file_path, mimetype=mime_type, resumable=True)

    # Request only the fields that are printed and returned.
    file = service.files().create(
        body=body, media_body=media, fields="id, webViewLink"
    ).execute()

    print(f"Uploaded: {file['webViewLink']}  (fileId={file['id']})")
    return file

# ---- Usage example (uploads only the same-day output) ----
# Requires these names from earlier cells:
#   OUTPUT_CSV            - path of the CSV written by the script run
#   DESTINATION_FOLDER_ID - id of the destination Drive folder
# OUTPUT_CSV is the name the run cell actually defines; the previously used
# OUT_SAME_DAY is not defined in any cell of this notebook.
if os.path.exists(OUTPUT_CSV):
    upload_to_drive_folder(
        OUTPUT_CSV,
        DESTINATION_FOLDER_ID,
        new_name=None  # leave unset to get an automatic timestamped name
    )
else:
    print(f"Output not found: {OUTPUT_CSV}")

Uploaded: https://drive.google.com/file/d/1KmTFdc1io0pum2B5_TKwGiLYGErw9VPR/view?usp=drivesdk  (fileId=1KmTFdc1io0pum2B5_TKwGiLYGErw9VPR)


In [23]:
# List every "predict-" occurrence in the generated script with line numbers;
# `|| true` keeps the cell from failing when grep finds no match.
!grep -n "predict-" tqqq_warning_models.py || true
# Rewrite the invalid attribute access `args.predict-full` to `args.predict_full` in place.
# NOTE(review): the script written by the %%writefile cell in this notebook has no
# --predict-full option, so this cell looks like a leftover from an earlier revision — confirm.
!sed -i 's/args\.predict-full/args.predict_full/g' tqqq_warning_models.py

157:    parser.add_argument('--predict-full', action='store_true', help='訓練後對「全期間」都做推論並輸出警示（含訓練段與測試段）')
195:    if args.predict-full:
