# Notebook cell In [23]
# app.py

import os
import re
import time
import pandas as pd
import prince
import plotly.express as px

from google.cloud import bigquery
from google.oauth2 import service_account

import dash
from dash import dcc, html, Input, Output, dash_table
from sklearn.cluster import KMeans
# Kafka consumer used by the live-watch tab
from confluent_kafka import Consumer, TopicPartition
import json
from datetime import datetime, timezone, timedelta

# Kafka connection settings for the live-watch consumer.
# The endpoint and SASL credentials can be overridden through environment
# variables so a deployment does not have to edit this file; when a variable
# is unset the previous hard-coded value is used, so behaviour is unchanged.
# NOTE(review): the fallback API key/secret below are committed in source.
# They should be rotated, and the fallbacks removed once every environment
# provides KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD.
KAFKA_CONF = {
    'bootstrap.servers': os.getenv(
        'KAFKA_BOOTSTRAP_SERVERS',
        'pkc-619z3.us-east1.gcp.confluent.cloud:9092',
    ),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('KAFKA_SASL_USERNAME', 'H7C5SHD4EUVIGHTZ'),
    'sasl.password': os.getenv(
        'KAFKA_SASL_PASSWORD',
        'oyA7H99XPrK6c6I/aA3yB5fGhAlcp055Hr9ZdPrcqm5qlPRdsshfzS/Ku4xCHD8z',
    ),
    'group.id': 'dash-live-group',
    # Start from the earliest message when the group has no stored offset.
    'auto.offset.reset': 'earliest',
    # Offsets are not committed automatically.
    'enable.auto.commit': False,
}

# A single module-level consumer is shared by the live-watch callback.
live_consumer = Consumer(KAFKA_CONF)
live_consumer.subscribe(['watch_live_topic'])


# ── 1) BigQuery client setup ────────────────────────────────────────────────
# Each setting is defined exactly once. The original assigned PROJECT twice,
# repeated the key-file literal, and built the client twice — the second time
# without the explicit credentials, relying on the environment variable.
KEY_PATH = "mindful-vial-460001-h6-4d83b36dd3e9.json"
PROJECT  = "mindful-vial-460001-h6"
DATASET  = "euphoria"

# Still exported, as before, for any code that resolves credentials through
# GOOGLE_APPLICATION_CREDENTIALS instead of receiving them explicitly.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH

# One client, built from the explicit service-account credentials.
CRED   = service_account.Credentials.from_service_account_file(KEY_PATH)
client = bigquery.Client(project=PROJECT, credentials=CRED)

# ── 2) Shared queries & data ───────────────────────────────────────────────

# Pre-load KPI data
# A single query computes every KPI in its own CTE and stacks the results with
# UNION ALL into one long-format table with three columns:
#   kpi   – the KPI's display name (used to populate the KPI dropdown),
#   label – the entity or period the row describes,
#   value – the metric, always rendered as text via CAST(... AS STRING) or
#           FORMAT(...), so the column holds strings rather than numbers.
# NOTE(review): the final SELECT has no ORDER BY; the ORDER BY clauses inside
# the CTEs presumably do not guarantee row order after UNION ALL — confirm if
# the displayed tables rely on that ordering.
KPI_SQL = f"""
WITH
  viewed AS (
    SELECT country, SUM(length) AS total_watch_seconds
    FROM `{PROJECT}.{DATASET}.watch_topic`
    GROUP BY country
    ORDER BY total_watch_seconds DESC
    LIMIT 10
  ),
  purchased AS (
    SELECT product_name, COUNT(*) AS purchase_count
    FROM `{PROJECT}.{DATASET}.purchase_events_topic`
    GROUP BY product_name
    ORDER BY purchase_count DESC
    LIMIT 8
  ),
  streamer_perf AS (
    SELECT
      p.screen_name,
      SUM((s.viewers_total/NULLIF(s.length,0)) * s.comments_total) AS performance_score
    FROM `{PROJECT}.{DATASET}.streams_topic`   AS s
    JOIN `{PROJECT}.{DATASET}.partners_topic`  AS p
      ON s.partner_id = p.partner_id
    GROUP BY p.screen_name
    ORDER BY performance_score DESC
    LIMIT 10
  ),
  best_games AS (
    SELECT product_name, COUNT(*) AS purchase_count
    FROM `{PROJECT}.{DATASET}.purchase_events_topic`
    WHERE category = 'game'
    GROUP BY product_name
    ORDER BY purchase_count DESC
    LIMIT 2
  ),
  streamed_games AS (
    SELECT
      g.title      AS game_title,
      COUNT(*)     AS stream_count
    FROM `{PROJECT}.{DATASET}.streams_topic` AS s
    JOIN `{PROJECT}.{DATASET}.games_topic`   AS g
      ON s.game_id = g.game_id
    GROUP BY g.title
    ORDER BY stream_count DESC
    LIMIT 2
  ),
  top_cust_purch AS (
    SELECT customer_id, COUNT(*) AS purchase_count
    FROM `{PROJECT}.{DATASET}.purchase_events_topic`
    GROUP BY customer_id
    ORDER BY purchase_count DESC
    LIMIT 1000
  ),
  top_cust_watch AS (
    SELECT customer_id, SUM(length) AS total_watch_seconds
    FROM `{PROJECT}.{DATASET}.watch_topic`
    GROUP BY customer_id
    ORDER BY total_watch_seconds DESC
    LIMIT 1000
  ),
  monthly_watch AS (
    SELECT
      FORMAT('%04d-%02d',
        EXTRACT(YEAR  FROM DATE(date)),
        EXTRACT(MONTH FROM DATE(date))
      ) AS period,
      SUM(length) AS total_watch_seconds
    FROM `{PROJECT}.{DATASET}.watch_topic`
    GROUP BY period
    ORDER BY period
  ),
  yearly_watch AS (
    SELECT
      CAST(EXTRACT(YEAR FROM DATE(date)) AS STRING) AS period,
      SUM(length) AS total_watch_seconds
    FROM `{PROJECT}.{DATASET}.watch_topic`
    GROUP BY period
    ORDER BY period
  ),
  monthly_merch AS (
    SELECT
      FORMAT('%04d-%02d',
        EXTRACT(YEAR  FROM DATE(timestamp)),
        EXTRACT(MONTH FROM DATE(timestamp))
      ) AS period,
      SUM(price) AS total_merch_sales
    FROM `{PROJECT}.{DATASET}.purchase_events_topic`
    WHERE category = 'merch'
    GROUP BY period
    ORDER BY period
  ),
  yearly_merch AS (
    SELECT
      CAST(EXTRACT(YEAR FROM DATE(timestamp)) AS STRING) AS period,
      SUM(price) AS total_merch_sales
    FROM `{PROJECT}.{DATASET}.purchase_events_topic`
    WHERE category = 'merch'
    GROUP BY period
    ORDER BY period
  )

SELECT 'Top 10 Viewed Countries'       AS kpi, country        AS label, CAST(total_watch_seconds AS STRING)     AS value FROM viewed
UNION ALL
SELECT 'Top 8 Purchased Products'      AS kpi, product_name   AS label, CAST(purchase_count            AS STRING) AS value FROM purchased
UNION ALL
SELECT 'Top 10 Streamer Performance'   AS kpi, screen_name    AS label, CAST(ROUND(performance_score,2) AS STRING) AS value FROM streamer_perf
UNION ALL
SELECT 'Top 2 Best-Selling Games'      AS kpi, product_name   AS label, CAST(purchase_count            AS STRING) AS value FROM best_games
UNION ALL
SELECT 'Top 2 Most-Streamed Games'     AS kpi, game_title     AS label, CAST(stream_count              AS STRING) AS value FROM streamed_games
UNION ALL
SELECT 'Top 1000 Customers by Purchases'      AS kpi, customer_id AS label, CAST(purchase_count            AS STRING) AS value FROM top_cust_purch
UNION ALL
SELECT 'Top 1000 Customers by Watch Seconds'  AS kpi, customer_id AS label, CAST(total_watch_seconds       AS STRING) AS value FROM top_cust_watch
UNION ALL
SELECT 'Monthly Watch (sec)'           AS kpi, period         AS label, CAST(total_watch_seconds       AS STRING) AS value FROM monthly_watch
UNION ALL
SELECT 'Yearly Watch (sec)'            AS kpi, period         AS label, CAST(total_watch_seconds       AS STRING) AS value FROM yearly_watch
UNION ALL
SELECT 'Monthly Merch Sales'           AS kpi, period         AS label, FORMAT('$%.2f', total_merch_sales)        AS value FROM monthly_merch
UNION ALL
SELECT 'Yearly Merch Sales'            AS kpi, period         AS label, FORMAT('$%.2f', total_merch_sales)        AS value FROM yearly_merch
;
"""
# Runs once, when this module is executed; the resulting frame is kept at
# module level and is not refreshed afterwards.
df_kpi = client.query(KPI_SQL).to_dataframe()

# ── 3) Trophy segmentation util ────────────────────────────────────────────
def compute_trophy_segments(sample_limit=50000, k=4):
    """Segment 'Authentic Mahiman Trophy' buyers by age, gender and region.

    Queries the demographic profile behind each matching merch purchase,
    bins the age into 5-year groups, projects the three categorical
    variables onto two MCA dimensions and clusters the projection with
    KMeans.

    Args:
        sample_limit: Maximum number of rows fetched from BigQuery. It is
            coerced to ``int`` before being placed in the query text.
        k: Number of KMeans clusters.

    Returns:
        A 4-tuple ``(coords, df, summary, centers)``:
          * ``coords``  – DataFrame with columns ``Dim1``, ``Dim2``, ``cluster``.
          * ``df``      – the query rows plus ``age_bin`` and ``cluster``.
          * ``summary`` – one row per cluster with ``cluster``, ``size``,
            ``top_regions`` (dict of up to five regions to counts) and
            ``avg_age``.
          * ``centers`` – the KMeans cluster centres, one (Dim1, Dim2) row
            per cluster.
        When the query returns no rows, the same four objects are returned
        empty instead of letting MCA/KMeans raise on empty input.

    Notes:
        The query joins purchases to customers without de-duplicating, so a
        customer with several matching purchases contributes several rows.
        NOTE(review): DATE_DIFF(..., YEAR) counts calendar-year boundaries,
        so ``age`` may be one higher than the age at the last birthday —
        confirm this approximation is acceptable.
    """
    sql = f"""
      WITH trophy_profiles AS (
        SELECT
          c.customer_id,
          DATE_DIFF(CURRENT_DATE(), DATE(c.birthday), YEAR) AS age,
          c.gender,
          c.region
        FROM `{PROJECT}.{DATASET}.purchase_events_topic` p
        JOIN `{PROJECT}.{DATASET}.customers_topic` c
          USING(customer_id)
        WHERE p.category='merch'
          AND p.product_name='Authentic Mahiman Trophy'
      )
      SELECT *
      FROM trophy_profiles
      LIMIT {int(sample_limit)}
    """
    df = client.query(sql).to_dataframe()

    if df.empty:
        # Nothing to fit: hand back empty, correctly-shaped outputs so the
        # caller can still build its layout.
        df['age_bin'] = pd.Series(dtype='object')
        df['cluster'] = pd.Series(dtype='int64')
        coords = pd.DataFrame(columns=['Dim1', 'Dim2', 'cluster'])
        summary = pd.DataFrame(
            columns=['cluster', 'size', 'top_regions', 'avg_age']
        )
        # A (0, 2) float array, matching the column layout of real centres.
        centers = coords[['Dim1', 'Dim2']].to_numpy(dtype=float)
        return coords, df, summary, centers

    # Bin age into left-closed 5-year groups from 10 up to (not including) 80.
    # Ages outside that range get no bin and become the category 'nan' after
    # the astype(str) below.
    df['age_bin'] = pd.cut(
        df['age'],
        bins=range(10, 81, 5),
        labels=[f"{i}-{i+4}" for i in range(10, 80, 5)],
        right=False,
    )

    # MCA on the categorical profile; the seed is fixed for reproducibility.
    df_mca = df[['age_bin', 'gender', 'region']].astype(str)
    mca = prince.MCA(n_components=2, engine='sklearn', random_state=42).fit(df_mca)
    coords = mca.transform(df_mca)
    coords.columns = ['Dim1', 'Dim2']

    # KMeans on the two MCA dimensions.
    km = KMeans(n_clusters=k, random_state=42)
    coords['cluster'] = km.fit_predict(coords[['Dim1', 'Dim2']])
    df['cluster'] = coords['cluster']

    # Per-cluster summary; each cluster's rows are selected once and reused.
    rows = []
    for i in range(k):
        members = df[df['cluster'] == i]
        rows.append({
            'cluster': i,
            'size': int(len(members)),
            'top_regions': members['region'].value_counts().head(5).to_dict(),
            'avg_age': members['age'].mean(),
        })
    summary = pd.DataFrame(rows)
    return coords, df, summary, km.cluster_centers_

# Segment data is computed once at start-up and reused by the layout and the
# scatter callback.
coords_seg, df_seg, df_seg_summary, seg_centers = compute_trophy_segments()


def _table_records(frame):
    """Return *frame* as DataTable records with dict cells flattened to text.

    Dash's DataTable accepts only string, number or boolean cell values, so a
    dict cell (such as the ``top_regions`` counts) is rendered as
    ``"key: value, key: value"``. All other cells are passed through as-is.
    """
    def as_cell(value):
        if isinstance(value, dict):
            return ", ".join(f"{key}: {count}" for key, count in value.items())
        return value

    return [
        {column: as_cell(value) for column, value in record.items()}
        for record in frame.to_dict('records')
    ]


# ── 4) Build Dash app ────────────────────────────────────────────────
app = dash.Dash(__name__, suppress_callback_exceptions=True)
app.title = "Euphoria Analytical Dashboard"

# Evaluated once so the option list and the default selection cannot disagree.
_kpi_names = list(df_kpi['kpi'].unique())
_current_year = pd.Timestamp.now().year

app.layout = html.Div([
  dcc.Tabs([
    dcc.Tab(label="SQL Runner", children=[
      html.H3("BigQuery SQL Runner"),
      dcc.Textarea(
        id='sql-input',
        value=f"SELECT * FROM `{PROJECT}.{DATASET}.streams_topic` LIMIT 5;",
        style={'width':'100%','height':'120px'}
      ),
      html.Button("Run Query", id='run-sql'),
      html.Div(id='sql-table')
    ]),
    dcc.Tab(label="Live Watch (5m)", children=[
      html.H3("Live Watch Hours (last 5 min)"),
      dcc.Graph(id='live-choropleth'),
      # Refresh the live map every 10 seconds.
      dcc.Interval(id='interval-live', interval=10*1000, disabled=False)
    ]),
    dcc.Tab(label="KPI Dashboard", children=[
      html.H3("Euphoria KPIs"),
      dcc.Dropdown(
        id='kpi-dropdown',
        options=[{'label': k, 'value': k} for k in _kpi_names],
        # No default selection when the KPI query returned no rows.
        value=_kpi_names[0] if _kpi_names else None
      ),
      dash_table.DataTable(
        id='kpi-table',
        columns=[{'name':c,'id':c} for c in df_kpi.columns],
        page_size=10
      )
    ]),
    dcc.Tab(label="Yearly Watch Rank", children=[
      html.H3("Yearly Relative Watch Hours"),
      dcc.Dropdown(
        id='year-dropdown',
        # The current year and the nine before it, newest first.
        options=[{'label':y,'value':y} for y in range(_current_year, _current_year-10, -1)],
        value=_current_year
      ),
      dcc.Graph(id='yearly-choropleth')
    ]),
    dcc.Tab(label="Trophy Segments", children=[
      html.H3("Trophy Buyer Segments (MCA + KMeans)"),
      dash_table.DataTable(
        id='seg-summary',
        columns=[{'name':c,'id':c} for c in df_seg_summary.columns],
        data=_table_records(df_seg_summary)
      ),
      dcc.Graph(id='seg-scatter')
    ])
  ])
], style={'padding':'20px'})

# ── 5) Callbacks ─────────────────────────────────────────────────────────

# SQL Runner
@app.callback(
    Output('sql-table', 'children'),
    Input('run-sql', 'n_clicks'),
    # State rather than Input: the textarea is read when the button is
    # clicked, but editing it no longer triggers a query on every keystroke.
    dash.State('sql-input', 'value'),
)
def run_sql(n, query):
    """Run the SQL typed into the textarea and render the result as a table.

    Args:
        n: Click count of the "Run Query" button (``None`` before any click).
        query: Current textarea contents. The standalone words ``PROJECT``
            and ``DATASET`` are replaced with the configured project and
            dataset ids before execution.

    Returns:
        A paginated ``DataTable`` with the query result, or an ``html.Div``
        carrying a prompt or the error message.

    NOTE(review): this executes arbitrary user-supplied SQL with the app's
    BigQuery credentials. That is the feature, but the app should not be
    reachable by untrusted users.
    """
    if not n:
        return html.Div("Click Run Query")
    if not query or not query.strip():
        return html.Div("Enter a SQL query first")

    q = re.sub(r"\bPROJECT\b", PROJECT, query)
    q = re.sub(r"\bDATASET\b", DATASET, q)

    try:
        df = client.query(q).to_dataframe()
    except Exception as exc:
        # Callback boundary: a bad query is expected user input here, so the
        # error is reported in the page instead of failing the callback.
        return html.Div(f"Query failed: {exc}", style={'color': 'crimson'})

    return dash_table.DataTable(
        columns=[{'name': c, 'id': c} for c in df.columns],
        data=df.to_dict('records'),
        page_size=10,
    )

# ── Live watch map (15% real-time substream) ───────────────────────────────
def _recent_watch_records(msgs, cutoff):
    """Decode Kafka messages and keep the records dated at or after *cutoff*.

    Messages that carry an error, have no payload, are not valid UTF-8 JSON,
    or lack a parseable ISO-8601 ``date`` field are skipped, so one bad
    message does not abort the whole refresh.
    """
    records = []
    for msg in msgs or []:
        if msg is None or msg.error():
            continue
        payload = msg.value()
        if payload is None:
            continue
        try:
            rec = json.loads(payload.decode('utf-8'))
            ts = datetime.fromisoformat(rec['date'].replace('Z', '+00:00'))
        except (ValueError, KeyError, TypeError, AttributeError):
            # Undecodable payload, non-object JSON, or a missing/invalid date.
            continue
        if ts.tzinfo is None:
            # A naive timestamp cannot be compared with the aware cutoff;
            # it is assumed to be UTC — TODO confirm against the producer.
            ts = ts.replace(tzinfo=timezone.utc)
        if ts >= cutoff:
            records.append(rec)
    return records


@app.callback(
    Output('live-choropleth', 'figure'),
    Input('interval-live', 'n_intervals')
)
def update_live(n):
    """Rebuild the live choropleth from recent messages on the watch topic.

    Args:
        n: Tick count of the refresh interval (unused; it only triggers
            the callback).

    Returns:
        A choropleth of watch hours per country for records dated within the
        last five minutes, or an empty titled map when there are none.
    """
    # 1) Reset to the beginning of each assigned partition.
    # NOTE(review): together with num_messages=200 below, each refresh reads
    # at most the first 200 messages from offset 0. Once the topic retains
    # more than that, newer messages are presumably never reached — consider
    # seeking by timestamp instead.
    for tp in live_consumer.assignment():
        live_consumer.seek(TopicPartition(tp.topic, tp.partition, 0))

    # 2) Consume a batch and keep only the last five minutes.
    msgs = live_consumer.consume(num_messages=200, timeout=1.0)
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)
    records = _recent_watch_records(msgs, cutoff)

    # 3) Empty-data fallback.
    if not records:
        empty = pd.DataFrame({'country': [], 'watch_hours': []})
        fig = px.choropleth(
            empty, locations='country', locationmode='country names',
            color='watch_hours', color_continuous_scale='Viridis'
        )
        fig.update_layout(title="No live watch data in the past 5 minutes")
        return fig

    # 4) Aggregate seconds per country, convert to hours, and plot.
    df_live = pd.DataFrame(records)
    agg = df_live.groupby('country', as_index=False)['length'].sum()
    agg['watch_hours'] = agg['length'] / 3600.0

    fig = px.choropleth(
        agg,
        locations='country',
        locationmode='country names',
        color='watch_hours',
        hover_name='country',
        color_continuous_scale='Viridis',
        range_color=(0, agg.watch_hours.max())
    )
    fig.update_layout(title="Live Watch Hours (last 5 min)")
    return fig

# KPI table
@app.callback(
    Output('kpi-table', 'data'),
    Input('kpi-dropdown', 'value'),
)
def update_kpi(k):
    """Return the pre-loaded KPI rows whose ``kpi`` column equals *k*.

    Args:
        k: KPI name selected in the dropdown.

    Returns:
        The matching rows of ``df_kpi`` as a list of record dicts.
    """
    is_selected = df_kpi['kpi'] == k
    matching_rows = df_kpi.loc[is_selected]
    return matching_rows.to_dict('records')

# Yearly watch rank
@app.callback(
    Output('yearly-choropleth', 'figure'),
    Input('year-dropdown', 'value')
)
def update_year(y):
    """Plot each country's percentile rank of watch hours for year *y*.

    Args:
        y: Year selected in the dropdown. The value arrives from the browser,
            so it is validated and sent to BigQuery as a bound parameter
            rather than being interpolated into the SQL text.

    Returns:
        A choropleth coloured by ``pct_rank`` (0–1), or ``dash.no_update``
        when no valid year is selected (e.g. the dropdown was cleared), which
        leaves the current map in place.
    """
    try:
        year = int(y)
    except (TypeError, ValueError):
        return dash.no_update

    sql = f"""
    WITH country_totals AS (
      SELECT country, SUM(length)/3600.0 AS watch_hours
      FROM `{PROJECT}.{DATASET}.watch_topic`
      WHERE EXTRACT(YEAR FROM DATE(date)) = @year
      GROUP BY country
    )
    SELECT country, watch_hours,
      PERCENT_RANK() OVER (ORDER BY watch_hours) AS pct_rank
    FROM country_totals
  """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("year", "INT64", year),
        ]
    )
    df = client.query(sql, job_config=job_config).to_dataframe()
    return px.choropleth(
        df, locations='country', locationmode='country names',
        color='pct_rank', hover_name='country',
        hover_data={'watch_hours': ':.1f', 'pct_rank': ':.2f'},
        color_continuous_scale='Viridis', range_color=(0, 1)
    )

# Trophy segments scatter
@app.callback(
    Output('seg-scatter','figure'),
    Input('seg-summary','data')
)
def update_seg(_):
    """Draw the pre-computed segment scatter with the cluster centres on top.

    Args:
        _: The summary table's data; ignored, it only triggers the callback.

    Returns:
        A scatter of ``coords_seg`` (Dim1 vs Dim2, coloured by cluster) with
        the rows of ``seg_centers`` overlaid as black 'x' markers.
    """
    centroid_marker = {'symbol': 'x', 'size': 12, 'color': 'black'}
    centroid_x = seg_centers[:, 0]
    centroid_y = seg_centers[:, 1]

    segment_fig = px.scatter(
        coords_seg,
        x='Dim1',
        y='Dim2',
        color='cluster',
        title='Trophy Buyer Segments',
    )
    # Overlay the cluster centres as a separate trace.
    segment_fig.add_scatter(
        x=centroid_x,
        y=centroid_y,
        mode='markers',
        marker=centroid_marker,
        name='Centroids',
    )
    return segment_fig


# ── 6) Run server ─────────────────────────────────────────────────────────
# Start the Dash server on port 8054 only when this file is executed as a
# script, not when it is imported.
# NOTE(review): debug=True turns on Dash's debug mode, which is meant for
# development — confirm it is switched off wherever this is deployed.
if __name__ == "__main__":
    app.run(debug=True,port=8054)
