<a href="https://colab.research.google.com/github/SamapanThongmee/blog/blob/main/Untitled61.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%writefile app.py
"""
SPX Market Breadth Dashboard - Robust Version (Fix blank charts)
"""

import streamlit as st
import pandas as pd
import plotly.graph_objects as go
from datetime import timedelta, date
import requests
from io import StringIO

# -------------------------
# Page
# -------------------------
st.set_page_config(page_title="SPX Dashboard", layout="wide")

SHEET_ID = "1faOXwIk7uR51IIeAMrrRPdorRsO7iJ3PDPn-mk5vc24"
GID = "564353266"

URL_PRIMARY  = f"https://docs.google.com/spreadsheets/d/{SHEET_ID}/export?format=csv&gid={GID}"
URL_FALLBACK = f"https://docs.google.com/spreadsheets/d/{SHEET_ID}/gviz/tq?tqx=out:csv&gid={GID}"

st.title("ðŸ“ˆ SPX Dashboard")

# -------------------------
# Helpers
# -------------------------
def _looks_like_html(text: str) -> bool:
    t = (text or "").lstrip().lower()
    return t.startswith("<!doctype html") or t.startswith("<html") or ("servicelogin" in t)

def _clean_numeric_series(s: pd.Series) -> pd.Series:
    s = s.astype(str).str.strip()
    s = s.replace({"nan": "", "None": "", "null": ""})
    s = (
        s.str.replace(",", "", regex=False)
         .str.replace("%", "", regex=False)
         .str.replace("âˆ’", "-", regex=False)   # unicode minus
         .str.replace("â€”", "-", regex=False)
    )
    return pd.to_numeric(s, errors="coerce")

def _normalize_percent(s: pd.Series) -> pd.Series:
    x = _clean_numeric_series(s)
    mx = x.max(skipna=True)
    if pd.notna(mx) and mx <= 1.5:
        x = x * 100.0
    return x

def _pick_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    cols = {str(c).strip().lower(): c for c in df.columns}
    for cand in candidates:
        key = cand.strip().lower()
        if key in cols:
            return cols[key]
    return None

def _to_naive_normalized_dt(s: pd.Series) -> pd.Series:
    dt = pd.to_datetime(s, errors="coerce")
    # remove timezone if any (safe)
    try:
        if getattr(dt.dt, "tz", None) is not None:
            dt = dt.dt.tz_convert(None)
    except Exception:
        pass
    # normalize to midnight so comparisons match
    dt = dt.dt.normalize()
    return dt

def _build_standard_df(raw: pd.DataFrame) -> pd.DataFrame:
    df = raw.copy()
    df.columns = [str(c).strip() for c in df.columns]

    # name-based mapping
    col_date  = _pick_col(df, ["Date", "Datetime", "Time", "timestamp"])
    col_open  = _pick_col(df, ["Open"])
    col_high  = _pick_col(df, ["High"])
    col_low   = _pick_col(df, ["Low"])
    col_close = _pick_col(df, ["Close"])

    col_ma20  = _pick_col(df, ["MA20"])
    col_ma50  = _pick_col(df, ["MA50"])
    col_ma200 = _pick_col(df, ["MA200"])

    col_above = _pick_col(df, ["Percentage_Above_Both", "PctAbove", "% Above Both", "AboveBoth"])
    col_below = _pick_col(df, ["Percentage_Below_Both", "PctBelow", "% Below Both", "BelowBoth"])

    # positional fallback (A..S style)
    if not all([col_date, col_open, col_high, col_low, col_close]):
        def try_positional(offset: int = 0) -> dict:
            if df.shape[1] < 19 + offset:
                return {}
            cols = df.columns.tolist()
            return {
                "Date": cols[0 + offset],
                "Open": cols[1 + offset],
                "High": cols[2 + offset],
                "Low":  cols[3 + offset],
                "Close":cols[4 + offset],
                "MA20": cols[11 + offset],
                "MA50": cols[12 + offset],
                "MA200":cols[14 + offset],
                "PctAbove": cols[15 + offset],
                "PctBelow": cols[18 + offset],
            }

        mapping = try_positional(0) or try_positional(1)
        if not mapping:
            raise ValueError(f"Cannot map columns. Found {df.shape[1]} columns: {df.columns.tolist()}")

        out = pd.DataFrame({
            "Date": df[mapping["Date"]],
            "Open": df[mapping["Open"]],
            "High": df[mapping["High"]],
            "Low":  df[mapping["Low"]],
            "Close":df[mapping["Close"]],
            "MA20": df[mapping["MA20"]],
            "MA50": df[mapping["MA50"]],
            "MA200":df[mapping["MA200"]],
            "PctAbove": df[mapping["PctAbove"]],
            "PctBelow": df[mapping["PctBelow"]],
        })
    else:
        out = pd.DataFrame({
            "Date": df[col_date],
            "Open": df[col_open],
            "High": df[col_high],
            "Low":  df[col_low],
            "Close":df[col_close],
            "MA20": df[col_ma20] if col_ma20 else None,
            "MA50": df[col_ma50] if col_ma50 else None,
            "MA200":df[col_ma200] if col_ma200 else None,
            "PctAbove": df[col_above] if col_above else None,
            "PctBelow": df[col_below] if col_below else None,
        })

    # parse dates robustly (normalize)
    out["Date"] = _to_naive_normalized_dt(out["Date"])
    out = out.dropna(subset=["Date"]).sort_values("Date")

    for c in ["Open", "High", "Low", "Close", "MA20", "MA50", "MA200"]:
        if c in out.columns:
            out[c] = _clean_numeric_series(out[c])

    for c in ["PctAbove", "PctBelow"]:
        if c in out.columns:
            out[c] = _normalize_percent(out[c])

    return out

def make_rangebreaks(dff: pd.DataFrame):
    """
    Safe rangebreaks:
    - always skip weekends
    - additionally skip missing BUSINESS days only (holidays)
    """
    if dff.empty:
        return [dict(bounds=["sat", "mon"])]

    # âœ… correct for Series
    obs = pd.to_datetime(dff["Date"], errors="coerce").dt.normalize()
    obs = obs.dropna()
    if obs.empty:
        return [dict(bounds=["sat", "mon"])]

    obs_set = set(obs.unique())
    bdays = pd.date_range(obs.min(), obs.max(), freq="B")
    missing_bdays = [d for d in bdays if d not in obs_set]

    rbs = [dict(bounds=["sat", "mon"])]
    if missing_bdays:
        rbs.append(dict(values=missing_bdays))
    return rbs

@st.cache_data(ttl=300, show_spinner=False)
def load_sheet_df(url_primary: str, url_fallback: str) -> tuple[pd.DataFrame, str]:
    headers = {"User-Agent": "Mozilla/5.0"}
    for url in [url_primary, url_fallback]:
        r = requests.get(url, headers=headers, timeout=20)
        txt = r.text or ""
        if _looks_like_html(txt):
            continue
        df = pd.read_csv(StringIO(txt), dtype=str)
        return df, url
    raise ValueError("Google Sheet did not return CSV (private/not published or wrong GID).")

# -------------------------
# Controls
# -------------------------
c1, c2, c3, c4 = st.columns([1, 1.5, 3, 1.5])

with c1:
    if st.button("ðŸ”„ Refresh data", use_container_width=True):
        st.cache_data.clear()
        st.rerun()

with c2:
    months_to_show = st.selectbox("Default window", [3, 6, 12, 24], index=1)

with c4:
    disable_breaks = st.checkbox("Disable rangebreaks (debug)", value=False)

# -------------------------
# Load
# -------------------------
try:
    raw_df, used_url = load_sheet_df(URL_PRIMARY, URL_FALLBACK)
except Exception as e:
    st.error(f"Load failed: {e}")
    st.stop()

try:
    df = _build_standard_df(raw_df)
except Exception as e:
    st.error(f"Parse/mapping failed: {e}")
    with st.expander("Debug: raw columns + preview"):
        st.write(raw_df.columns.tolist())
        st.write(raw_df.head(10))
    st.stop()

if df.empty:
    st.warning("No valid rows after parsing Date. Check your Date column format.")
    with st.expander("Debug: raw preview"):
        st.write(raw_df.head(20))
    st.stop()

max_date = df["Date"].max()
default_start = (max_date - timedelta(days=30 * int(months_to_show))).date()
default_end = max_date.date()

with c3:
    start_d, end_d = st.date_input(
        "Date range",
        value=(default_start, default_end),
        min_value=df["Date"].min().date(),
        max_value=df["Date"].max().date(),
    )
    if isinstance(start_d, date) and isinstance(end_d, date) and start_d > end_d:
        start_d, end_d = end_d, start_d

mask = (df["Date"].dt.date >= start_d) & (df["Date"].dt.date <= end_d)
dff = df.loc[mask].copy()

st.markdown("---")
# st.write(f"Using: `{used_url}`")
# st.write(f"Showing: **{start_d} â†’ {end_d}** | Rows: **{len(dff):,}**")
st.write(f"Showing: **{start_d} â†’ {end_d}**") # | Rows: **{len(dff):,}**")

if dff.empty:
    st.warning("No data in the selected date range.")
    st.stop()

needed_ohlc = dff[["Open", "High", "Low", "Close"]].notna().all(axis=1).sum()
if needed_ohlc == 0:
    st.error("OHLC columns are not numeric (all NaN after cleaning).")
    with st.expander("Debug: cleaned OHLC sample"):
        st.write(dff[["Date", "Open", "High", "Low", "Close"]].head(30))
    st.stop()

rangebreaks = [] if disable_breaks else make_rangebreaks(dff)

# -------------------------
# Charts
# -------------------------
st.subheader("ðŸ“ˆ S&P 500 Index")
fig1 = go.Figure(
    go.Candlestick(
        x=dff["Date"],
        open=dff["Open"],
        high=dff["High"],
        low=dff["Low"],
        close=dff["Close"],
        increasing_line_color="#26a69a",
        decreasing_line_color="#ef5350",
    )
)
if rangebreaks:
    fig1.update_xaxes(rangebreaks=rangebreaks)
fig1.update_layout(height=450, template="plotly_dark", xaxis_rangeslider_visible=False)
st.plotly_chart(fig1, use_container_width=True)

st.markdown("---")
st.subheader("ðŸ“Š Market Breadth Analysis")
tab1, tab2 = st.tabs(["ðŸ“Š Moving Averages", "ðŸ“ˆ Percentage Distribution"])

with tab1:
    fig2 = go.Figure()
    if "MA20" in dff.columns and dff["MA20"].notna().any():
        fig2.add_trace(go.Scatter(x=dff["Date"], y=dff["MA20"], name="MA20", line=dict(width=1.5)))
    if "MA50" in dff.columns and dff["MA50"].notna().any():
        fig2.add_trace(go.Scatter(x=dff["Date"], y=dff["MA50"], name="MA50", line=dict(width=2)))
    if "MA200" in dff.columns and dff["MA200"].notna().any():
        fig2.add_trace(go.Scatter(x=dff["Date"], y=dff["MA200"], name="MA200", line=dict(width=2.5)))

    if rangebreaks:
        fig2.update_xaxes(rangebreaks=rangebreaks)
    fig2.update_layout(
        height=400,
        template="plotly_dark",
        hovermode="x unified",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
    )
    st.plotly_chart(fig2, use_container_width=True)

with tab2:
    fig3 = go.Figure()

    if "PctAbove" in dff.columns and dff["PctAbove"].notna().any():
        fig3.add_trace(
            go.Scatter(
                x=dff["Date"], y=dff["PctAbove"],
                name="% Above Both",
                fill="tozeroy",
                line=dict(width=1.5),
                fillcolor="rgba(0, 255, 0, 0.2)",
            )
        )

    if "PctBelow" in dff.columns and dff["PctBelow"].notna().any():
        fig3.add_trace(
            go.Scatter(
                x=dff["Date"], y=dff["PctBelow"],
                name="% Below Both",
                fill="tozeroy",
                line=dict(width=1.5),
                fillcolor="rgba(255, 0, 0, 0.2)",
            )
        )

    if rangebreaks:
        fig3.update_xaxes(rangebreaks=rangebreaks)
    fig3.update_layout(
        height=400,
        template="plotly_dark",
        hovermode="x unified",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        yaxis_title="Percent",
    )
    st.plotly_chart(fig3, use_container_width=True)

st.markdown("---")
st.caption(f"ðŸ“Š Dashboard | {len(dff):,} points | {dff['Date'].min().date()} to {dff['Date'].max().date()}")