In [None]:
'''
Docstring for 2501291219-05.ipynb
This notebook integrate between pipeline-01 and BigQuery input
'''

### Import important library

In [1]:
from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import yaml
from pathlib import Path

In [2]:
from functions.utils.logging import get_logger
from functions.utils.config  import PROJECT_ROOT, load_config
from functions.utils.llm_client import build_llm_client_from_yaml
from functions.utils.text_embeddings import GoogleEmbeddingModel
from functions.core.context_builder import build_user_context
from functions.core.history import build_history_summary

### QueryData

In [3]:
from google.cloud import bigquery

In [62]:
class DataQuery:
    def __init__(self):
        self.client = bigquery.Client()
    def get_students(self):
        query = """
            SELECT *
            FROM `poc-piloturl-nonprod.gold_layer.students`
        """
        df = self.client.query(query).to_dataframe()
        return df
    def get_interactions(self):
        query = """
            SELECT *
            FROM `poc-piloturl-nonprod.gold_layer.interactions`
        """
        df = self.client.query(query).to_dataframe()
        return df 
    def get_user_events_json(self):
        query = """
        SELECT *
        FROM `poc-piloturl-nonprod.gold_layer.feeds`
        """
        df = self.client.query(query).to_dataframe()
        # ensure created_at is ISO-8601 Z format
        df["created_at"] = df["created_at"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

        feeds_lookup: Dict[str, Dict[str, Any]] = {}
        for _,row in df.iterrows():
            feed_id = row["feed_id"]
            feeds_lookup[feed_id] = {
                "feed_id"        : feed_id,
                "title"          : row["title"],
                "feed_text"      : row["feed_text"],
                "tags"           : row["tags"],                     
                "language"       : row["language"],
                "created_at"     : row["created_at"],
                "source"         : row["source"],
                "url"            : row["url"],
                "views"          : int(row["views"]),
                "embedding_input": row["embedding_input"]
            }
        return feeds_lookup
# dq = DataQuery()
# dq.get_students()   

### Helper function

In [75]:
def ensure_dir(path: str) -> None:
    """Create directory if it does not exist (idempotent)."""
    os.makedirs(path, exist_ok=True)
    
def _read_hyde_config(cfg: Dict[str, Any]) -> Tuple[int, int, int, bool, str]:
    """
    Read HyDE-related configuration with safe defaults.

    Returns
    -------
    history_threshold:
        Event count threshold for prompt selection
    recent_k:
        Max number of recent feeds used in HistorySummary
    feed_text_max_chars:
        Per-feed text truncation limit
    include_recent_feeds:
        Whether HistorySummary may include feed snippets
    query_embedding_model_name:
        Embedding model for HyDE queries
    """
    hyde_cfg = cfg.get("hyde", {}) if isinstance(cfg, dict) else {}

    history_threshold = int(hyde_cfg.get("history_threshold", 5))
    recent_k = int(hyde_cfg.get("recent_k", 5))
    feed_text_max_chars = int(hyde_cfg.get("feed_text_max_chars", 240))
    include_recent_feeds = bool(hyde_cfg.get("include_recent_feeds", True))

    # Default to same embedding family as feed embeddings
    query_embedding_model_name = str(
        hyde_cfg.get("query_embedding_model_name")
        or cfg.get("embeddings", {}).get("model_name", "")
        or "gemini-embedding-001"
    )

    # Hard safety guards
    history_threshold = max(1, history_threshold)
    recent_k = max(0, min(recent_k, 10))
    feed_text_max_chars = max(0, min(feed_text_max_chars, 2000))

    return (
        history_threshold,
        recent_k,
        feed_text_max_chars,
        include_recent_feeds,
        query_embedding_model_name,
    )
    
def read_jsonl(path: str) -> List[Dict[str, Any]]:
    """
    Deterministic JSONL reader.

    Order is preserved, which is critical for any downstream alignment.
    """
    rows: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                rows.append(json.loads(line))
            except Exception as e:
                raise ValueError(f"Invalid JSONL at line {line_no}: {e}") from e
    return rows

def load_prompts() -> Dict[str, str]:
    """
    Load HyDE prompt templates from parameters/prompts.yaml.

    Expected structure:
      hyde_prompts:
        hyde_a: "..."
        hyde_b: "..."
        hyde_c: "..."
    """
    import yaml

    prompts_path = PROJECT_ROOT / "parameters" / "prompts.yaml"
    with prompts_path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}

    return data.get("hyde_prompts", {}) or {}

# =============================================================================
# Prompt selection and rendering
# =============================================================================
def choose_hyde_prompt_key(num_events: int, history_threshold: int = 5) -> str:
    """
    Select HyDE prompt variant based on interaction volume.

    Rules
    -----
    - num_events >= history_threshold → history-heavy (hyde_b)
    - num_events <= 1               → onboarding / sparse (hyde_c)
    - otherwise                     → mixed (hyde_a)
    """
    if num_events >= history_threshold:
        return "hyde_b"
    if num_events <= 1:
        return "hyde_c"
    return "hyde_a"


def render_prompt(
    template: str,
    preferred_language: str,
    user_context_text: str,
    history_summary_text: Optional[str],
) -> str:
    """
    Render a prompt template using strict placeholder substitution.

    Supported placeholders:
    - {{preferred_language}}
    - {{UserContextText}}
    - {{HistorySummaryText}}

    No templating engine is used on purpose to keep behavior explicit.
    """
    s = template.replace("{{preferred_language}}", preferred_language or "th")
    s = s.replace("{{UserContextText}}", user_context_text or "")
    s = s.replace("{{HistorySummaryText}}", history_summary_text or "")
    return s

# =============================================================================
# HyDE output handling
# =============================================================================
def _extract_hyde_query_texts(hyde_json: Dict[str, Any]) -> List[str]:
    """
    Extract query_text values from HyDE JSON output.

    Expected structure:
      {
        "hyde_queries": [
          {"query_id": "...", "query_text": "...", ...},
          ...
        ]
      }

    Order is preserved and MUST match embedding row order.
    """
    if not isinstance(hyde_json, dict):
        raise ValueError("hyde_output must be a dict")

    items = hyde_json.get("hyde_queries") or []
    if not isinstance(items, list):
        raise ValueError("hyde_output.hyde_queries must be a list")

    out: List[str] = []
    for i, it in enumerate(items):
        if not isinstance(it, dict):
            raise ValueError(f"hyde_output.hyde_queries[{i}] must be an object")
        out.append(str(it.get("query_text") or "").strip())

    return out


def _l2_normalize_rows(x: np.ndarray) -> np.ndarray:
    """
    Row-wise L2 normalization.

    Zero rows are left as zero to avoid NaNs.
    """
    if x.ndim != 2:
        raise ValueError("Expected 2D array for row normalization")

    norms = np.linalg.norm(x, axis=1, keepdims=True)
    norms[norms == 0.0] = 1.0
    return (x / norms).astype(np.float32)


def _atomic_save_npy(path: str, arr: np.ndarray) -> None:
    """
    Best-effort atomic .npy write.

    Writes to a temp file and renames to avoid partial reads.
    """
    tmp = path + ".tmp.npy"
    np.save(tmp, arr)
    os.replace(tmp, path)

<hr>

# Main

### Load resource

In [77]:
cfg = load_config()
out_dir = cfg["artifacts"]["user_query_bundles_dir"]

bq = DataQuery()

In [66]:
# students_path = cfg["data"]["students_path"]           # 'data/students.csv'
# students      = pd.read_csv(students_path)
# students
students = bq.get_students() 



In [67]:
# interactions_path = cfg["data"]["interactions_path"]   # 'data/interactions.csv'
# interactions = pd.read_csv(interactions_path)
# interactions
interactions = bq.get_interactions() 

In [68]:
# feeds_path = cfg["data"]["feeds_path"]                 # 'data/feeds.jsonl'
# feeds_lookup: Dict[str, Dict[str, Any]] = {}
# if os.path.exists(feeds_path):
#     feeds = read_jsonl(feeds_path)
#     feeds_lookup = {
#         str(f.get("feed_id")): f
#         for f in feeds
#         if isinstance(f, dict) and f.get("feed_id") is not None
#     }
feeds_lookup = bq.get_user_events_json()

### Read HyDE-related configuration once

In [70]:
(history_threshold,recent_k,feed_text_max_chars,include_recent_feeds,query_embedding_model_name) = _read_hyde_config(cfg)
expected_dim = int(cfg.get("embeddings", {}).get("dim", 0) or 0)

In [71]:
prompts = load_prompts()
if not prompts:
    raise ValueError("hyde_prompts missing from parameters/prompts.yaml")
client = build_llm_client_from_yaml(
    parameters_path=str(PROJECT_ROOT / "parameters" / "parameters.yaml"),
    credentials_path=str(PROJECT_ROOT / "parameters" / "credentials.yaml"),
)
query_embedder = GoogleEmbeddingModel(
    model_name=query_embedding_model_name,
    credentials_path=str(PROJECT_ROOT / "parameters" / "credentials.yaml"),
)
now_iso = datetime.now(timezone.utc).replace(microsecond=0).isoformat()

In [74]:
query_embedder

GoogleEmbeddingModel(model_name='gemini-embedding-001', credentials_path='/code/src/parameters/credentials.yaml', output_dim=768, uniqueness_guard_enabled=True, uniqueness_guard_min_unique_ratio=0.85, uniqueness_guard_round_decimals=8, _client=None)

In [84]:
verbose = 0

In [85]:
# ------------------------------------------------------------------
# Generate one cached bundle per student
# ------------------------------------------------------------------
logger = get_logger("pipeline_1_user_hyde")
for _, row in students.iterrows():
    student_row = row.to_dict()     # convert pd -> dict for each row
    student_id  = str(student_row.get("student_id","")).strip()
    if not student_id or student_id.lower() == "nan":
        raise ValueError(f"Invalid student_id in students.csv: {student_row!r}")
    
    user_ctx = build_user_context(student_row)
    pref_lang = user_ctx.user_context_json.get("preferred_language","th")

    user_events = interactions[interactions["user_id"] == student_id]   # <- user event from interaction.csv
    num_events  = int(len(user_events))

    history_summary_text : Optional[str] = None
    #** Crate by combe data for each person student **#
    if num_events > 0:
        history_summary_text = build_history_summary(
            user_events,
            preferred_language   = pref_lang,
            include_recent_feeds = include_recent_feeds,
            recent_k             = recent_k,
            feeds_lookup         = feeds_lookup or None,
            feed_text_max_chars  = feed_text_max_chars,
        )
    
    prompt_key = choose_hyde_prompt_key(num_events,history_threshold)
    template = prompts.get(prompt_key)
    if not template:
        raise ValueError(f"Missing prompt '{prompt_key}' in pormpts.yaml")
    prompt = render_prompt(
            template=template,
            preferred_language=pref_lang,
            user_context_text=user_ctx.user_context_text,
            history_summary_text=history_summary_text,
        )
    # ------------------------------------------------------------------
    # LLM call (JSON-only)
    # ------------------------------------------------------------------
    hyde_json = client.generate_json(prompt)
    # ------------------------------------------------------------------
    # Embed HyDE queries for fast serving
    # ------------------------------------------------------------------
    hyde_query_texts = _extract_hyde_query_texts(hyde_json)

    if hyde_query_texts:
        emb = query_embedder.embed_documents(hyde_query_texts)
        emb = np.asarray(emb, dtype = np.float32)
        if emb.ndim != 2:
            raise ValueError(f"Invalid embedding shape {emb.shape}")
        emb = _l2_normalize_rows(emb)
        if expected_dim and emb.shape[1] != expected_dim:
            raise ValueError(
                f"Embedding dim mismatch for student = {student_id}:"
                f"got {emb.shape[1]} expected {expected_dim}"
            )
        dim = int(emb.shape[1])
    else:
        dim = expected_dim or 0
        emb = np.zeros((0,dim), dtype=np.float32)

    emb_filename = f"{student_id}_hyde_q_emb.npy"
    emb_path     = os.path.join(out_dir, emb_filename)
    _atomic_save_npy(emb_path, emb)
    # ---------------------------------------------------
    # Persist cached bundle for online serving
    # ---------------------------------------------------
    bundle: Dict[str, Any] = {
        "bundle_version"        : "v2_hyde_embedded_queries",
        "student_id"            : student_id,
        "generated_at"          : now_iso,
        "prompt_key"            : prompt_key,
        "preferred_language"    : pref_lang,
        "num_events"            : num_events,
        "user_context_json"     : user_ctx.user_context_json,
        "user_context_text"     : user_ctx.user_context_text,
        "history_summary_text"  : history_summary_text,
        "hyde_output"           : hyde_json,
        "hyde_query_embeddings" : {
            "path"        : emb_filename,
            "model"       : query_embedding_model_name,
            "dim"         : dim,
            "dtype"       : "float32",
            "num_queries" : int(len(hyde_query_texts)),
            "normalized"  : True,
        },
    }

    out_path = os.path.join(out_dir, f"{student_id}.json")
    with open(out_path,"w",encoding="utf-8") as f:
        json.dump(bundle,f,ensure_ascii=False,indent=2)
    logger.info(
        "wrote HyDE bundle student_id=%s events=%d prompt=%s",
        student_id,
        num_events,
        prompt_key,
    )
    if verbose > 0:
        print(_)
        print(f"student_row -> \n {student_row}")
        print(f"user_ctx -> \n {user_ctx}")
        print(f"user_events -> \n {user_events}")
        print(f"promt_key -> {prompt_key}")
        print(f"hyde_query_texts->{hyde_query_texts}")
        print(f"emb -> \n{emb}")
        print(f"emb_path ->\n{emb_path}")
        print(f"bundle->\n{bundle}")
        print("#"*100)
    break

2026-01-29T06:36:53Z | INFO | functions.utils.llm_client | LLM call done | attempt=1 | latency=8.326s | in_tokens=816 | out_tokens=351 | model=gemini-2.5-flash | status=ok
2026-01-29T06:36:55Z | INFO | pipeline_1_user_hyde | wrote HyDE bundle student_id=stu_p007 events=4 prompt=hyde_a


### Got embedding

In [86]:
emb

array([[-0.01371195, -0.01495129,  0.0114938 , ...,  0.05251655,
         0.00130554,  0.00109984],
       [-0.01202164, -0.01056953,  0.00548899, ...,  0.02600264,
         0.00876749,  0.02130553],
       [-0.02254788, -0.03349402,  0.0111196 , ...,  0.05043196,
         0.01059734,  0.02642387],
       [ 0.01462885, -0.01676613,  0.04861221, ...,  0.04482346,
         0.01663549,  0.01487698],
       [ 0.00693145, -0.02495532, -0.03097113, ...,  0.02536403,
         0.01605297,  0.01515156]], dtype=float32)