In [None]:
from google.cloud import bigquery
import pandas as pd
from datetime import date, timedelta
import vertexai
from vertexai.generative_models import GenerativeModel
from google.auth import default
import numpy as np
import json


In [None]:
# ---------- Config & clients ----------
# Project and region handed to vertexai.init() below.
PROJECT_ID = "bigtimestudios"
LOCATION = "us-central1"
# The result of google.auth.default() is unpacked into a credentials object and a project value.
credentials, project = default()
# NOTE(review): the BigQuery client is created with the project returned by default(),
# not PROJECT_ID — confirm the two are meant to be the same project.
client = bigquery.Client(credentials=credentials, project=project)
vertexai.init(project=PROJECT_ID, location=LOCATION)
# Model handle used later by summarize_with_vertex().
model = GenerativeModel("gemini-2.5-pro")

In [3]:
# ---------- Dates ----------
# Every anchor is expressed as a whole number of days before RUN_DATE.
_ONE_DAY = timedelta(days=1)

RUN_DATE   = date.today() - 2 * _ONE_DAY   # your temp override
YDAY       = RUN_DATE - _ONE_DAY
WEEK_AGO   = RUN_DATE - 7 * _ONE_DAY
DAYS30_AGO = RUN_DATE - 30 * _ONE_DAY
LOOKBACK   = RUN_DATE - 40 * _ONE_DAY      # cushion for gaps

# ISO form (YYYY-MM-DD) used in the prompt and the fallback text.
RUN_DATE_STR = RUN_DATE.isoformat()

In [6]:
def pct(a, b):
    """Fractional change of `a` relative to baseline `b`.

    Returns NaN when the baseline is None, NaN, or zero; otherwise (a - b) / b.
    """
    baseline_unusable = b is None or pd.isna(b) or b == 0
    if baseline_unusable:
        return np.nan
    return (a - b) / b

def ensure_date(col):
    """Parse `col` with pandas.to_datetime and return its values as datetime.date objects."""
    parsed = pd.to_datetime(col)
    return parsed.dt.date

def window_avg(df, date_col: str, value_col: str, start_date, end_date):
    """Average of per-day sums of `value_col` for dates in [start_date, end_date).

    Only days that have at least one row contribute to the average.
    Returns NaN when no row falls inside the window.
    """
    in_window = (df[date_col] >= start_date) & (df[date_col] < end_date)
    if not in_window.any():
        return np.nan
    per_day_total = df.loc[in_window].groupby(date_col)[value_col].sum()
    return float(per_day_total.mean())

def change_vs_prior(current: pd.DataFrame,
                    prior: pd.DataFrame,
                    hist: pd.DataFrame,
                    date_col: str,
                    keys: list[str],
                    metric: str,
                    top_n: int | None = None,
                    ascending: bool = False) -> pd.DataFrame:
    """
    Movers with DoD delta plus 7d/30d baselines (mean of daily totals) and % vs baselines.
    hist must include at least 30d before RUN_DATE.
    """
    # group today + prior
    if current is None or current.empty:
        cols = keys + [metric, f"{metric}_yday", "delta", "pct_change",
                       f"{metric}_7d_avg", f"{metric}_30d_avg", "vs7_pct", "vs30_pct"]
        return pd.DataFrame(columns=cols)

    c = current.groupby(keys, as_index=False)[metric].sum()
    p = (prior.groupby(keys, as_index=False)[metric].sum()
         if prior is not None and not prior.empty
         else pd.DataFrame(columns=keys + [metric]))

    m = c.merge(p, on=keys, how="left", suffixes=("", "_yday"))
    ycol = f"{metric}_yday"
    m[ycol] = m[ycol].fillna(0)
    m["delta"] = m[metric] - m[ycol]
    m["pct_change"] = np.where(m[ycol] != 0, (m["delta"] / m[ycol]) * 100, np.nan)

    # 7d / 30d baselines per key
    h7 = hist[(hist[date_col] >= RUN_DATE - timedelta(days=7)) & (hist[date_col] < RUN_DATE)]
    h30 = hist[(hist[date_col] >= RUN_DATE - timedelta(days=30)) & (hist[date_col] < RUN_DATE)]

    h7g = (h7.groupby(keys + [date_col], as_index=False)[metric].sum()
             .groupby(keys, as_index=False)[metric].mean()
             .rename(columns={metric: f"{metric}_7d_avg"})) if not h7.empty else pd.DataFrame(columns=keys+[f"{metric}_7d_avg"])
    h30g = (h30.groupby(keys + [date_col], as_index=False)[metric].sum()
              .groupby(keys, as_index=False)[metric].mean()
              .rename(columns={metric: f"{metric}_30d_avg"})) if not h30.empty else pd.DataFrame(columns=keys+[f"{metric}_30d_avg"])

    m = m.merge(h7g, on=keys, how="left").merge(h30g, on=keys, how="left")

    m["vs7_pct"]  = np.where(m[f"{metric}_7d_avg"].fillna(0)  != 0, (m[metric] - m[f"{metric}_7d_avg"])  / m[f"{metric}_7d_avg"]  * 100, np.nan)
    m["vs30_pct"] = np.where(m[f"{metric}_30d_avg"].fillna(0) != 0, (m[metric] - m[f"{metric}_30d_avg"]) / m[f"{metric}_30d_avg"] * 100, np.nan)

    m = m.sort_values("delta", ascending=ascending)
    return m.head(top_n) if top_n else m

def _clean_for_join(s: pd.Series) -> pd.Series:
    s = s.astype(str)
    s = s.str.replace(r"\(.*?\)", "", regex=True)
    s = s.str.replace(r"grant[:\- ]*", "", regex=True)
    s = s.str.replace(r"\s+", " ", regex=True).str.strip().str.lower()
    return s

def compact_json(df: pd.DataFrame, cols: list[str], n=8):
    """Return the first `n` rows of `df` as a list of dicts limited to `cols`.

    Columns listed in `cols` but absent from `df` are skipped. A None or empty
    frame yields []. Missing scalar values (NaN / NaT / NA) are emitted as None
    so the records serialize to valid JSON (`null`) rather than the `NaN` token.
    """
    if df is None or df.empty:
        return []
    keep = [c for c in cols if c in df.columns]
    records = df.loc[:, keep].head(n).to_dict(orient="records")
    return [
        {key: (None if pd.api.types.is_scalar(val) and pd.isna(val) else val)
         for key, val in rec.items()}
        for rec in records
    ]


In [5]:

# ---------- Parameterized queries (server-side date filter) ----------
# @start / @today are bound once and shared by every query below.
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("start", "DATE", LOOKBACK),
        bigquery.ScalarQueryParameter("today", "DATE", RUN_DATE),
    ]
)


def _fetch(sql: str) -> pd.DataFrame:
    """Run `sql` with the shared date parameters and return the result as a DataFrame."""
    return client.query(sql, job_config=job_config).to_dataframe()


# DAU
df_dau = _fetch("""
SELECT checkpoint AS Date, DAU, hours_played, avg_time_per_player
FROM bigtimestudios.BT_ML.ai_bot_input_dau
WHERE checkpoint BETWEEN @start AND @today
""")

# Dungeons
df_dungeon = _fetch("""
SELECT Day, portalType, mission, Players, dungeonUptime, sandConsumed
FROM bigtimestudios.BT_ML.ai_bot_input_dungeon
WHERE Day BETWEEN @start AND @today
""")

# Revenue
df_revenue = _fetch("""
SELECT Date, item_name, revenue, Buyers, transactions
FROM bigtimestudios.BT_ML.ai_bot_input_revenue
WHERE Date BETWEEN @start AND @today
""")

# Tokens
df_tokens = _fetch("""
SELECT Day, action, sourceString, amount, users
FROM bigtimestudios.BT_ML.ai_bot_input_token
WHERE Day BETWEEN @start AND @today
""")

# Crafts
df_crafts = _fetch("""
SELECT Day, name, crafts, users
FROM bigtimestudios.BT_ML.ai_bot_input_crafts
WHERE Day BETWEEN @start AND @today
""")


In [None]:
# ---------- Normalize Dates ----------
# Convert each frame's date column to plain datetime.date so it compares with RUN_DATE.
for _frame, _date_col in (
    (df_revenue, "Date"),
    (df_dau, "Date"),
    (df_dungeon, "Day"),
    (df_tokens, "Day"),
    (df_crafts, "Day"),
):
    _frame[_date_col] = ensure_date(_frame[_date_col])


# ---------- Slices ----------
def _on_day(frame, date_col, day):
    """Rows of `frame` whose `date_col` equals `day`."""
    return frame.loc[frame[date_col] == day]


def _trailing_week(frame, date_col):
    """Rows of `frame` dated in [WEEK_AGO, RUN_DATE)."""
    return frame.loc[(frame[date_col] >= WEEK_AGO) & (frame[date_col] < RUN_DATE)]


rev_today = _on_day(df_revenue, "Date", RUN_DATE)
rev_yday  = _on_day(df_revenue, "Date", YDAY)
rev_7d    = _trailing_week(df_revenue, "Date")

dau_today = _on_day(df_dau, "Date", RUN_DATE)
dau_yday  = _on_day(df_dau, "Date", YDAY)
dau_7d    = _trailing_week(df_dau, "Date")

gp_today  = _on_day(df_dungeon, "Day", RUN_DATE)
gp_yday   = _on_day(df_dungeon, "Day", YDAY)

tok_today = _on_day(df_tokens, "Day", RUN_DATE)
tok_yday  = _on_day(df_tokens, "Day", YDAY)

cr_today  = _on_day(df_crafts, "Day", RUN_DATE)
cr_yday   = _on_day(df_crafts, "Day", YDAY)

In [9]:
# ---------- KPIs ----------
def _pct_or_none(ratio):
    """Fraction -> percent rounded to 2 dp, or None when the ratio is missing."""
    return None if pd.isna(ratio) else round(100 * ratio, 2)


def _round_or_none(value):
    """Value rounded to 2 dp, or None when it is missing."""
    return None if pd.isna(value) else round(value, 2)


# Revenue
if rev_today.empty:
    total_rev_today, buyers_today, tx_today = 0.0, 0, 0
else:
    total_rev_today = float(rev_today["revenue"].sum())
    buyers_today    = int(rev_today["Buyers"].sum())
    tx_today        = int(rev_today["transactions"].sum())
total_rev_yday = 0.0 if rev_yday.empty else float(rev_yday["revenue"].sum())

rev_7d_avg  = window_avg(df_revenue, "Date", "revenue", WEEK_AGO,   RUN_DATE)
rev_30d_avg = window_avg(df_revenue, "Date", "revenue", DAYS30_AGO, RUN_DATE)

rev_dod  = pct(total_rev_today, total_rev_yday)
rev_vs7  = pct(total_rev_today, rev_7d_avg)
rev_vs30 = pct(total_rev_today, rev_30d_avg)

# DAU
dau_today_v = 0 if dau_today.empty else int(dau_today["DAU"].sum())
dau_yday_v  = np.nan if dau_yday.empty else int(dau_yday["DAU"].sum())

dau_7d_avg  = window_avg(df_dau, "Date", "DAU", WEEK_AGO,   RUN_DATE)
dau_30d_avg = window_avg(df_dau, "Date", "DAU", DAYS30_AGO, RUN_DATE)

dau_dod  = pct(dau_today_v, dau_yday_v)
dau_vs7  = pct(dau_today_v, dau_7d_avg)
dau_vs30 = pct(dau_today_v, dau_30d_avg)

# Average session time is only reported when the run day has DAU rows.
if dau_today.empty:
    _avg_time_today = None
else:
    _avg_time_today = round(float(dau_today["avg_time_per_player"].mean()), 2)

summary_metrics = {
    "date": RUN_DATE_STR,
    "revenue": {
        "today": round(total_rev_today, 2),
        "DoD_pct": _pct_or_none(rev_dod),
        "vs7d_avg_pct": _pct_or_none(rev_vs7),
        "vs30d_avg_pct": _pct_or_none(rev_vs30),
        "buyers": buyers_today,
        "transactions": tx_today,
        "avg_7d": _round_or_none(rev_7d_avg),
        "avg_30d": _round_or_none(rev_30d_avg),
    },
    "dau": {
        "today": dau_today_v,
        "DoD_pct": _pct_or_none(dau_dod),
        "vs7d_avg_pct": _pct_or_none(dau_vs7),
        "vs30d_avg_pct": _pct_or_none(dau_vs30),
        "avg_7d": _round_or_none(dau_7d_avg),
        "avg_30d": _round_or_none(dau_30d_avg),
        "avg_time_today_min": _avg_time_today,
    },
}

# Top seller (driver hint)
# Stays None when the run day has no revenue rows.
top_item = None
if not rev_today.empty:
    rev_by_item  = rev_today.groupby("item_name", as_index=False)["revenue"].sum()
    top_item_row = rev_by_item.sort_values("revenue", ascending=False).iloc[0]
    top_item = {
        "item": top_item_row["item_name"],
        "revenue": float(top_item_row["revenue"]),
    }

# ---------- Movers ----------
# Full (untruncated) player mover table, sorted by delta descending. Both the
# top-increase view and the drop view are cut from it. Previously the drops were
# filtered out of the top-10 *increases*, so they came back empty whenever at
# least 10 portal/mission pairs had a non-negative delta.
gp_players_all  = change_vs_prior(gp_today, gp_yday, df_dungeon, "Day",
                                  ["portalType", "mission"], "Players")
gp_players_chg  = gp_players_all.head(10)
gp_sand_chg     = change_vs_prior(gp_today, gp_yday, df_dungeon, "Day",
                                  ["portalType", "mission"], "sandConsumed", top_n=10)
# Five largest day-over-day declines, most negative first.
gp_players_drop = gp_players_all.loc[gp_players_all["delta"] < 0].sort_values("delta").head(5)

tok_amt_chg   = change_vs_prior(tok_today, tok_yday, df_tokens, "Day",
                                ["action", "sourceString"], "amount", top_n=15)
tok_users_chg = change_vs_prior(tok_today, tok_yday, df_tokens, "Day",
                                ["action", "sourceString"], "users", top_n=15)

# ---------- Token sinks ----------
# Today's sink totals per source, largest first.
sinks_today = (tok_today.loc[tok_today["action"] == "sink"]
               .groupby("sourceString", as_index=False)["amount"].sum()
               .sort_values("amount", ascending=False))
tok_sinks_chg = change_vs_prior(
    tok_today.loc[tok_today["action"] == "sink"],
    tok_yday.loc[tok_yday["action"] == "sink"] if not tok_yday.empty else pd.DataFrame(columns=tok_today.columns),
    df_tokens[df_tokens["action"] == "sink"], "Day",
    ["sourceString"], "amount", top_n=5
)

# Crafting
cr_crafts_chg = change_vs_prior(cr_today, cr_yday, df_crafts, "Day",
                                ["name"], "crafts", top_n=15)
cr_users_chg  = change_vs_prior(cr_today, cr_yday, df_crafts, "Day",
                                ["name"], "users",  top_n=15)
# ---------- Mission-level grants (robust) ----------
# Links today's "grant" token rows to dungeon missions through a normalized name.
# ctx_mission_insight stays "" when there is nothing to report.
ctx_mission_insight = ""
_is_grant = tok_today["action"] == "grant"
grants_today = tok_today.loc[_is_grant, ["sourceString", "amount"]].copy()
gp_players_today = gp_today[["mission", "Players"]].copy()

# Token side: total granted per normalized mission name.
if grants_today.empty:
    token_grants_today = pd.DataFrame(columns=["mission_norm", "tokens_granted"])
else:
    grants_today = grants_today.rename(columns={"sourceString": "mission_raw"})
    grants_today["mission_norm"] = _clean_for_join(grants_today["mission_raw"])
    token_grants_today = (
        grants_today.groupby("mission_norm", as_index=False)["amount"].sum()
        .rename(columns={"amount": "tokens_granted"})
    )

# Gameplay side: players per normalized mission name.
if gp_players_today.empty:
    gp_players_today = pd.DataFrame(columns=["mission_norm", "Players"])
else:
    gp_players_today["mission_norm"] = _clean_for_join(gp_players_today["mission"])
    gp_players_today = gp_players_today.groupby("mission_norm", as_index=False)["Players"].sum()

# Only missions present on both sides survive the inner join.
mission_insights = token_grants_today.merge(gp_players_today, on="mission_norm", how="inner")

if not mission_insights.empty:
    top_m = mission_insights.sort_values("tokens_granted", ascending=False).iloc[0]
    # Default to the normalized key; prefer the original mission label when available.
    readable_name = str(top_m["mission_norm"])
    if not gp_today.empty:
        readable_map = (
            gp_today.assign(mission_norm=_clean_for_join(gp_today["mission"]))
            .dropna(subset=["mission_norm"])
            .drop_duplicates(subset=["mission_norm"])
            .set_index("mission_norm")["mission"]
            .to_dict()
        )
        readable_name = readable_map.get(top_m["mission_norm"], readable_name)
    ctx_mission_insight = {
        "mission": readable_name,
        "players": int(top_m["Players"]),
        "tokens_granted": int(top_m["tokens_granted"])
    }
elif not grants_today.empty:
    # No mission matched: report the biggest raw grant source without a player count.
    _grant_totals = grants_today.groupby("mission_raw", as_index=False)["amount"].sum()
    top_grant = _grant_totals.sort_values("amount", ascending=False).iloc[0]
    ctx_mission_insight = {
        "mission": str(top_grant["mission_raw"]),
        "players": None,
        "tokens_granted": int(top_grant["amount"]),
        "note": "mission link unavailable"
    }

In [10]:
# ---------- Context (JSON-first, better for LLM reasoning) ----------
def _mover_cols(keys, metric):
    """Column list for a mover table: keys, the metric, its deltas and both baselines."""
    return list(keys) + [metric, "delta", "pct_change",
                         f"{metric}_7d_avg", f"{metric}_30d_avg",
                         "vs7_pct", "vs30_pct"]


_GP_KEYS = ["portalType", "mission"]
_TOK_KEYS = ["action", "sourceString"]

ctx = {
    "date": RUN_DATE_STR,
    "kpis": summary_metrics,
    "drivers": {
        "top_item": top_item  # {"item": ..., "revenue": ...} or None
    },
    "gameplay": {
        "players_up":   compact_json(gp_players_chg,  _mover_cols(_GP_KEYS, "Players")),
        "players_down": compact_json(gp_players_drop, _mover_cols(_GP_KEYS, "Players")),
        "sand_movers":  compact_json(gp_sand_chg,     _mover_cols(_GP_KEYS, "sandConsumed")),
        "mission_insight": ctx_mission_insight or {}
    },
    "tokens": {
        "amount_movers": compact_json(tok_amt_chg,   _mover_cols(_TOK_KEYS, "amount")),
        "user_movers":   compact_json(tok_users_chg, _mover_cols(_TOK_KEYS, "users")),
        "sinks_today":   compact_json(sinks_today,   ["sourceString", "amount"]),
        "sinks_change":  compact_json(tok_sinks_chg, _mover_cols(["sourceString"], "amount"))
    },
    "crafting": {
        "craft_movers": compact_json(cr_crafts_chg, _mover_cols(["name"], "crafts")),
        "user_movers":  compact_json(cr_users_chg,  _mover_cols(["name"], "users"))
    }
}

In [13]:
# ---------- Prompt ----------
def _json_safe(obj):
    """Recursively convert `obj` into values json.dumps can encode as strict JSON.

    dict / list / tuple are walked; NaN, +/-inf, NaT and NA become None; numpy
    scalars become their Python equivalents; anything else that is not a JSON
    primitive (dates, Decimals, ...) is rendered with str().
    """
    if isinstance(obj, dict):
        return {key: _json_safe(val) for key, val in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_json_safe(val) for val in obj]
    if obj is None or isinstance(obj, (str, int)):  # bool is an int subclass
        return obj
    if isinstance(obj, float):  # np.float64 subclasses float
        return obj if np.isfinite(obj) else None
    if pd.api.types.is_scalar(obj) and pd.isna(obj):
        return None
    if isinstance(obj, np.generic):
        return _json_safe(obj.item())
    return str(obj)


def format_prompt(ctx):
    """Build the LLM prompt: fixed instructions followed by `ctx` as compact JSON.

    `ctx` must contain a "date" entry (interpolated into the instructions). The
    JSON payload is sanitized first so numpy scalars, dates or missing values in
    `ctx` neither raise in json.dumps nor show up as the non-JSON `NaN` token.
    For a `ctx` made only of plain JSON types the output is unchanged.
    """
    payload = json.dumps(_json_safe(ctx), separators=(',',':'))
    return (
f"""You are Head of Analytics for Role Playing Game 'Big Time'.

Game context (short): Players buy Time Crystals (TC), use TC to add Time Sand to Hourglasses, then run Prestige Portals to farm $BIGTIME (e.g., 'racial' drops). Users also use TC to open Epoch Chests, which have been the main way we issue $BIGTIME to players over the last year. Entry to prestige uses $BIGTIME + TC. 'grant' in tokens often maps to a dungeon mission.

Using the JSON below, write a concise daily update for {ctx['date']}.
Rules:
- Bullets only, no preamble.
- For each section, state the change and one likely driver.
- Use both baselines: vs 7d and vs 30d.
- Call out anomalies (large % moves, issuance>rev, farming concentration).

JSON:
{payload}
""").strip()

In [None]:
# Summarize
def summarize_with_vertex(prompt_text: str) -> str:
    """Send `prompt_text` to the configured Gemini model and return the reply text.

    Returns "" when the call fails or the response carries no text, so the
    caller's `if not ai_summary` fallback can take over instead of the cell
    aborting. The failure is printed so it is not silent.
    """
    try:
        # NOTE(review): `.text` is understood to raise when the response has no
        # usable candidate (e.g. blocked output) — confirm against the SDK docs.
        return model.generate_content(prompt_text).text or ""
    except Exception as exc:  # best-effort: any API/SDK failure triggers the fallback
        print(f"[warn] Vertex AI summary failed ({type(exc).__name__}: {exc}); using fallback.")
        return ""

prompt_text = format_prompt(ctx)
ai_summary = summarize_with_vertex(prompt_text)

# ---------- Fallback (simple bullets) ----------
# Used when the model returned nothing usable (None, empty or whitespace-only).
if not (ai_summary or "").strip():
    def _or_na(value, suffix=""):
        """Render `value` followed by `suffix`, or 'n/a' when the KPI is missing."""
        return "n/a" if value is None or pd.isna(value) else f"{value}{suffix}"

    _rev_kpi = summary_metrics["revenue"]
    _dau_kpi = summary_metrics["dau"]
    lines = [
        f"**Daily Update — {RUN_DATE_STR}**",
        # Percent KPIs are None when a baseline is unavailable; show "n/a" rather than "None%".
        (f"- Revenue: ${_rev_kpi['today']:,.0f} "
         f"({_or_na(_rev_kpi['DoD_pct'], '%')} DoD, {_or_na(_rev_kpi['vs7d_avg_pct'], '%')} vs 7d); "
         f"buyers={_rev_kpi['buyers']:,}, tx={_rev_kpi['transactions']:,}"),
        (f"- DAU: {_dau_kpi['today']:,} "
         f"({_or_na(_dau_kpi['DoD_pct'], '%')} DoD, {_or_na(_dau_kpi['vs7d_avg_pct'], '%')} vs 7d); "
         f"avg time={_or_na(_dau_kpi['avg_time_today_min'], ' min')}"),
    ]
    ai_summary = "\n".join(lines)

print("\n===== AI SUMMARY =====\n")
# Cap the printed output at 4000 characters.
print(ai_summary[:4000])
