In [14]:
# Block 1 — Config & Login
# =======================================
import os, json
from datetime import datetime
from FiinQuantX import FiinSession

# -------------------
# Step 1: Tạo config.json nếu chưa có
# -------------------
default_config = {
    "username": "DSTC_18@fiinquant.vn",
    "password": "Fiinquant0606",
    "batch_size": 50,
    "freq_per_day": 2,
    "execution_lag": 2,
    "quota_monthly": 480,
    "telegram_token": "",
    "telegram_chat_id": "",
    "google_sheet_id": "",
    "google_service_json": "service_account.json",
    "RSI_buy_threshold": 35,
    "vol_spike_mult": 1.5,
    "macd_diff_thresh": 0,
    "rule_min_score": 3,
    "alert_min_alloc": 0.005,
    "cooldown_hours": 24
}

if not os.path.exists("config.json"):
    with open("config.json","w") as f:
        json.dump(default_config, f, indent=2)
    print("⚠️ config.json not found. Default file created.")

# -------------------
# Step 2: Load config
# -------------------
with open("config.json","r") as f:
    config = json.load(f)

username = config.get("username")
password = config.get("password")

# -------------------
# Step 3: Login FiinQuant
# -------------------
try:
    client = FiinSession(username, password).login()
    print("✅ Login FiinQuant thành công")
except Exception as e:
    raise RuntimeError(f"❌ Login FiinQuant thất bại: {e}")

# -------------------
# Step 4: Init Telegram (nếu có)
# -------------------
bot, CHAT_ID = None, None
if config.get("telegram_token") and config.get("telegram_chat_id"):
    import telegram
    try:
        bot = telegram.Bot(token=config["telegram_token"])
        CHAT_ID = config["telegram_chat_id"]
        print("✅ Telegram Bot ready")
    except Exception as e:
        print(f"⚠️ Telegram init lỗi: {e}")

# -------------------
# Step 5: Init Google Sheets (nếu có)
# -------------------
gs_client, sheet_id = None, None
if config.get("google_sheet_id") and os.path.exists(config.get("google_service_json","")):
    import gspread
    from oauth2client.service_account import ServiceAccountCredentials
    try:
        scope = ["https://spreadsheets.google.com/feeds","https://www.googleapis.com/auth/drive"]
        creds = ServiceAccountCredentials.from_json_keyfile_name(config["google_service_json"], scope)
        gs_client = gspread.authorize(creds)
        sheet_id = config["google_sheet_id"]
        print("✅ Google Sheets client ready")
    except Exception as e:
        print(f"⚠️ Google Sheets init lỗi: {e}")

# -------------------
# Step 6: Init runtime_state.json
# -------------------
runtime_file = "runtime_state.json"
if not os.path.exists(runtime_file):
    runtime_state = {
        "api_used": 0,
        "month": datetime.now().strftime("%Y-%m"),
        "last_universe": None,
        "last_run_time": None,
        "last_alerts": {}
    }
    with open(runtime_file, "w") as f:
        json.dump(runtime_state, f, indent=2)
    print("✅ runtime_state.json created")
else:
    with open(runtime_file, "r") as f:
        runtime_state = json.load(f)
    print("ℹ️ Loaded existing runtime_state.json")

# -------------------
# Done
# -------------------
print("🎯 Block 1 hoàn tất: Config & Login")

✅ Login FiinQuant thành công
ℹ️ Loaded existing runtime_state.json
🎯 Block 1 hoàn tất: Config & Login


In [6]:
# Block 2 — Universe Selection (Monthly job)
# ================================================

import pandas as pd
import numpy as np
import os, json
from datetime import datetime

# ---- Step 1: Lấy danh sách tickers toàn thị trường ----
tickers_hose  = list(client.TickerList(ticker="VNINDEX"))
tickers_all = list(set(tickers_hose))
print(f"Tổng số tickers toàn thị trường: {len(tickers_all)}")

# ---- Step 2: Lấy dữ liệu OHLCV 6 tháng gần nhất ----
ohlcv = client.Fetch_Trading_Data(
    realtime=False,
    tickers=tickers_all,
    fields=["close","volume"],
    adjusted=True,
    by="1d",
    from_date="2025-03-01"   # ~6 tháng gần đây
).get_data()

ohlcv["timestamp"] = pd.to_datetime(ohlcv["timestamp"])

# ---- Step 3: Tính liquidity (20d avg value traded) ----
ohlcv["value"] = ohlcv["close"] * ohlcv["volume"]
liquidity_20d = ohlcv.groupby("ticker")["value"].rolling(20).mean().reset_index(level=0, drop=True)
latest_liquidity = liquidity_20d.groupby(ohlcv["ticker"]).last()

# ---- Step 4: Tính momentum (90d, 252d) ----
def calc_momentum(series, window):
    if len(series) < window: 
        return np.nan
    return series.iloc[-1] / series.iloc[-window] - 1

momentum_90d = ohlcv.groupby("ticker")["close"].apply(lambda x: calc_momentum(x, 90))
momentum_252d = ohlcv.groupby("ticker")["close"].apply(lambda x: calc_momentum(x, 252))

# ---- Step 5: Lấy dữ liệu FA (check FA flag) ----
fa_flags = {}
for t in tickers_all:
    try:
        fi_list = client.FundamentalAnalysis().get_ratios(
            tickers=[t],
            TimeFilter="Quarterly",
            LatestYear=2025,
            NumberOfPeriod=8,
            Consolidated=True
        )
        fa_flags[t] = 1 if fi_list else 0
    except Exception as e:
        print(f"⚠️ Lỗi khi lấy FA {t}: {e}")
        fa_flags[t] = 0

fa_flag_series = pd.Series(fa_flags, name="FA_flag")

# ---- Step 6: Gộp dữ liệu ----
df_universe = pd.DataFrame({
    "ticker": list(latest_liquidity.index),
    "liquidity_20d": latest_liquidity.values,
    "momentum_90d": momentum_90d.values,
    "momentum_252d": momentum_252d.values
}).set_index("ticker")

df_universe = df_universe.join(fa_flag_series, how="left").fillna(0)
df_universe = df_universe.reset_index()

# ---- Step 7: Ranking & chọn Universe ----
df_universe["liquidity_rank"] = df_universe["liquidity_20d"].rank(pct=True)
df_universe["momentum_rank"]  = df_universe["momentum_90d"].rank(pct=True)

# Composite score (simple average)
df_universe["score"] = (df_universe["liquidity_rank"] + df_universe["momentum_rank"]) / 2

# Chọn top 200
univ_final = df_universe.sort_values("score", ascending=False).head(200)

# ---- Step 8: Save results ----
os.makedirs("universe", exist_ok=True)

csv_file = "universe/universe_list.csv"
meta_file = "universe/universe_metadata.json"

univ_final.to_csv(csv_file, index=False)
print(f"💾 Saved {csv_file} with {len(univ_final)} tickers")

meta = {
    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "universe_size": len(univ_final),
    "selection_method": "liquidity + momentum + FA_flag",
}
with open(meta_file, "w") as f:
    json.dump(meta, f, indent=4)

print(f"📘 Metadata saved: {meta_file}")
print("✅ Universe sample:")
print(univ_final.head())

Tổng số tickers toàn thị trường: 413
Fetching data, it may take a while. Please wait...
⚠️ Lỗi khi lấy FA FUETPVND: 'FUETPVND'
💾 Saved universe/universe_list.csv with 200 tickers
📘 Metadata saved: universe/universe_metadata.json
✅ Universe sample:
    ticker  liquidity_20d  momentum_90d  momentum_252d  FA_flag  \
385    VIX   1.456158e+12      1.821318            0.0        1   
50     CII   7.529223e+11      1.023096            0.0        1   
311    SSI   2.146069e+12      0.688285            0.0        1   
145    GEX   4.919533e+11      0.974061            0.0        1   
395    VPB   1.533244e+12      0.638814            0.0        1   

     liquidity_rank  momentum_rank     score  
385        0.990291       0.997573  0.993932  
50         0.970874       0.985437  0.978155  
311        0.997573       0.949029  0.973301  
145        0.946602       0.983010  0.964806  
395        0.992718       0.927184  0.959951  


In [15]:
# Cell: Build / Backfill OHLCV master parquet (compatible with pipeline)
# Place this cell BEFORE your main realtime pipeline cell (the 1000+ lines)
# ---------------------------
import os, json, time, math, traceback
from datetime import datetime, timedelta
from itertools import islice
import pandas as pd

# --- Config: chỉnh nếu cần hoặc tạo config.json với keys username/password ---
CONFIG_PATH = "config.json"
cfg = {}
if os.path.exists(CONFIG_PATH):
    with open(CONFIG_PATH,"r") as f:
        try:
            cfg = json.load(f)
        except:
            cfg = {}
# If config.json missing or no creds, fill here (you can keep them in config.json instead)
USERNAME = cfg.get("username") or "DSTC_18@fiinquant.vn"    # <-- hoặc để trống và sửa
PASSWORD = cfg.get("password") or "Fiinquant0606"          # <-- hoặc để trống và sửa

DATA_DIR = "./data"
os.makedirs(DATA_DIR, exist_ok=True)
MASTER_PARQUET = os.path.join(DATA_DIR, "ohlcv_master.parquet")

# Backfill params
BACKFILL_MONTHS = int(cfg.get("backfill_months", 6))   # 6 months default
BATCH_SIZE = int(cfg.get("batch_size", 50))
FIELDS = cfg.get("fetch_fields", ['open','high','low','close','volume'])  # thêm fields nếu muốn
FORCE_BACKFILL = bool(cfg.get("force_backfill", True))  # nếu True thì tải lại dù file đã tồn tại

# runtime_state file used by pipeline
RUNTIME_FILE = "runtime_state.json"
if os.path.exists(RUNTIME_FILE):
    with open(RUNTIME_FILE,"r") as f:
        runtime_state = json.load(f)
else:
    runtime_state = {"api_used":0, "month": datetime.now().strftime("%Y-%m"), "last_universe": None, "last_run_time": None, "last_alerts": {}}
    with open(RUNTIME_FILE,"w") as f:
        json.dump(runtime_state, f, indent=2)

def save_runtime_state():
    runtime_state["last_run_time"] = datetime.now().isoformat()
    with open(RUNTIME_FILE,"w") as f:
        json.dump(runtime_state, f, indent=2)

# --- Import FiinQuantX and login ---
try:
    from FiinQuantX import FiinSession
except Exception as e:
    raise ImportError("Không tìm thấy FiinQuantX. Cài đặt thư viện hoặc đặt đúng PYTHONPATH. Lỗi: " + str(e))

print("[init] Logging in to FiinQuant...")
client = None
try:
    client = FiinSession(username=USERNAME, password=PASSWORD).login()
    print("[init] Fiin client logged in -> client variable available")
except Exception as e:
    raise RuntimeError("Fiin login failed: " + str(e))

# make indicator helper available to pipeline
try:
    fi = client.FiinIndicator()
    print("[init] FiinIndicator available as fi")
except Exception:
    fi = None
    print("[init] Warning: client.FiinIndicator() not available (fi=None)")

# --- helper: chunk iterator ---
def chunked_iterable(iterable, size):
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch

# --- helper: safe fetch per batch with retry ---
def fetch_batch(tickers_batch, from_date, to_date=None, retries=3, wait=2):
    """Return DataFrame or raise."""
    attempt = 0
    while attempt < retries:
        try:
            # Use Fetch_Trading_Data with realtime=False as in your examples
            params = {
                "realtime": False,
                "tickers": tickers_batch,
                "fields": FIELDS,
                "adjusted": True,
                "by": "1d",
                "from_date": from_date
            }
            if to_date:
                params["to_date"] = to_date
            res = client.Fetch_Trading_Data(**params).get_data()
            # Expect DataFrame-like
            if isinstance(res, pd.DataFrame):
                return res
            else:
                # try to coerce to DataFrame
                return pd.DataFrame(res)
        except Exception as e:
            attempt += 1
            print(f"[fetch_batch] attempt {attempt}/{retries} failed for batch size {len(tickers_batch)}: {e}")
            time.sleep(wait * attempt)
    raise RuntimeError(f"All retries failed for batch: {tickers_batch[:5]}...")

# --- main backfill routine ---
def build_master_ohlcv(tickers=None, months=BACKFILL_MONTHS, batch_size=BATCH_SIZE, force=FORCE_BACKFILL):
    """
    If MASTER_PARQUET exists and not force -> skip.
    Otherwise fetch historical data for tickers (chunked) from (today - months) to today,
    save to MASTER_PARQUET (parquet).
    Returns path or None.
    """
    if os.path.exists(MASTER_PARQUET) and (not force):
        print(f"[run] {MASTER_PARQUET} exists. Set force=True to overwrite.")
        return MASTER_PARQUET

    # If tickers not provided, try to read universe file (universe/universe_list.csv), else default to VN30 members
    if tickers is None:
        univ_path = os.path.join("universe","universe_list.csv")
        if os.path.exists(univ_path):
            try:
                tt = pd.read_csv(univ_path)["ticker"].tolist()
                tickers = [t for t in tt if isinstance(t, str)]
                print(f"[run] Loaded {len(tickers)} tickers from {univ_path}")
            except Exception as e:
                print("[run] cannot read universe file:", e)
                tickers = None
    if tickers is None:
        # fallback: try to request VN30 or VNINDEX tickers (example usage)
        try:
            print("[run] universe not provided -> trying client.TickerList(ticker='VNINDEX')")
            tlist = client.TickerList(ticker="VNINDEX")
            # TickerList may return list-like or objects; try to coerce
            if isinstance(tlist, (list,tuple,set)):
                tickers = list(tlist)
            else:
                # If object, try iterate
                tickers = list(tlist)
            print(f"[run] got {len(tickers)} tickers from VNINDEX")
        except Exception as e:
            print("[run] fallback failed, set tickers manually or create universe/universe_list.csv. Error:", e)
            raise RuntimeError("No tickers available to backfill")

    # compute from_date
    to_date = datetime.now().strftime("%Y-%m-%d")
    from_date_dt = datetime.now() - pd.DateOffset(months=months)
    from_date = from_date_dt.strftime("%Y-%m-%d")
    print(f"[backfill] requesting historical OHLCV for {len(tickers)} tickers from {from_date} to {to_date}")

    all_dfs = []
    failed = []
    calls = 0

    for batch in chunked_iterable(tickers, batch_size):
        try:
            df = fetch_batch(batch, from_date=from_date, to_date=to_date)
            calls += 1
            # normalize column names if needed
            if df is None or df.empty:
                print(f"[backfill] batch returned empty for {len(batch)} tickers")
                continue
            # Ensure ticker column exists
            if "ticker" not in df.columns and "ticker_code" in df.columns:
                df = df.rename(columns={"ticker_code":"ticker"})
            # Convert timestamp column name variations to 'timestamp'
            if "timestamp" in df.columns:
                df["timestamp"] = pd.to_datetime(df["timestamp"])
            elif "date" in df.columns:
                df["timestamp"] = pd.to_datetime(df["date"])
            elif "trade_date" in df.columns:
                df["timestamp"] = pd.to_datetime(df["trade_date"])
            else:
                # try to infer index datetime
                try:
                    df = df.reset_index()
                    if "index" in df.columns:
                        df = df.rename(columns={"index":"timestamp"})
                        df["timestamp"] = pd.to_datetime(df["timestamp"])
                except:
                    pass
            # ensure ticker column present by checking unique tickers in batch
            if "ticker" not in df.columns:
                # try to add ticker column if the API returned single-ticker DF
                if len(batch) == 1:
                    df["ticker"] = batch[0]
                else:
                    # attempt to detect ticker column name
                    for cand in ["symbol","code","ticker_code"]:
                        if cand in df.columns:
                            df = df.rename(columns={cand:"ticker"})
                            break
            # keep minimal columns and cast
            keep_cols = [c for c in ["timestamp","ticker","open","high","low","close","volume"] if c in df.columns]
            df = df[keep_cols].copy()
            all_dfs.append(df)
            print(f"[backfill] fetched batch {len(batch)} -> rows {len(df)}")
            # update runtime_state api count
            runtime_state["api_used"] = runtime_state.get("api_used",0) + 1
            save_runtime_state()
            time.sleep(0.5)  # small pause to be gentle
        except Exception as e:
            print(f"[backfill] batch failed: {e}")
            failed.extend(batch)
            # don't stop; continue with next batch
            time.sleep(1)

    # concat
    if len(all_dfs) == 0:
        print("[backfill] no data fetched.")
        return None
    df_master = pd.concat(all_dfs, ignore_index=True)
    # drop duplicate (timestamp, ticker)
    if "timestamp" in df_master.columns and "ticker" in df_master.columns:
        df_master = df_master.drop_duplicates(subset=["timestamp","ticker"])
    # sort
    if "timestamp" in df_master.columns:
        df_master = df_master.sort_values(["ticker","timestamp"]).reset_index(drop=True)
    # save parquet
    df_master.to_parquet(MASTER_PARQUET, index=False)
    print(f"[backfill] saved master parquet {MASTER_PARQUET} rows={len(df_master):,} api_calls={calls} failed_tickers={len(failed)}")
    if failed:
        print("[backfill] failed tickers sample:", failed[:50])
    return MASTER_PARQUET

# --- Run backfill if needed (auto) ---
if (not os.path.exists(MASTER_PARQUET)) or FORCE_BACKFILL:
    try:
        path = build_master_ohlcv(months=BACKFILL_MONTHS, batch_size=BATCH_SIZE, force=FORCE_BACKFILL)
        if path:
            print("[run] Master parquet ready at:", path)
    except Exception as e:
        print("[run] backfill error:", e)
else:
    print("[run] Master parquet already present at", MASTER_PARQUET, "- skipping backfill (set force=True to overwrite)")

# Export globals for downstream pipeline cells
# client and fi are defined above; ensure they are in global namespace for other cells to import/use.
print("Global objects available: client (FiinSession), fi (FiinIndicator or None).")

[init] Logging in to FiinQuant...
[init] Fiin client logged in -> client variable available
[init] FiinIndicator available as fi
[run] Loaded 200 tickers from universe\universe_list.csv
[backfill] requesting historical OHLCV for 200 tickers from 2025-03-21 to 2025-09-21
Fetching data, it may take a while. Please wait...
[backfill] fetched batch 50 -> rows 6242
Fetching data, it may take a while. Please wait...
[backfill] fetched batch 50 -> rows 6242
Fetching data, it may take a while. Please wait...
[backfill] fetched batch 50 -> rows 6250
Fetching data, it may take a while. Please wait...
[backfill] fetched batch 50 -> rows 6250
[backfill] saved master parquet ./data\ohlcv_master.parquet rows=24,984 api_calls=4 failed_tickers=0
[run] Master parquet ready at: ./data\ohlcv_master.parquet
Global objects available: client (FiinSession), fi (FiinIndicator or None).


In [32]:
# realtime_pipeline_hybrid.py
# Full pipeline: Block1..Block9 + stress-replay + monitoring
# Paste this entire file into a notebook cell or .py and run.
# WARNING: This script uses network APIs (FiinQuant, Telegram, Google Sheets).
# Make sure credentials files exist and environment is configured.

import os
import sys
import time
import json
import math
import gc
import csv
import traceback
from datetime import datetime, timedelta
import threading
from typing import Optional, Tuple, List

import numpy as np
import pandas as pd
import requests

# Try import torch (optional)
try:
    import torch
    import torch.nn as nn
    TORCH = True
except Exception:
    TORCH = False

# FiinQuantX client (user must have installed this lib)
from FiinQuantX import FiinSession, RealTimeData

# Google Sheets
import gspread
from gspread_dataframe import set_with_dataframe
from google.oauth2.service_account import Credentials

# ---------------------------
# CONFIG (edit if needed)
# ---------------------------
# Credentials provided by user (you gave these earlier)
FQ_USERNAME = "DSTC_18@fiinquant.vn"
FQ_PASSWORD = "Fiinquant0606"

# Google Sheets service account JSON (you said it's dstround3.json)
GS_SERVICE_JSON = "dstround3.json"
GS_SHEET_ID = "17FRrF63TFE3bmAseoV4vQK5EA9nYaTaRRnLyd1F8MGU"
GS_WORKSHEET = "DSTRound3"

# Telegram bot config (use supergroup chat id that worked for you)
TG_TOKEN = "8454050043:AAG_quR7eSALqh9WVRvx6DRZVxtRde_OpFQ"
TG_CHAT_ID = "-1002692813170"   # you confirmed this worked
TG_THREAD_ID = 2                # thread id (you asked to set 2); note: only used if available

# Pipeline params (tune)
BATCH_SIZE = 50                # max tickers per realtime stream
SNAPSHOT_SECONDS = 60          # total snapshot time (split across batches)
RSI_TH = 35
VOL_SPIKE_MULT = 1.5
RULE_MIN_SCORE = 3
MIN_ALLOC = 0.005              # 0.5% min allocation to alert
COOLDOWN_HOURS = 24
EXEC_LAG = 2                   # execution lag T+2
TOPK = 10
STATE_LKBK = 10

# Paths
DATA_DIR = "./data"
FEATURE_DIR = "./features"
SIGNAL_DIR = "./signals"
ALLOC_DIR = "./allocations"
ALERT_DIR = "./alerts"
MONITOR_DIR = "./monitor"
MODEL_DIR = "./models"
TENSOR_DIR = "./tensors"
UNIVERSE_DIR = "./universe"

# Ensure directories exist
for d in [DATA_DIR, FEATURE_DIR, SIGNAL_DIR, ALLOC_DIR, ALERT_DIR, MONITOR_DIR, MODEL_DIR, TENSOR_DIR, UNIVERSE_DIR]:
    os.makedirs(d, exist_ok=True)

RUNTIME_FILE = "runtime_state.json"
if os.path.exists(RUNTIME_FILE):
    with open(RUNTIME_FILE,"r") as f:
        runtime_state = json.load(f)
else:
    runtime_state = {"api_used":0, "month": datetime.now().strftime("%Y-%m"), "last_run_time": None, "last_alerts": {}}
    with open(RUNTIME_FILE,"w") as f:
        json.dump(runtime_state, f, indent=2)

def save_runtime_state():
    runtime_state["last_run_time"] = datetime.now().isoformat()
    with open(RUNTIME_FILE,"w") as f:
        json.dump(runtime_state, f, indent=2)

# ---------------------------
# INIT: clients
# ---------------------------
print("[init] Logging into FiinQuant...")
client = None
client_fq = None
fi = None
try:
    client = FiinSession(username=FQ_USERNAME, password=FQ_PASSWORD).login()
    if client is None:
        raise RuntimeError("FiinSession.login() returned None")
    client_fq = client
    # Correct way to get indicator helpers (client-provided)
    try:
        fi = client_fq.FiinIndicator()
    except Exception:
        # some older libs call FiinIndicator differently; try module-level fallback
        try:
            from FiinQuantX import FiinIndicator as FI_CLASS
            fi = FI_CLASS()
        except Exception:
            fi = None
    print("[init] Fiin client logged in. FiinIndicator ok?", fi is not None)
except Exception as e:
    print("[init] Fiin login failed:", e)
    client = None
    client_fq = None
    fi = None

# Google Sheets init
gs_client = None
gs_spread = None
gs_ws = None
if not os.path.exists(GS_SERVICE_JSON):
    print(f"[init] Google service JSON missing: {GS_SERVICE_JSON} — skip gsheet init")
else:
    try:
        gs_creds = Credentials.from_service_account_file(GS_SERVICE_JSON,
            scopes=["https://www.googleapis.com/auth/spreadsheets","https://www.googleapis.com/auth/drive"])
        gs_client = gspread.authorize(gs_creds)
        gs_spread = gs_client.open_by_key(GS_SHEET_ID)
        try:
            gs_ws = gs_spread.worksheet(GS_WORKSHEET)
        except Exception:
            # create worksheet if not present
            gs_ws = gs_spread.add_worksheet(title=GS_WORKSHEET, rows="1000", cols="20")
        print("[init] Google Sheets ready:", gs_spread.title)
    except Exception as e:
        print("[init] Google Sheets init error:", e)
        gs_client = None
        gs_spread = None
        gs_ws = None

# ---------------------------
# TELEGRAM helpers
# ---------------------------
def send_telegram_text(msg: str, thread_id: Optional[int] = None) -> bool:
    url = f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage"
    payload = {"chat_id": TG_CHAT_ID, "text": msg, "parse_mode": "Markdown"}
    # optionally include thread id if you are certain the bot has permissions and the chat is forum
    if thread_id is not None:
        payload["message_thread_id"] = int(thread_id)
    try:
        r = requests.post(url, json=payload, timeout=12)
        if r.status_code != 200:
            # retry without parse_mode
            payload2 = {"chat_id": TG_CHAT_ID, "text": msg}
            r2 = requests.post(url, json=payload2, timeout=12)
            if r2.status_code != 200:
                print("[tg] send failed:", r.status_code, r.text)
                return False
            return True
        return True
    except Exception as e:
        print("[tg] exception send:", e)
        return False

def send_telegram_file(path: str, caption: str = "") -> bool:
    url = f"https://api.telegram.org/bot{TG_TOKEN}/sendDocument"
    data = {"chat_id": TG_CHAT_ID, "caption": caption}
    try:
        with open(path,"rb") as fh:
            files = {"document": fh}
            r = requests.post(url, data=data, files=files, timeout=30)
        if r.status_code != 200:
            print("[tg] sendDocument failed:", r.status_code, r.text)
            return False
        return True
    except Exception as e:
        print("[tg] sendDocument exception:", e)
        return False

# ---------------------------
# Google Sheets helpers
# ---------------------------
def append_to_gsheet(df: pd.DataFrame, worksheet_name: Optional[str] = None) -> bool:
    """
    Append dataframe to worksheet. If worksheet_name provided, attempt to open/create that worksheet.
    Otherwise append to default gs_ws.
    """
    if df is None or df.empty:
        return True
    try:
        if gs_client is None or gs_spread is None:
            print("[gs] gsheet client not initialized")
            return False
        ws = gs_ws
        if worksheet_name:
            try:
                ws = gs_spread.worksheet(worksheet_name)
            except Exception:
                ws = gs_spread.add_worksheet(title=worksheet_name, rows=str(len(df)+10), cols=str(len(df.columns)+5))
        # convert to list of lists
        rows = df.values.tolist()
        ws.append_rows(rows, value_input_option="RAW")
        return True
    except Exception as e:
        print("[gs] append error:", e)
        return False

def push_ohlcv_master_to_gsheet(master_parquet_path: str, worksheet_name: str = "OHLCV_Master") -> bool:
    """
    Push the master parquet (subset / last N rows) to Google Sheet.
    For large datasets don't push everything - push last 20000 rows or last 6 months.
    """
    try:
        if gs_client is None:
            print("[gs] gsheet client not initialized")
            return False
        df = pd.read_parquet(master_parquet_path)
        if df.empty:
            return True
        # push only last 20000 rows (safe)
        to_push = df.tail(20000).copy()
        # convert timestamp to string
        if "timestamp" in to_push.columns:
            to_push["timestamp"] = to_push["timestamp"].astype(str)
        # push into worksheet (overwrite if exists by creating new)
        try:
            ws = gs_spread.worksheet(worksheet_name)
            # clear and write header+data
            ws.clear()
        except Exception:
            ws = gs_spread.add_worksheet(title=worksheet_name, rows=str(len(to_push)+20), cols=str(len(to_push.columns)+5))
        set_with_dataframe(ws, to_push, include_index=False, include_column_header=True)
        print("[gs] pushed ohlcv master to sheet:", worksheet_name)
        return True
    except Exception as e:
        print("[gs] push ohlcv error:", e)
        return False

def push_csv_to_gsheet(csv_path: str, worksheet_name_prefix: str):
    """
    Push a small csv (signals/features) to a worksheet named worksheet_name_prefix + timestamp.
    """
    try:
        if not os.path.exists(csv_path):
            return False
        df = pd.read_csv(csv_path)
        sheet_name = f"{worksheet_name_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        return append_to_gsheet(df, worksheet_name=sheet_name)
    except Exception as e:
        print("[gs] push csv error:", e)
        return False

# ---------------------------
# Utilities
# ---------------------------
def gfloat(x):
    try:
        return float(x)
    except Exception:
        return float("nan")

# ---------------------------
# Block 3: fetch_snapshot (realtime short stream, batch)
# ---------------------------
def fetch_snapshot(universe: List[str], seconds_snapshot: int = SNAPSHOT_SECONDS) -> Optional[str]:
    """
    Create short streaming snapshot across batches to collect the latest ticks/EOD data.
    Returns path to incremental CSV created (or None if nothing collected).
    """
    ts_tag = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_csv = os.path.join(DATA_DIR, f"ohlcv_incremental_{ts_tag}.csv")
    collected = 0
    lock = threading.Lock()

    def on_event(data: RealTimeData):
        nonlocal collected
        try:
            df = data.to_dataFrame()
            if df is None or df.empty:
                return
            if "timestamp" in df.columns:
                df["timestamp"] = pd.to_datetime(df["timestamp"])
            # append to file
            with lock:
                df.to_csv(out_csv, mode="a", header=not os.path.exists(out_csv), index=False)
                collected += len(df)
                print(f"[fetch] saved {len(df)} rows (total {collected})")
        except Exception as e:
            print("[fetch] callback error:", e)

    # If small universe, open one stream
    if len(universe) == 0:
        print("[fetch] empty universe")
        return None

    try:
        if len(universe) <= BATCH_SIZE:
            ev = client_fq.Trading_Data_Stream(tickers=universe, callback=on_event)
            ev.start()
            # increment api_used for this stream
            runtime_state["api_used"] = runtime_state.get("api_used",0) + 1
            save_runtime_state()
            time.sleep(seconds_snapshot)
            try: ev.stop()
            except: pass
        else:
            # split into sequential batches to avoid many simultaneous sockets
            n_batches = math.ceil(len(universe)/BATCH_SIZE)
            per_batch_seconds = max(1, int(seconds_snapshot / n_batches))
            for i in range(0, len(universe), BATCH_SIZE):
                batch = universe[i:i+BATCH_SIZE]
                print(f"[fetch] [Batch {i//BATCH_SIZE+1}/{n_batches}] Fetching {len(batch)} tickers...")
                ev = client_fq.Trading_Data_Stream(tickers=batch, callback=on_event)
                ev.start()
                runtime_state["api_used"] = runtime_state.get("api_used",0) + 1
                save_runtime_state()
                time.sleep(per_batch_seconds)
                try: ev.stop()
                except: pass
    except Exception as e:
        print("[fetch] stream start error:", e)

    # After stream, if out_csv exists -> append to master parquet
    if os.path.exists(out_csv):
        try:
            df_new = pd.read_csv(out_csv)
            if "timestamp" in df_new.columns:
                df_new["timestamp"] = pd.to_datetime(df_new["timestamp"])
            master = os.path.join(DATA_DIR, "ohlcv_master.parquet")
            if os.path.exists(master):
                df_master = pd.read_parquet(master)
                combined = pd.concat([df_master, df_new], ignore_index=True)
                combined = combined.drop_duplicates(subset=["timestamp","ticker"])
                combined.to_parquet(master, index=False)
            else:
                df_new.to_parquet(master, index=False)
            print(f"[fetch] appended incremental to master parquet; rows new={len(df_new)}")
        except Exception as e:
            print("[fetch] append to master error:", e)

    print(f"[fetch] snapshot complete rows={collected}")
    save_runtime_state()
    return out_csv if collected>0 else None

# ---------------------------
# Block 4: Feature engineering helpers
# ---------------------------
def build_features_from_incr(incr_csv: str) -> Optional[str]:
    """
    Build features timeseries & today's snapshot from incremental CSV (recent ticks).
    """
    if incr_csv is None or not os.path.exists(incr_csv):
        print("[features] incremental CSV missing")
        return None
    try:
        df = pd.read_csv(incr_csv)
        if df.empty:
            print("[features] incremental empty")
            return None
        # ensure timestamp
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.sort_values(["ticker","timestamp"])
        frames=[]
        # use fi from client_fq (preferred) or local fi
        fi_local = None
        try:
            fi_local = client_fq.FiinIndicator()
        except Exception:
            fi_local = fi
        for tk, g in df.groupby("ticker"):
            gg = g.sort_values("timestamp").copy()
            try:
                # compute indicators using fi_local if present else fallbacks
                if fi_local is not None:
                    gg["ema5"] = fi_local.ema(gg["close"], window=5)
                    gg["ema20"] = fi_local.ema(gg["close"], window=20)
                    gg["ema50"] = fi_local.ema(gg["close"], window=50)
                    gg["rsi14"] = fi_local.rsi(gg["close"], window=14)
                    gg["macd"] = fi_local.macd(gg["close"], window_fast=12, window_slow=26)
                    gg["macd_signal"] = fi_local.macd_signal(gg["close"], window_fast=12, window_slow=26, window_sign=9)
                    gg["macd_diff"] = gg["macd"] - gg["macd_signal"]
                    gg["boll_up"] = fi_local.bollinger_hband(gg["close"], window=20, window_dev=2)
                    gg["boll_dn"] = fi_local.bollinger_lband(gg["close"], window=20, window_dev=2)
                    gg["atr14"] = fi_local.atr(gg["high"], gg["low"], gg["close"], window=14)
                    gg["obv"] = fi_local.obv(gg["close"], gg["volume"])
                    gg["vwap"] = fi_local.vwap(gg["high"], gg["low"], gg["close"], gg["volume"], window=20)
                    gg["vol_z"] = (gg["volume"] - gg["volume"].rolling(20).mean()) / gg["volume"].rolling(20).std()
                else:
                    # fallback simple computations
                    gg["ema5"] = gg["close"].ewm(span=5).mean()
                    gg["ema20"] = gg["close"].ewm(span=20).mean()
                    gg["rsi14"] = np.nan
                    gg["macd"] = gg["close"].ewm(span=12).mean() - gg["close"].ewm(span=26).mean()
                    gg["macd_signal"] = gg["macd"].ewm(span=9).mean()
                    gg["macd_diff"] = gg["macd"] - gg["macd_signal"]
                    gg["boll_up"] = gg["close"].rolling(20).mean() + 2 * gg["close"].rolling(20).std()
                    gg["boll_dn"] = gg["close"].rolling(20).mean() - 2 * gg["close"].rolling(20).std()
                    gg["vol_z"] = (gg["volume"] - gg["volume"].rolling(20).mean()) / gg["volume"].rolling(20).std()
            except Exception as e:
                print(f"[features] TA error {tk}: {e}")
            frames.append(gg)
        df_feat = pd.concat(frames, ignore_index=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        timeseries_path = os.path.join(FEATURE_DIR, f"features_timeseries_{ts}.parquet")
        df_feat.to_parquet(timeseries_path, index=False)
        snapshot = df_feat.groupby("ticker", as_index=False).last()
        snap_path = os.path.join(FEATURE_DIR, f"features_today_{ts}.csv")
        snapshot.to_csv(snap_path, index=False)
        print("[features] saved timeseries and snapshot")
        return snap_path
    except Exception as e:
        print("[features] build_from_incr error:", e)
        return None

def build_features_from_master(master_parquet: str) -> Optional[str]:
    """
    Build features from full ohlcv_master.parquet and return path to snapshot CSV (features_today_*.csv)
    This is used when incremental snapshot is empty/outside hours; we compute indicators from master historical data and output 'features_today'.
    """
    if not os.path.exists(master_parquet):
        print("[features_master] master parquet not found:", master_parquet)
        return None
    try:
        df = pd.read_parquet(master_parquet)
        if df.empty:
            print("[features_master] master parquet empty")
            return None
        # ensure timestamp as datetime
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.sort_values(["ticker","timestamp"])
        frames=[]
        fi_local = None
        try:
            fi_local = client_fq.FiinIndicator()
        except Exception:
            fi_local = fi
        for tk, g in df.groupby("ticker"):
            gg = g.sort_values("timestamp").copy()
            try:
                if fi_local is not None:
                    gg["ema5"] = fi_local.ema(gg["close"], window=5)
                    gg["ema20"] = fi_local.ema(gg["close"], window=20)
                    gg["ema50"] = fi_local.ema(gg["close"], window=50)
                    gg["rsi14"] = fi_local.rsi(gg["close"], window=14)
                    gg["macd"] = fi_local.macd(gg["close"], window_fast=12, window_slow=26)
                    gg["macd_signal"] = fi_local.macd_signal(gg["close"], window_fast=12, window_slow=26, window_sign=9)
                    gg["macd_diff"] = gg["macd"] - gg["macd_signal"]
                    gg["boll_up"] = fi_local.bollinger_hband(gg["close"], window=20, window_dev=2)
                    gg["boll_dn"] = fi_local.bollinger_lband(gg["close"], window=20, window_dev=2)
                    gg["atr14"] = fi_local.atr(gg["high"], gg["low"], gg["close"], window=14)
                    gg["obv"] = fi_local.obv(gg["close"], gg["volume"])
                    gg["vwap"] = fi_local.vwap(gg["high"], gg["low"], gg["close"], gg["volume"], window=20)
                    gg["vol_z"] = (gg["volume"] - gg["volume"].rolling(20).mean()) / gg["volume"].rolling(20).std()
                else:
                    gg["ema5"] = gg["close"].ewm(span=5).mean()
                    gg["ema20"] = gg["close"].ewm(span=20).mean()
                    gg["ema50"] = gg["close"].ewm(span=50).mean()
                    gg["rsi14"] = np.nan
                    gg["macd"] = gg["close"].ewm(span=12).mean() - gg["close"].ewm(span=26).mean()
                    gg["macd_signal"] = gg["macd"].ewm(span=9).mean()
                    gg["macd_diff"] = gg["macd"] - gg["macd_signal"]
                    gg["boll_up"] = gg["close"].rolling(20).mean() + 2 * gg["close"].rolling(20).std()
                    gg["boll_dn"] = gg["close"].rolling(20).mean() - 2 * gg["close"].rolling(20).std()
                    gg["vol_z"] = (gg["volume"] - gg["volume"].rolling(20).mean()) / gg["volume"].rolling(20).std()
            except Exception as e:
                print(f"[features_master] TA error {tk}: {e}")
            frames.append(gg)
        df_feat = pd.concat(frames, ignore_index=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        timeseries_path = os.path.join(FEATURE_DIR, f"features_timeseries_{ts}.parquet")
        df_feat.to_parquet(timeseries_path, index=False)
        snapshot = df_feat.groupby("ticker", as_index=False).last()
        snap_path = os.path.join(FEATURE_DIR, f"features_today_{ts}.csv")
        snapshot.to_csv(snap_path, index=False)
        print("[features_master] saved timeseries & snapshot")
        return snap_path
    except Exception as e:
        print("[features_master] error:", e)
        return None

# ---------------------------
# Block 5: A3C inference (fallback heuristic)
# ---------------------------
import torch
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --- Model A3C (giống block training) ---
class A3CNet(nn.Module):
    def __init__(self, n_features, hidden=64):
        super().__init__()
        self.lstm = nn.LSTM(input_size=n_features, hidden_size=hidden, batch_first=True)
        self.actor = nn.Linear(hidden, 3)   # short, flat, long
        self.critic = nn.Linear(hidden, 1)
    def forward(self, x):
        out, _ = self.lstm(x)
        h = out[:, -1, :]
        return self.actor(h), self.critic(h)

class DummyA3C:
    """
    If you have per-cluster A3C models, load them externally and change a3c_infer to use them.
    This pipeline prefers a precomputed signals file 'signals/a3c_signals_infer.csv' if present.
    Otherwise fallback to heuristic rule-based signals.
    """
    pass

def a3c_infer(features_today_csv: str, a3c_models: Optional[dict] = None) -> Optional[str]:
    """
    Return CSV path with columns: date, ticker, action (int: -1/0/1), prob_buy, prob_hold, prob_sell
    """
    if features_today_csv is None or not os.path.exists(features_today_csv):
        print("[a3c] features missing -> skip")
        return None
    date_str = datetime.now().strftime("%Y-%m-%d")
    out_path = os.path.join(SIGNAL_DIR, f"a3c_signals_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    precomputed = os.path.join(SIGNAL_DIR, "a3c_signals_infer.csv")

    # --- Case 1: nếu có file infer sẵn thì dùng ---
    if os.path.exists(precomputed):
        try:
            df = pd.read_csv(precomputed)
            if "signal" in df.columns and "action" not in df.columns:
                df = df.rename(columns={"signal":"action"})
            if "action" not in df.columns:
                if "pred" in df.columns:
                    df["action"] = df["pred"].astype(int)
                else:
                    df["action"] = 0
            for c,default in [("prob_buy",0.2),("prob_hold",0.6),("prob_sell",0.2)]:
                if c not in df.columns:
                    df[c] = default
            df[["date","ticker","action","prob_buy","prob_hold","prob_sell"]].to_csv(out_path, index=False)
            print("[a3c] used precomputed infer file ->", out_path)
            return out_path
        except Exception as e:
            print("[a3c] precomputed load error:", e)

    # --- Case 2: nếu có checkpoint actor thì load ---
    actor_path = os.path.join(MODEL_DIR, "a3c_actor.pth")
    if os.path.exists(actor_path):
        try:
            # Định nghĩa số feature = số cột của features_today_csv
            feats = pd.read_csv(features_today_csv)
            F = feats.shape[1] - 1  # trừ cột ticker
            model = A3CNet(F).to(device)
            model.load_state_dict(torch.load(actor_path, map_location=device))
            model.eval()

            rows=[]
            with torch.no_grad():
                for _, r in feats.iterrows():
                    x = torch.tensor(r.drop("ticker").values, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
                    logits, _ = model(x)
                    probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
                    act = int(np.argmax(probs)-1)  # (-1,0,1)
                    rows.append([date_str, r["ticker"], act, probs[2], probs[1], probs[0]])
            odf = pd.DataFrame(rows, columns=["date","ticker","action","prob_buy","prob_hold","prob_sell"])
            odf.to_csv(out_path, index=False)
            print("[a3c] used trained actor checkpoint ->", out_path)
            return out_path
        except Exception as e:
            print("[a3c] checkpoint load error, fallback:", e)

    # --- Case 3: fallback rule-based ---
    feats = pd.read_csv(features_today_csv)
    rows=[]
    for _, r in feats.iterrows():
        ema5 = gfloat(r.get("ema5")); ema20 = gfloat(r.get("ema20"))
        rsi = gfloat(r.get("rsi14"))
        action = 1 if (not math.isnan(ema5) and not math.isnan(ema20) and ema5>ema20 and not math.isnan(rsi) and rsi<RSI_TH) else 0
        if action == 1:
            pb,ph,ps = 0.75,0.2,0.05
        else:
            pb,ph,ps = 0.1,0.8,0.1
        rows.append([date_str, r["ticker"], int(action), pb, ph, ps])
    odf = pd.DataFrame(rows, columns=["date","ticker","action","prob_buy","prob_hold","prob_sell"])
    odf.to_csv(out_path, index=False)
    print("[a3c] fallback saved:", out_path)
    return out_path


# ---------------------------
# Block 6: Rule-based scoring
# ---------------------------
def rule_scoring(features_today_csv: str) -> Optional[str]:
    if features_today_csv is None or not os.path.exists(features_today_csv):
        print("[rule] features missing")
        return None
    feats = pd.read_csv(features_today_csv)
    rows=[]
    date_str = datetime.now().strftime("%Y-%m-%d")
    for _, r in feats.iterrows():
        ticker = r["ticker"]
        ema5 = gfloat(r.get("ema5")); ema20 = gfloat(r.get("ema20"))
        rsi = gfloat(r.get("rsi14")); macd_diff = gfloat(r.get("macd_diff"))
        close = gfloat(r.get("close")); boll_up = gfloat(r.get("boll_up")); vol_z = gfloat(r.get("vol_z"))
        score=0; pass_list=[]; fail_list=[]
        if not math.isnan(ema5) and not math.isnan(ema20) and ema5>ema20:
            score+=1; pass_list.append("EMA5>EMA20")
        else:
            fail_list.append("EMA<=20")
        if not math.isnan(rsi) and rsi < RSI_TH:
            score+=1; pass_list.append("RSI<TH")
        else:
            fail_list.append("RSI>=")
        if not math.isnan(macd_diff) and macd_diff > 0:
            score+=1; pass_list.append("MACD>0")
        else:
            fail_list.append("MACD<=0")
        if not math.isnan(close) and not math.isnan(boll_up) and close > boll_up:
            score+=1; pass_list.append("BollBreak")
        else:
            fail_list.append("BollNo")
        if not math.isnan(vol_z) and vol_z > VOL_SPIKE_MULT:
            score+=1; pass_list.append("VolSpike")
        else:
            fail_list.append("VolNormal")
        details = {"ema5":ema5,"ema20":ema20,"rsi":rsi,"macd_diff":macd_diff,"close":close,"boll_up":boll_up,"vol_z":vol_z}
        rows.append([date_str, ticker, int(score), ";".join(pass_list), ";".join(fail_list), json.dumps(details), float(r.get("close", np.nan))])
    df = pd.DataFrame(rows, columns=["date","ticker","rule_score","pass_list","fail_list","detail_values","close"])
    out_path = os.path.join(SIGNAL_DIR, f"rule_scores_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    df.to_csv(out_path, index=False)
    print("[rule] saved:", out_path)
    return out_path

# ---------------------------
# Block 7: DDPG hybrid allocation (fallback equal-weight)
# ---------------------------
def run_ddpg_hybrid(a3c_csv: Optional[str], rule_csv: Optional[str]) -> Tuple[Optional[str], Optional[str], str]:
    """
    Try:
      1) use ddpg_actor.pth if present (trained in notebook)
      2) use historical weights_by_ticker.csv if exists
      3) fallback to equal-weight among active tickers (action==1 & rule_score>=RULE_MIN_SCORE)
    Returns (alloc_path, top10_path, alloc_source)
    """
    if a3c_csv is None or not os.path.exists(a3c_csv) or rule_csv is None or not os.path.exists(rule_csv):
        print("[ddpg_hybrid] missing inputs")
        return (None, None, "no_input")

    sig = pd.read_csv(a3c_csv)
    rule = pd.read_csv(rule_csv)
    if "signal" in sig.columns and "action" not in sig.columns:
        sig = sig.rename(columns={"signal": "action"})
    if "action" not in sig.columns:
        sig["action"] = sig.get("action", 0).fillna(0).astype(int)

    merged = sig.merge(rule, on=["date", "ticker"], how="left")
    merged["rule_score"] = merged["rule_score"].fillna(0).astype(int)
    active = merged[(merged["action"] == 1) & (merged["rule_score"] >= RULE_MIN_SCORE)]
    active_tickers = sorted(active["ticker"].unique().tolist())
    if not active_tickers:
        print("[ddpg_hybrid] no active tickers -> no allocation")
        return (None, None, "no_active")

    # --- Try ddpg actor (saved from training notebook) ---
    actor_path = os.path.join(MODEL_DIR, "ddpg_actor.pth")
    if os.path.exists(actor_path) and TORCH:
        try:
            # định nghĩa lại Actor giống notebook training
            class Actor(nn.Module):
                def __init__(self, s_dim, a_dim, hidden=128):
                    super().__init__()
                    self.net = nn.Sequential(
                        nn.Linear(s_dim, hidden), nn.ReLU(),
                        nn.Linear(hidden, hidden), nn.ReLU(),
                        nn.Linear(hidden, a_dim)
                    )
                def forward(self, s):
                    return torch.softmax(self.net(s), dim=-1)

            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

            # --- load actor ---
            s_dim = len(active_tickers)    # tạm lấy số mã active làm state_dim
            a_dim = len(active_tickers)    # số action = số mã
            ddpg_actor = Actor(s_dim, a_dim).to(device)
            ddpg_actor.load_state_dict(torch.load(actor_path, map_location=device))
            ddpg_actor.eval()

            # --- state hiện tại: đơn giản = identity (1 nếu active) ---
            state_today = np.zeros((s_dim,), dtype="float32")
            for i, tk in enumerate(active_tickers):
                state_today[i] = 1.0  # đánh dấu mã active

            s = torch.tensor(state_today, dtype=torch.float32).unsqueeze(0).to(device)
            with torch.no_grad():
                w = ddpg_actor(s).cpu().numpy()[0]

            # normalize
            w = np.clip(w, 0, 1)
            if w.sum() <= 1e-12:
                w = np.ones_like(w) / len(w)
            else:
                w /= w.sum()

            # build alloc df
            date_label = datetime.now().strftime("%Y-%m-%d")
            rows = [[date_label, 0, tk, 1.0, float(w[i])] for i, tk in enumerate(active_tickers)]
            df_alloc = pd.DataFrame(rows, columns=["date","cluster","ticker","cluster_weight","ticker_weight"])

            alloc_path = os.path.join(ALLOC_DIR, f"alloc_ddpg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            df_alloc.to_csv(alloc_path, index=False)

            top10 = df_alloc.sort_values("ticker_weight", ascending=False).head(TOPK)
            top10_path = os.path.join(ALLOC_DIR, f"top10_ddpg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
            top10.to_csv(top10_path, index=False)

            print("[ddpg_hybrid] used ddpg_actor ->", alloc_path)
            return (alloc_path, top10_path, "DDPG")
        except Exception as e:
            print("[ddpg_hybrid] ddpg actor error:", e)

    # --- Try historical weights_by_ticker.csv ---
    hist_weights_path = os.path.join(MONITOR_DIR, "weights_by_ticker.csv")
    if os.path.exists(hist_weights_path):
        try:
            dfw = pd.read_csv(hist_weights_path, index_col=0)
            if not dfw.empty:
                last = dfw.iloc[-1]
                rows = []
                date_label = datetime.now().strftime("%Y-%m-%d")
                total_raw = 0.0
                for t in active_tickers:
                    w = float(last.get(t, 0.0)) if t in last.index else 0.0
                    rows.append([date_label, 0, t, None, w])
                    total_raw += w
                if total_raw > 1e-12:
                    df_alloc = pd.DataFrame(rows, columns=["date","cluster","ticker","cluster_weight","ticker_weight"])
                    df_alloc["ticker_weight"] = df_alloc["ticker_weight"] / df_alloc["ticker_weight"].sum()
                    alloc_path = os.path.join(ALLOC_DIR, f"alloc_hist_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
                    df_alloc.to_csv(alloc_path, index=False)
                    top10 = df_alloc.sort_values("ticker_weight", ascending=False).head(TOPK)
                    top10_path = os.path.join(ALLOC_DIR, f"top10_hist_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
                    top10.to_csv(top10_path, index=False)
                    print("[ddpg_hybrid] using historical weights saved:", alloc_path)
                    return (alloc_path, top10_path, "HistAlloc")
        except Exception as e:
            print("[ddpg_hybrid] historical weights load error:", e)

    # --- fallback equal-weight ---
    per = 1.0 / len(active_tickers)
    rows = []
    date_label = datetime.now().strftime("%Y-%m-%d")
    for t in active_tickers:
        rows.append([date_label, 0, t, 1.0, round(per, 6)])
    df_alloc = pd.DataFrame(rows, columns=["date","cluster","ticker","cluster_weight","ticker_weight"])
    alloc_path = os.path.join(ALLOC_DIR, f"alloc_equal_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    df_alloc.to_csv(alloc_path, index=False)
    top10_path = os.path.join(ALLOC_DIR, f"top10_equal_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    df_alloc.sort_values("ticker_weight", ascending=False).head(TOPK).to_csv(top10_path, index=False)
    print("[ddpg_hybrid] equal-weight allocation saved:", alloc_path)
    return (alloc_path, top10_path, "EqualWeight")

# ---------------------------
# Block 8: Decision engine & Alerts
# ---------------------------
EMOJI = {"BUY":"🟢","SELL":"🔴","HOLD":"🟡","PASS":"✔️","FAIL":"✖️"}

def decision_and_alerts(a3c_csv: Optional[str], rule_csv: Optional[str], alloc_pair: Tuple[Optional[str],Optional[str]], alloc_source: str) -> Optional[str]:
    """
    Create alerts CSV and send Telegram messages + push to Google Sheets.
    """
    if a3c_csv is None or not os.path.exists(a3c_csv) or rule_csv is None or not os.path.exists(rule_csv):
        print("[alerts] missing inputs")
        return None
    sig = pd.read_csv(a3c_csv)
    rule = pd.read_csv(rule_csv)
    if "signal" in sig.columns and "action" not in sig.columns:
        sig = sig.rename(columns={"signal":"action"})
    if "action" not in sig.columns:
        sig["action"] = sig.get("action", 0).fillna(0).astype(int)
    merged = sig.merge(rule, on=["date","ticker"], how="left")
    # attach allocation weights if present
    if alloc_pair and alloc_pair[0] and os.path.exists(alloc_pair[0]):
        alloc_df = pd.read_csv(alloc_pair[0])
        merged = merged.merge(alloc_df[["ticker","ticker_weight"]], on="ticker", how="left")
    else:
        merged["ticker_weight"] = 0.0

    ts_now = datetime.now()
    alerts=[]
    for _, r in merged.iterrows():
        ticker = r["ticker"]
        action_code = int(r.get("action",0))
        rule_score = int(r.get("rule_score",0)) if not pd.isna(r.get("rule_score", np.nan)) else 0
        alloc_pct = float(r.get("ticker_weight", 0.0)) if not pd.isna(r.get("ticker_weight", np.nan)) else 0.0
        price = r.get("close", None)
        decided = None
        if action_code == 1 and rule_score >= RULE_MIN_SCORE and alloc_pct >= MIN_ALLOC:
            decided = "BUY"
        elif action_code == -1 and rule_score >= RULE_MIN_SCORE:
            decided = "SELL"
        else:
            decided = None

        if decided is None:
            continue

        # debounce
        last = runtime_state.get("last_alerts", {}).get(ticker)
        can_send = True
        if last:
            try:
                last_time = datetime.fromisoformat(last.get("time"))
                last_action = last.get("action")
                if last_action == decided and (ts_now - last_time) < timedelta(hours=COOLDOWN_HOURS):
                    can_send = False
            except Exception:
                can_send = True
        if not can_send:
            print(f"[alerts] debounce skip {ticker} {decided}")
            continue

        alert_id = f"{ticker}_{ts_now.strftime('%Y%m%d_%H%M%S')}_{np.random.randint(0,999999):06d}"
        exec_date = (ts_now + timedelta(days=EXEC_LAG)).strftime("%Y-%m-%d")
        indicators = r.get("pass_list", "") or ""
        source_txt = alloc_source or "Unknown"

        row = {
            "log_time": ts_now.strftime("%Y-%m-%d %H:%M:%S"),
            "alert_id": alert_id,
            "ticker": ticker,
            "action": decided,
            "price": price if not pd.isna(price) else None,
            "a3c": {1:"BUY",0:"HOLD",-1:"SELL"}.get(action_code,"HOLD"),
            "rule_score": rule_score,
            "indicators": indicators,
            "alloc_pct": alloc_pct,
            "alloc_source": source_txt,
            "exec_date": exec_date,
            "status": "PENDING"
        }

        # build Telegram message (Markdown)
        emoji = EMOJI.get(decided, "")
        em_a3c = EMOJI.get("BUY","") if action_code==1 else (EMOJI.get("SELL","") if action_code==-1 else EMOJI.get("HOLD",""))
        msg = (
            f"{emoji} *CẢNH BÁO MỚI*\n"
            f"Thời gian: {ts_now.strftime('%Y-%m-%d %H:%M')}\n"
            f"Mã: *{ticker}*\n"
            f"Hành động: *{decided}* {emoji}\n"
            f"Giá: {row['price'] if row['price'] is not None else 'N/A'}\n"
            f"A3C: {em_a3c} {row['a3c']}\n"
            f"Rule-based: {row['rule_score']}/5 — {indicators}\n"
            f"Phân bổ vốn: {round(alloc_pct*100,4)}%  (Nguồn: {source_txt})\n"
            f"Exec: T+{EXEC_LAG} ({exec_date})\n"
            f"AlertID: `{alert_id}`"
        )
        sent = send_telegram_text(msg, thread_id=TG_THREAD_ID)
        time.sleep(0.5)
        row["status"] = "SENT" if sent else "NOT_SENT"
        alerts.append(row)
        runtime_state.setdefault("last_alerts", {})[ticker] = {"time": ts_now.isoformat(), "action": decided}

    if not alerts:
        # send HOLD summary (top candidates)
        merged_local = merged.copy()
        merged_local["rule_score"] = merged_local["rule_score"].fillna(0)
        top = merged_local.sort_values("rule_score", ascending=False).drop_duplicates("ticker").head(TOPK)
        top_text = ""
        for idx, rowt in top.iterrows():
            top_text += f"{rowt['ticker']} — rule {int(rowt['rule_score'])}\n"
        hold_msg = (
            f"{EMOJI['HOLD']} *NO TRADE SIGNAL (HOLD)*\n"
            f"Thời gian: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
            f"Top {TOPK} candidates by rule:\n{top_text}"
        )
        send_telegram_text(hold_msg, thread_id=TG_THREAD_ID)
        # optionally send top10 file if present
        if alloc_pair and alloc_pair[1] and os.path.exists(alloc_pair[1]):
            send_telegram_file(alloc_pair[1], caption=f"Top {TOPK} allocations")
        print("[alerts] no trade alerts this run (HOLD summary sent if applicable)")
        save_runtime_state()
        return None

    df_alerts = pd.DataFrame(alerts)
    out_path = os.path.join(ALERT_DIR, f"alerts_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    df_alerts.to_csv(out_path, index=False)
    # push to gsheet
    try:
        append_to_gsheet(df_alerts, worksheet_name="Alerts")
    except Exception as e:
        print("[alerts] push to gsheet error:", e)
    master_log = os.path.join(ALERT_DIR, "alerts_log.csv")
    df_alerts.to_csv(master_log, mode="a", header=not os.path.exists(master_log), index=False)
    print("[alerts] saved & sent:", out_path)
    save_runtime_state()
    return out_path

# ---------------------------
# Block 9: Monitoring & artifact logging
# ---------------------------
def log_weights_returns_turnover(alloc_csv: Optional[str], alloc_source: str):
    if alloc_csv is None or not os.path.exists(alloc_csv):
        print("[monitor] no alloc to log")
        return
    alloc = pd.read_csv(alloc_csv)
    date_label = datetime.now().strftime("%Y-%m-%d")
    w_series = alloc.set_index("ticker")["ticker_weight"].astype(float)
    weights_path = os.path.join(MONITOR_DIR, "weights_by_ticker.csv")
    prev_weights = None
    if os.path.exists(weights_path):
        prev_df = pd.read_csv(weights_path, index_col=0)
        if not prev_df.empty:
            prev_weights = prev_df.iloc[-1].dropna().astype(float)
    all_tickers = sorted(list(set(list(w_series.index) + ([] if prev_weights is None else list(prev_weights.index)))))
    row = {t: float(w_series.get(t, 0.0)) for t in all_tickers}
    df_row = pd.DataFrame([row], index=[date_label])
    df_row.to_csv(weights_path, mode="a", header=not os.path.exists(weights_path))
    if prev_weights is not None:
        prev = prev_weights.reindex(all_tickers).fillna(0.0)
        curr = pd.Series(row).reindex(all_tickers).fillna(0.0)
        turnover = float(np.sum(np.abs(curr.values - prev.values)))
    else:
        turnover = float(np.sum(np.abs(list(row.values()))))
    trades_log_path = os.path.join(MONITOR_DIR, "trades_log.csv")
    pd.DataFrame([{"date": date_label, "turnover": turnover, "alloc_source": alloc_source}]).to_csv(trades_log_path, mode="a", header=not os.path.exists(trades_log_path), index=False)
    # naive returns using last two dates in master parquet
    master_path = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    returns_path = os.path.join(MONITOR_DIR, "returns.csv")
    if os.path.exists(master_path):
        px = pd.read_parquet(master_path)
        px["timestamp"] = pd.to_datetime(px["timestamp"])
        dates_unique = pd.Series(px["timestamp"].dt.date.unique())
        if len(dates_unique) >= 2:
            d1 = pd.to_datetime(dates_unique.iloc[-2])
            d2 = pd.to_datetime(dates_unique.iloc[-1])
            px_wide = px.pivot(index="timestamp", columns="ticker", values="close")
            try:
                p1 = px_wide.loc[px_wide.index.date == d1.date()].iloc[-1]
                p2 = px_wide.loc[px_wide.index.date == d2.date()].iloc[-1]
                pct = (p2 - p1) / p1
                if prev_weights is None:
                    port_ret = float(np.dot(list(row.values()), pct.reindex(all_tickers).fillna(0.0).values))
                else:
                    port_ret = float(np.dot(prev.reindex(all_tickers).fillna(0.0).values, pct.reindex(all_tickers).fillna(0.0).values))
                pd.DataFrame([{"date": date_label, "daily_return": port_ret, "alloc_source": alloc_source}]).to_csv(returns_path, mode="a", header=not os.path.exists(returns_path), index=False)
            except Exception as e:
                print("[monitor] compute returns error:", e)
        else:
            print("[monitor] not enough history to compute returns")
    else:
        print("[monitor] ohlcv master missing -> cannot compute returns")

def monitoring_stats_update():
    api_file = os.path.join(MONITOR_DIR, "api_usage.csv")
    pd.DataFrame([{"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "api_used": runtime_state.get("api_used",0)}]).to_csv(api_file, mode="a", header=not os.path.exists(api_file), index=False)
    master_log = os.path.join(ALERT_DIR, "alerts_log.csv")
    if os.path.exists(master_log):
        df = pd.read_csv(master_log, parse_dates=["log_time"])
        df["date_only"] = df["log_time"].dt.date
        stats = df.groupby("date_only").agg(total=("alert_id","count"), buys=("action", lambda s: (s=="BUY").sum()), sells=("action", lambda s: (s=="SELL").sum()))
        stats.to_csv(os.path.join(MONITOR_DIR, "alert_stats.csv"))

# ---------------------------
# Orchestration: run_cycle (one-run)
# ---------------------------
def run_cycle() -> dict:
    artifacts = {}
    univ_path = os.path.join(UNIVERSE_DIR, "universe_list.csv")
    if not os.path.exists(univ_path):
        raise FileNotFoundError("universe/universe_list.csv missing. Run Block 2 offline.")
    universe = pd.read_csv(univ_path)["ticker"].tolist()
    print(f"[run_cycle] universe size: {len(universe)}")
    # Block3: fetch snapshot (stream)
    incr = None
    try:
        incr = fetch_snapshot(universe, seconds_snapshot=SNAPSHOT_SECONDS)
    except Exception as e:
        print("[run_cycle] fetch_snapshot error:", e)
    artifacts["incremental_csv"] = incr
    master = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    # Backfill master if missing
    if not os.path.exists(master):
        try:
            print("[run_cycle] master missing -> backfilling 6 months")
            from_date = (datetime.now() - pd.DateOffset(months=6)).strftime("%Y-%m-%d")
            all_frames=[]
            for i in range(0, len(universe), BATCH_SIZE):
                batch = universe[i:i+BATCH_SIZE]
                try:
                    dfb = client_fq.Fetch_Trading_Data(realtime=False, tickers=batch,
                              fields=['open','high','low','close','volume'], adjusted=True, by='1d', from_date=from_date).get_data()
                    all_frames.append(dfb)
                    time.sleep(0.5)
                except Exception as e:
                    print("[backfill] batch error:", e)
            if len(all_frames) > 0:
                df_all = pd.concat(all_frames, ignore_index=True)
                df_all.to_parquet(master, index=False)
                print("[backfill] saved master parquet", master, "rows=", len(df_all))
        except Exception as e:
            print("[run_cycle] backfill exception:", e)
    # Push master to GSheet if present
    try:
        if os.path.exists(master):
            push_ohlcv_master_to_gsheet(master)
    except Exception as e:
        print("[run_cycle] push master to gsheet failed:", e)
    # push incremental to gsheet as well
    try:
        if incr and os.path.exists(incr):
            push_csv_to_gsheet(incr, worksheet_name_prefix="INCR")
    except Exception as e:
        print("[run_cycle] push incremental to gsheet failed:", e)

    # Block 4: features
    features_csv = None
    try:
        if os.path.exists(master):
            features_csv = build_features_from_master(master)
        else:
            features_csv = build_features_from_incr(incr)
    except Exception as e:
        print("[run_cycle] build features error:", e)
    artifacts["features_csv"] = features_csv

    # Block 5: A3C inference
    try:
        a3c_csv = a3c_infer(features_csv, None)
    except Exception as e:
        print("[run_cycle] a3c infer error:", e)
        a3c_csv = None
    artifacts["a3c_csv"] = a3c_csv

    # Block 6: rule scoring
    try:
        rule_csv = rule_scoring(features_csv)
    except Exception as e:
        print("[run_cycle] rule scoring error:", e)
        rule_csv = None
    artifacts["rule_csv"] = rule_csv

    # Block 7: allocation
    try:
        alloc_path, top10_path, alloc_source = run_ddpg_hybrid(a3c_csv, rule_csv)
    except Exception as e:
        print("[run_cycle] run_ddpg_hybrid error:", e)
        alloc_path, top10_path, alloc_source = (None, None, "no_input")
    artifacts["alloc_path"] = alloc_path
    artifacts["top10_path"] = top10_path
    artifacts["alloc_source"] = alloc_source

    # Block 8: decision & alerts
    try:
        alert_file = decision_and_alerts(a3c_csv, rule_csv, (alloc_path, top10_path), alloc_source)
    except Exception as e:
        print("[run_cycle] decision_and_alerts error:", e)
        alert_file = None
    artifacts["alert_file"] = alert_file

    # Block 9: monitoring
    try:
        log_weights_returns_turnover(alloc_path, alloc_source)
        monitoring_stats_update()
    except Exception as e:
        print("[run_cycle] monitoring error:", e)

    print("[run_cycle] Done. Artifacts:", artifacts)
    return artifacts

# ---------------------------
# Historical replay / stress-test
# ---------------------------
def run_historical_inference(start_date: str, end_date: str, push_gsheet: bool = True, send_telegram: bool = True) -> str:
    """
    Replay inference on historical master parquet between start_date and end_date inclusive.
    For each trading date, compute features snapshot (up to that date), run a3c_infer & rule_scoring,
    push results to google sheets and send telegram summary (if requested).
    """
    master = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    if not os.path.exists(master):
        raise FileNotFoundError("ohlcv_master.parquet missing for historical inference")
    px = pd.read_parquet(master)
    px["timestamp"] = pd.to_datetime(px["timestamp"])
    dates_all = np.unique(px["timestamp"].dt.date)
    s_date = pd.to_datetime(start_date).date()
    e_date = pd.to_datetime(end_date).date()
    run_dates = [d for d in sorted(dates_all) if (d >= s_date and d <= e_date)]
    summary_rows=[]
    for d in run_dates:
        date_label = pd.to_datetime(d).strftime("%Y-%m-%d")
        print(f"[replay] running for {date_label}")
        try:
            df_until = px[px["timestamp"].dt.date <= pd.to_datetime(date_label).date()].copy()
            frames=[]
            fi_local = None
            try:
                fi_local = client_fq.FiinIndicator()
            except Exception:
                fi_local = fi
            for tk, g in df_until.groupby("ticker"):
                gg = g.sort_values("timestamp").copy()
                try:
                    if fi_local is not None:
                        gg_feat = gg.copy()
                        gg_feat['ema5'] = fi_local.ema(gg_feat['close'], window=5)
                        gg_feat['ema20'] = fi_local.ema(gg_feat['close'], window=20)
                        gg_feat['rsi14'] = fi_local.rsi(gg_feat['close'], window=14)
                        gg_feat['macd'] = fi_local.macd(gg_feat['close'], window_fast=12, window_slow=26)
                        gg_feat['macd_signal'] = fi_local.macd_signal(gg_feat['close'], window_fast=12, window_slow=26, window_sign=9)
                        gg_feat['macd_diff'] = gg_feat['macd'] - gg_feat['macd_signal']
                        gg_feat['boll_up'] = fi_local.bollinger_hband(gg_feat['close'], window=20, window_dev=2)
                        gg_feat['boll_dn'] = fi_local.bollinger_lband(gg_feat['close'], window=20, window_dev=2)
                        gg_feat['vol_z'] = (gg_feat['volume'] - gg_feat['volume'].rolling(20).mean())/gg_feat['volume'].rolling(20).std()
                    else:
                        gg_feat = gg.copy()
                        gg_feat['ema5'] = gg_feat['close'].ewm(span=5).mean()
                        gg_feat['ema20'] = gg_feat['close'].ewm(span=20).mean()
                        gg_feat['rsi14'] = np.nan
                        gg_feat['macd'] = gg_feat['close'].ewm(span=12).mean() - gg_feat['close'].ewm(span=26).mean()
                        gg_feat['macd_signal'] = gg_feat['macd'].ewm(span=9).mean()
                        gg_feat['macd_diff'] = gg_feat['macd'] - gg_feat['macd_signal']
                        gg_feat['boll_up'] = gg_feat['close'].rolling(20).mean() + 2 * gg_feat['close'].rolling(20).std()
                        gg_feat['boll_dn'] = gg_feat['close'].rolling(20).mean() - 2 * gg_feat['close'].rolling(20).std()
                        gg_feat['vol_z'] = (gg_feat['volume'] - gg_feat['volume'].rolling(20).mean())/gg_feat['volume'].rolling(20).std()
                except Exception as e:
                    print(f"[replay] TA error {tk} {e}")
                    gg_feat = gg.copy()
                frames.append(gg_feat)
            if len(frames) == 0:
                print("[replay] no data for date", date_label)
                summary_rows.append({"date": date_label, "a3c": None, "rule": None, "error": "no_data"})
                continue
            df_all = pd.concat(frames, ignore_index=True)
            snapshot = df_all.sort_values("timestamp").groupby("ticker").last().reset_index()
            snapshot_path = os.path.join(FEATURE_DIR, f"features_replay_{date_label}.csv")
            snapshot.to_csv(snapshot_path, index=False)
            a3c_path = a3c_infer(snapshot_path, None)
            rule_path = rule_scoring(snapshot_path)
            # push to gsheet
            if push_gsheet:
                try:
                    push_csv_to_gsheet(snapshot_path, worksheet_name_prefix=f"OHLCV_REPLAY_{date_label}")
                except Exception as e:
                    print("[replay][gs] push prices error:", e)
                try:
                    if a3c_path and os.path.exists(a3c_path):
                        push_csv_to_gsheet(a3c_path, worksheet_name_prefix=f"A3C_REPLAY_{date_label}")
                    if rule_path and os.path.exists(rule_path):
                        push_csv_to_gsheet(rule_path, worksheet_name_prefix=f"RULE_REPLAY_{date_label}")
                except Exception as e:
                    print("[replay][gs] push signals error:", e)
            # send telegram summary
            if send_telegram:
                top = None
                if rule_path and os.path.exists(rule_path):
                    topdf = pd.read_csv(rule_path)
                    top = topdf.sort_values("rule_score", ascending=False).head(TOPK)
                msg = f"📅 Replay {date_label} — signals summary\n"
                if top is None or top.empty:
                    msg += "No strong signals.\n"
                else:
                    for i, row in top.iterrows():
                        msg += f"{row['ticker']} — rule {int(row['rule_score'])}\n"
                send_telegram_text(msg)
            summary_rows.append({"date": date_label, "a3c": a3c_path, "rule": rule_path, "error": None})
        except Exception as e:
            print(f"[replay] exception for {date_label}", e)
            summary_rows.append({"date": date_label, "a3c": None, "rule": None, "error": str(e)})
    summary_df = pd.DataFrame(summary_rows)
    out_summary = os.path.join(MONITOR_DIR, f"replay_summary_{start_date.replace('-','')}_{end_date.replace('-','')}.csv")
    summary_df.to_csv(out_summary, index=False)
    print("[replay] done. summary saved:", out_summary)
    return out_summary

# ---------------------------
# Main loop helper
# ---------------------------
def main_loop(interval_seconds: int = 6*60*60):
    print("[main_loop] starting with interval", interval_seconds)
    while True:
        try:
            run_cycle()
            print(f"[main_loop] sleeping for {interval_seconds} seconds...")
            time.sleep(interval_seconds)
        except KeyboardInterrupt:
            print("[main_loop] stopped by user")
            break
        except Exception as e:
            print("[main_loop] exception:", e)
            traceback.print_exc()
            time.sleep(60)

[init] Logging into FiinQuant...
[init] Fiin client logged in. FiinIndicator ok? True
[init] Google Sheets ready: DSTRound3


In [33]:
run_cycle()

[run_cycle] universe size: 200
[fetch] [Batch 1/4] Fetching 50 tickers...
Connection established. Waiting for server processing...
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VIX
2025-09-21 16:12:36 Joined group: Realtime.Ticker.CII
2025-09-21 16:12:36 Joined group: Realtime.Ticker.SSI
2025-09-21 16:12:36 Joined group: Realtime.Ticker.GEX
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VPB
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VIC
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VHM
2025-09-21 16:12:36 Joined group: Realtime.Ticker.SHB
2025-09-21 16:12:36 Joined group: Realtime.Ticker.DXG
2025-09-21 16:12:36 Joined group: Realtime.Ticker.DIG
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VND
2025-09-21 16:12:36 Joined group: Realtime.Ticker.VSC
2025-09-21 16:12:36 Joined group: Realtime.Ticker.PDR
2025-09-21 16:12:36 Joined group: Realtime.Ticker.MBB
2025-09-21 16:12:36 Joined group: Realtime.Ticker.NKG
2025-09-21 16:12:36 Joined group: Realtime.Ticker.ANV
2025-

{'incremental_csv': None,
 'features_csv': './features\\features_today_20250921_161353.csv',
 'a3c_csv': './signals\\a3c_signals_20250921_161354.csv',
 'rule_csv': './signals\\rule_scores_20250921_161359.csv',
 'alloc_path': None,
 'top10_path': None,
 'alloc_source': 'no_active',
 'alert_file': None}

**TEST TRÊN STRESS TEST**

In [18]:
# === Backtest / Historical Replay standalone cell ===
# Paste toàn bộ cell này vào 1 ô trong notebook và chạy.
# Mặc định: sẽ chạy replay từ START_DATE -> END_DATE và gửi Telegram mỗi ngày.
# Nếu muốn đồng thời ghi Google Sheet, set PUSH_GSHEET = True (cần service JSON & sheet)
# Người dùng có thể chỉnh biến cấu hình bên dưới

import os, time, json, math, glob, traceback
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import requests

# ====== CONFIG (chỉnh ở đây) ======
FQ_USERNAME = "DSTC_18@fiinquant.vn"        # bạn cung cấp
FQ_PASSWORD = "Fiinquant0606"              # bạn cung cấp

# Google Sheets (optional)
PUSH_GSHEET = False                         # default: False (khuyên bật False khi debug)
GS_SERVICE_JSON = "dstround3.json"          # service account JSON path
GS_SHEET_KEY = "17FRrF63TFE3bmAseoV4vQK5EA9nYaTaRRnLyd1F8MGU"
GS_SHEET_NAME = "DSTRound3"

# Telegram config (edit if cần)
TG_TOKEN = "8454050043:AAG_quR7eSALqh9WVRvx6DRZVxtRde_OpFQ"
TG_CHAT_ID = "-1002692813170"   # chat id as string, e.g. "-2692813170"
TG_THREAD_ID = 2             # set thread id you want, change if needed

# Replay period (stress test default)
START_DATE = "2025-03-25"
END_DATE   = "2025-04-16"

# OHLCV backfill window length (if master not found)
BACKFILL_MONTHS = 6   # fetch 6 months history if needed

# Rule thresholds
RSI_TH = 35
VOL_SPIKE_MULT = 1.5
RULE_MIN_SCORE = 3
MIN_ALLOC = 0.005   # not used here, just for reference

# Paths
DATA_DIR = "./data"
FEATURE_DIR = "./features"
SIGNAL_DIR = "./signals"
MONITOR_DIR = "./monitor"
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(FEATURE_DIR, exist_ok=True)
os.makedirs(SIGNAL_DIR, exist_ok=True)
os.makedirs(MONITOR_DIR, exist_ok=True)

# ===== Helpers: Telegram & Google Sheets =====
def send_telegram_text(msg, thread_id=None):
    url = f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage"
    payload = {"chat_id": TG_CHAT_ID, "text": msg, "parse_mode": "Markdown"}
    if thread_id is not None:
        payload["message_thread_id"] = int(thread_id)
    try:
        r = requests.post(url, json=payload, timeout=15)
        if r.status_code != 200:
            print(f"[tg] send failed: {r.status_code} {r.text}")
            return False
        return True
    except Exception as e:
        print("[tg] exception:", e)
        return False

def send_telegram_file(path, caption="", thread_id=None):
    url = f"https://api.telegram.org/bot{TG_TOKEN}/sendDocument"
    data = {"chat_id": TG_CHAT_ID, "caption": caption}
    if thread_id is not None:
        data["message_thread_id"] = int(thread_id)
    try:
        with open(path,"rb") as fh:
            files = {"document": fh}
            r = requests.post(url, data=data, files=files, timeout=30)
        if r.status_code != 200:
            print("[tg] sendDocument failed:", r.status_code, r.text)
            return False
        return True
    except Exception as e:
        print("[tg] sendDocument exception:", e)
        return False

# Google Sheets helper (optional)
if PUSH_GSHEET:
    try:
        import gspread
        from gspread_dataframe import set_with_dataframe
        from google.oauth2.service_account import Credentials
        GS_CREDS = Credentials.from_service_account_file(GS_SERVICE_JSON, scopes=[
            "https://www.googleapis.com/auth/spreadsheets",
            "https://www.googleapis.com/auth/drive"
        ])
        GS_CLIENT = gspread.authorize(GS_CREDS)
        GS_SHEET = GS_CLIENT.open_by_key(GS_SHEET_KEY)
        try:
            GS_WS = GS_SHEET.worksheet(GS_SHEET_NAME)
        except Exception:
            GS_WS = GS_SHEET.add_worksheet(title=GS_SHEET_NAME, rows="1000", cols="20")
        print("[gs] Google Sheets ready:", GS_SHEET_NAME)
    except Exception as e:
        print("[gs] init failed, disabling PUSH_GSHEET. Error:", e)
        PUSH_GSHEET = False

def append_df_to_gsheet(df, worksheet, batch_size=500):
    # Clean NaN -> empty string before push to avoid JSON NaN error
    df2 = df.copy()
    df2 = df2.replace([np.inf, -np.inf], np.nan).fillna("")
    # append rows in batches
    rows = df2.values.tolist()
    try:
        # gspread append_rows may accept batches
        for i in range(0, len(rows), batch_size):
            chunk = rows[i:i+batch_size]
            worksheet.append_rows(chunk, value_input_option="RAW")
        return True
    except Exception as e:
        print("[gs] append error:", e)
        return False

# ===== FiinQuant client helpers =====
from FiinQuantX import FiinSession, RealTimeData   # must be installed in environment

def init_fiin_client():
    try:
        client = FiinSession(username=FQ_USERNAME, password=FQ_PASSWORD).login()
        print("[init] Fiin client logged in")
        return client
    except Exception as e:
        print("[init] Fiin login failed:", e)
        return None

# TA builder using fiin indicator from client
def add_ta_indicators_for_df(df, fi):
    # df should have columns: timestamp, open, high, low, close, volume
    df = df.sort_values("timestamp").copy().reset_index(drop=True)
    try:
        df['ema_5'] = fi.ema(df['close'], window=5)
        df['ema_20'] = fi.ema(df['close'], window=20)
        df['ema_50'] = fi.ema(df['close'], window=50)
        df['macd'] = fi.macd(df['close'], window_fast=12, window_slow=26)
        df['macd_signal'] = fi.macd_signal(df['close'], window_fast=12, window_slow=26, window_sign=9)
        df['macd_diff'] = df['macd'] - df['macd_signal']
        df['rsi_14'] = fi.rsi(df['close'], window=14)
        df['boll_up'] = fi.bollinger_hband(df['close'], window=20, window_dev=2)
        df['boll_dn'] = fi.bollinger_lband(df['close'], window=20, window_dev=2)
        df['atr_14'] = fi.atr(df['high'], df['low'], df['close'], window=14)
        df['obv'] = fi.obv(df['close'], df['volume'])
        df['vwap_20'] = fi.vwap(df['high'], df['low'], df['close'], df['volume'], window=20)
        # volume z-score over 20
        df['vol_z'] = (df['volume'] - df['volume'].rolling(20).mean()) / (df['volume'].rolling(20).std().replace(0, np.nan))
    except Exception as e:
        # If some fi functions not available or error, fill NaN
        print("[TA] indicator compute error:", e)
    return df

# ===== Backfill / ensure master OHLCV present =====
def backfill_master(client, universe, months=BACKFILL_MONTHS):
    master_path = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    if os.path.exists(master_path):
        print("[backfill] master exists:", master_path)
        return master_path
    # compute from_date
    to_date = datetime.now().date()
    from_date = (datetime.now() - pd.DateOffset(months=months)).date()
    print(f"[backfill] requesting historical OHLCV for {len(universe)} tickers from {from_date}")
    # Fetch in batches (Fiin API may accept many tickers)
    all_rows=[]
    batch_size = 50
    for i in range(0, len(universe), batch_size):
        batch = universe[i:i+batch_size]
        try:
            data = client.Fetch_Trading_Data(
                realtime=False,
                tickers=batch,
                fields=['open','high','low','close','volume'],
                adjusted=True,
                by='1d',
                from_date=str(from_date),
                to_date=str(to_date)
            ).get_data()
            # data expected DataFrame with timestamp,ticker,close,...
            if data is not None and not data.empty:
                all_rows.append(data)
            time.sleep(0.2)
        except Exception as e:
            print("[backfill] batch fetch error:", e)
    if all_rows:
        df_master = pd.concat(all_rows, ignore_index=True)
        df_master.to_parquet(master_path, index=False)
        print(f"[backfill] saved master parquet {master_path} rows={len(df_master)}")
        return master_path
    else:
        print("[backfill] no data fetched")
        return None

# ===== A3C inference fallback (if precomputed file not available) =====
def a3c_fallback_signals_from_snapshot(snapshot_df):
    # snapshot_df: one-row-per-ticker with TA columns
    rows=[]
    date_str = datetime.now().strftime("%Y-%m-%d")
    for _, r in snapshot_df.iterrows():
        ema5 = r.get('ema_5', np.nan); ema20 = r.get('ema_20', np.nan); rsi = r.get('rsi_14', np.nan)
        buy = False
        if not math.isnan(ema5) and not math.isnan(ema20) and ema5 > ema20 and (not math.isnan(rsi) and rsi < RSI_TH):
            buy = True
        action = 1 if buy else 0
        pb, ph, ps = (0.75, 0.2, 0.05) if buy else (0.1, 0.85, 0.05)
        rows.append([date_str, r['ticker'], int(action), float(pb), float(ph), float(ps)])
    df = pd.DataFrame(rows, columns=["date","ticker","action","prob_buy","prob_hold","prob_sell"])
    return df

# ===== Rule scoring (per snapshot) =====
def rule_scoring_from_snapshot(snapshot_df):
    rows=[]
    date_str = datetime.now().strftime("%Y-%m-%d")
    for _, r in snapshot_df.iterrows():
        ticker = r['ticker']
        ema5 = float(r.get('ema_5', np.nan)) if not pd.isna(r.get('ema_5', np.nan)) else np.nan
        ema20 = float(r.get('ema_20', np.nan)) if not pd.isna(r.get('ema_20', np.nan)) else np.nan
        rsi = float(r.get('rsi_14', np.nan)) if not pd.isna(r.get('rsi_14', np.nan)) else np.nan
        macd_diff = float(r.get('macd_diff', np.nan)) if not pd.isna(r.get('macd_diff', np.nan)) else np.nan
        close = float(r.get('close', np.nan)) if not pd.isna(r.get('close', np.nan)) else np.nan
        boll_up = float(r.get('boll_up', np.nan)) if not pd.isna(r.get('boll_up', np.nan)) else np.nan
        vol_z = float(r.get('vol_z', np.nan)) if not pd.isna(r.get('vol_z', np.nan)) else np.nan

        score=0; pass_list=[]; fail_list=[]
        if not math.isnan(ema5) and not math.isnan(ema20) and ema5 > ema20:
            score += 1; pass_list.append("EMA5>EMA20")
        else:
            fail_list.append("EMA<=20")
        if not math.isnan(rsi) and rsi < RSI_TH:
            score += 1; pass_list.append("RSI<TH")
        else:
            fail_list.append("RSI>=")
        if not math.isnan(macd_diff) and macd_diff > 0:
            score += 1; pass_list.append("MACD>0")
        else:
            fail_list.append("MACD<=0")
        if not math.isnan(close) and not math.isnan(boll_up) and close > boll_up:
            score += 1; pass_list.append("BollBreak")
        else:
            fail_list.append("BollNo")
        if not math.isnan(vol_z) and vol_z > VOL_SPIKE_MULT:
            score += 1; pass_list.append("VolSpike")
        else:
            fail_list.append("VolNormal")

        details = {"ema5":ema5,"ema20":ema20,"rsi":rsi,"macd_diff":macd_diff,"close":close,"boll_up":boll_up,"vol_z":vol_z}
        rows.append([date_str, ticker, int(score), ";".join(pass_list), ";".join(fail_list), json.dumps(details)])
    df = pd.DataFrame(rows, columns=["date","ticker","rule_score","pass_list","fail_list","detail_values"])
    return df

# ===== Single-day processing (core of replay) =====
def process_one_replay_day(client, day_date, universe, master_df=None, use_precomputed_a3c=False):
    """
    day_date: pd.Timestamp or 'YYYY-MM-DD'
    universe: list of tickers
    master_df: full ohlcv_master DataFrame (optional, if provided reuse)
    use_precomputed_a3c: if True and signals/a3c_signals_infer.csv present, will use it.
    Returns: dict with paths/dataframes for a3c, rule, snapshot
    """
    day = pd.to_datetime(day_date).date()
    date_str = day.strftime("%Y-%m-%d")
    print(f"[replay] running for {date_str}")

    # 1) prepare price data for day: we will derive snapshot = last available row <= day for each ticker
    master_path = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    if master_df is None:
        if not os.path.exists(master_path):
            # try backfill
            print("[replay] ohlcv_master.parquet not found. Backfilling history...")
            backfill_master(client, universe, months=BACKFILL_MONTHS)
        if not os.path.exists(master_path):
            print("[replay] master missing -> abort day")
            return {"error":"master_missing"}
        master_df = pd.read_parquet(master_path)
        master_df["timestamp"] = pd.to_datetime(master_df["timestamp"])

    # filter rows up to day (inclusive)
    df_up_to = master_df[master_df["timestamp"].dt.date <= day].copy()
    if df_up_to.empty:
        print("[replay] no historical rows <= day")
        return {"error":"no_rows"}

    # Build snapshot: last row per ticker
    last_rows = df_up_to.sort_values(["ticker","timestamp"]).groupby("ticker").tail(1).reset_index(drop=True)
    # compute TA per ticker: we may need timeseries longer than 1 row to compute indicators.
    # For robust TA we compute per-ticker on its timeseries up to day
    snapshot_frames=[]
    fi = client.FiinIndicator()  # IMPORTANT: use client.FiinIndicator() per your correct sample
    for tk in universe:
        tk_df = df_up_to[df_up_to["ticker"]==tk].sort_values("timestamp").copy()
        if tk_df.empty:
            continue
        tk_df = add_ta_indicators_for_df(tk_df, fi)
        # pick last row (snapshot)
        last = tk_df.tail(1).copy()
        last["ticker"] = tk
        snapshot_frames.append(last)
    if not snapshot_frames:
        print("[replay] no ticker snapshots computed")
        return {"error":"no_snapshots"}
    snapshot_df = pd.concat(snapshot_frames, ignore_index=True)
    # Keep only desired columns and ensure 'ticker' present
    if "ticker" not in snapshot_df.columns:
        snapshot_df["ticker"] = snapshot_df.get("ticker", np.nan)
    # Save snapshot CSV for debugging
    snap_path = os.path.join(FEATURE_DIR, f"features_today_{date_str}.csv")
    snapshot_df.to_csv(snap_path, index=False)

    # 2) A3C inference: try precomputed file first
    a3c_out = None
    if use_precomputed_a3c:
        infer_files = sorted(glob.glob(os.path.join(SIGNAL_DIR, "a3c_signals_*.csv")))
        if infer_files:
            # choose the latest precomputed file and filter by date if available
            a3c_df_pre = pd.read_csv(infer_files[-1])
            if 'date' in a3c_df_pre.columns:
                a3c_df_pre['date'] = pd.to_datetime(a3c_df_pre['date']).dt.strftime("%Y-%m-%d")
                a3c_sel = a3c_df_pre[a3c_df_pre['date'] == date_str].copy()
                if not a3c_sel.empty:
                    a3c_out = a3c_sel
    if a3c_out is None:
        # fallback heuristic
        a3c_out = a3c_fallback_signals_from_snapshot(snapshot_df)
    # save a3c signals
    a3c_path = os.path.join(SIGNAL_DIR, f"a3c_signals_{date_str}.csv")
    a3c_out.to_csv(a3c_path, index=False)

    # 3) rule scoring
    rule_df = rule_scoring_from_snapshot(snapshot_df)
    rule_path = os.path.join(SIGNAL_DIR, f"rule_scores_{date_str}.csv")
    rule_df.to_csv(rule_path, index=False)

    # 4) Merge and prepare alerts summary (no execution)
    merged = a3c_out.merge(rule_df, on=["date","ticker"], how="left")
    merged["rule_score"] = merged["rule_score"].fillna(0).astype(int)

    # 5) Build alert list per our decision rules (A3C==1 & rule_score >= RULE_MIN_SCORE)
    alerts = merged[(merged["action"]==1) & (merged["rule_score"] >= RULE_MIN_SCORE)].copy()
    # If no alerts but you want rule-only alerts, create those where rule_score >= RULE_MIN_SCORE
    # (we will still send hold summary if no alerts)
    return {
        "date": date_str,
        "snapshot_path": snap_path,
        "a3c_path": a3c_path,
        "rule_path": rule_path,
        "snapshot_df": snapshot_df,
        "a3c_df": a3c_out,
        "rule_df": rule_df,
        "merged": merged,
        "alerts": alerts
    }

# ===== Replay driver: run across date range and send telegram (and optional GSheet) =====
def run_historical_replay(start_date=START_DATE, end_date=END_DATE, push_gsheet=PUSH_GSHEET, send_telegram=True, use_precomputed_a3c=True, universe_override=None, allow_rule_only=False):
    client = init_fiin_client()
    if client is None:
        raise RuntimeError("Fiin login failed; cannot proceed")

    # Determine universe: read universe/universe_list.csv if exists, else use top tickers from master
    universe_path = os.path.join("universe","universe_list.csv")
    universe=[]
    if universe_override:
        universe = universe_override
    elif os.path.exists(universe_path):
        try:
            df_univ = pd.read_csv(universe_path)
            universe = df_univ["ticker"].astype(str).tolist()
        except Exception:
            universe = []
    # fallback: take top 200 tickers present in ohlcv_master
    master_path = os.path.join(DATA_DIR, "ohlcv_master.parquet")
    if not universe and os.path.exists(master_path):
        px = pd.read_parquet(master_path)
        universe = px["ticker"].unique().tolist()[:200]
    if not universe:
        raise RuntimeError("Universe empty: please provide universe_list.csv or ensure ohlcv_master.parquet is present")

    # Preload master once to speed up
    master_df = None
    if os.path.exists(master_path):
        master_df = pd.read_parquet(master_path)
        master_df["timestamp"] = pd.to_datetime(master_df["timestamp"])

    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)
    day = start
    summary_rows=[]
    while day <= end:
        try:
            res = process_one_replay_day(client, day, universe, master_df=master_df, use_precomputed_a3c=use_precomputed_a3c)
            if res.get("error"):
                print(f"[replay] error for {day.date()}: {res.get('error')}")
                summary_rows.append({"date": day.strftime("%Y-%m-%d"), "status": "error", "note": res.get("error")})
                day += pd.Timedelta(days=1)
                continue

            merged = res["merged"]
            alerts = res["alerts"]

            # Build telegram message
            date_label = res["date"]
            if not alerts.empty:
                # list alerts
                lines=[]
                for _, r in alerts.iterrows():
                    t = r["ticker"]
                    a3c_label = {1:"BUY",0:"HOLD",-1:"SELL"}.get(int(r.get("action",0)),"HOLD")
                    score = int(r.get("rule_score",0))
                    lines.append(f"- {t} | A3C:{a3c_label} | rule:{score}")
                body = "\n".join(lines)
                msg = f"🟢 *REPLAY ALERTS* {date_label}\n{body}\n(Replay log)"
                if send_telegram:
                    ok = send_telegram_text(msg, thread_id=TG_THREAD_ID)
                    if not ok:
                        # try without thread id (compatibility)
                        send_telegram_text(msg, thread_id=None)
                summary_rows.append({"date": date_label, "status":"alerts_sent", "n_alerts": len(alerts)})
            else:
                # no alerts -> send HOLD summary or rule-only if allowed
                if allow_rule_only:
                    rule_candidates = merged[merged["rule_score"] >= RULE_MIN_SCORE].sort_values("rule_score", ascending=False).head(10)
                    if not rule_candidates.empty:
                        lines=[]
                        for i, r in enumerate(rule_candidates.itertuples(),1):
                            lines.append(f"{i}. {r.ticker} — rule {int(r.rule_score)}")
                        msg = f"🟡 *REPLAY RULE-ONLY SUMMARY* {date_label}\nTop {len(rule_candidates)}:\n" + "\n".join(lines)
                        if send_telegram:
                            ok = send_telegram_text(msg, thread_id=TG_THREAD_ID)
                            if not ok:
                                send_telegram_text(msg, thread_id=None)
                        summary_rows.append({"date": date_label, "status":"rule_only_sent", "n_candidates": len(rule_candidates)})
                    else:
                        msg = f"🟡 *REPLAY HOLD* {date_label} — no signals."
                        if send_telegram:
                            send_telegram_text(msg, thread_id=TG_THREAD_ID)
                        summary_rows.append({"date": date_label, "status":"hold", "n_candidates": 0})
                else:
                    msg = f"🟡 *REPLAY HOLD* {date_label} — no signals."
                    if send_telegram:
                        send_telegram_text(msg, thread_id=TG_THREAD_ID)
                    summary_rows.append({"date": date_label, "status":"hold", "n_candidates": 0})

            # Optional: push snapshot or alerts to Google Sheets (if turned on)
            if push_gsheet:
                try:
                    # push merged (select columns to avoid huge sheet)
                    push_df = merged[["date","ticker","action","prob_buy","rule_score"]].copy()
                    append_df_to_gsheet(push_df, GS_WS)
                except Exception as e:
                    print("[gs] append error:", e)

        except Exception as e:
            print("[replay] exception for", day.strftime("%Y-%m-%d"), e)
            traceback.print_exc()
            summary_rows.append({"date": day.strftime("%Y-%m-%d"), "status":"exception", "note": str(e)})

        day += pd.Timedelta(days=1)

    # Save replay summary CSV
    summary_df = pd.DataFrame(summary_rows)
    summary_path = os.path.join(MONITOR_DIR, f"replay_summary_{start.strftime('%Y%m%d')}_{end.strftime('%Y%m%d')}.csv")
    summary_df.to_csv(summary_path, index=False)
    print("[replay] done. summary saved:", summary_path)
    return summary_path


In [19]:
summary = run_historical_replay(start_date=START_DATE, end_date=END_DATE, push_gsheet=PUSH_GSHEET, send_telegram=True, use_precomputed_a3c=True, allow_rule_only=True)

[init] Fiin client logged in
[replay] running for 2025-03-25
[replay] running for 2025-03-26
[replay] running for 2025-03-27
[replay] running for 2025-03-28
[replay] running for 2025-03-29
[replay] running for 2025-03-30
[replay] running for 2025-03-31
[replay] running for 2025-04-01
[replay] running for 2025-04-02
[replay] running for 2025-04-03
[replay] running for 2025-04-04
[replay] running for 2025-04-05
[replay] running for 2025-04-06
[replay] running for 2025-04-07
[replay] running for 2025-04-08
[replay] running for 2025-04-09
[replay] running for 2025-04-10
[replay] running for 2025-04-11
[replay] running for 2025-04-12
[replay] running for 2025-04-13
[replay] running for 2025-04-14
[replay] running for 2025-04-15
[replay] running for 2025-04-16
[replay] done. summary saved: ./monitor\replay_summary_20250325_20250416.csv
