In [None]:
import os
import json
import glob
from typing import List, Dict, Any, Tuple, Optional

In [None]:
# Text of the system message that separates the onboarding conversation from the
# post-onboarding history. split_onboarding() looks for it as a substring of a
# system message's content; the record builders re-insert it as a standalone
# {"role": "system", "content": MARKER} message when keep_marker_msg=True.
MARKER = "The onboarding is complete. A panel of experts has joined."

In [None]:
def load_json(path: str) -> Any:
    """Parse and return the JSON document stored (UTF-8 encoded) at *path*."""
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)
    return data

def save_jsonl(records: List[Dict[str, Any]], out_path: str) -> None:
    """Write *records* to *out_path* as JSON Lines (one JSON object per line).

    The parent directory is created if it does not exist. Text is written as
    UTF-8 with non-ASCII characters kept verbatim (``ensure_ascii=False``).
    An existing file at *out_path* is overwritten.
    """
    parent = os.path.dirname(out_path)
    # An empty dirname means the current directory; os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as wf:
        wf.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in records)

In [None]:
# -----------------------------
# Conversation processing
# -----------------------------
def split_onboarding(history: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Split onboarding vs post-onboarding by the system marker message.

    The marker is the FIRST message whose role is "system" and whose content
    contains ``MARKER``. The marker message itself belongs to neither part.

    Args:
        history: ordered chat messages (dicts with "role" / "content" keys).

    Returns:
        (onboarding, post): the messages strictly before and strictly after
        the marker message.

    Raises:
        ValueError: if no marker message is present.
    """
    for idx, msg in enumerate(history):
        # "content" may be missing OR explicitly null in the JSON; treat both as
        # empty text instead of raising TypeError on the `in` check.
        if msg.get("role") == "system" and MARKER in (msg.get("content") or ""):
            return history[:idx], history[idx + 1:]
    raise ValueError("Onboarding marker not found in history.")

def slice_post_history_for_step(post: List[Dict[str, Any]], step_t: int) -> List[Dict[str, Any]]:
    """
    Return the post-onboarding messages that precede step ``step_t``.

    Your post-history is stored as pairs per step:
      advisor(step1), user(step1), advisor(step2), user(step2), ...
    For sample at step t, include steps 1..t-1 -> first 2*(t-1) messages.

    Steps are 1-based; for ``step_t <= 1`` the result is empty. The count is
    clamped at 0 because a negative slice bound (e.g. ``post[:-2]`` for
    step 0) would otherwise return almost the entire, future, history.
    If ``post`` holds fewer than 2*(t-1) messages, all of it is returned.
    """
    n = max(0, 2 * (step_t - 1))
    return post[:n]

In [None]:
# -----------------------------
# Market snapshot formatting
# -----------------------------
def _fmt_val(x: Any) -> str:
    if x is None:
        return "NA"
    # keep floats readable
    if isinstance(x, float):
        return f"{x:.6g}"
    return str(x)

def format_market_snapshot(snapshot: Dict[str, Any], field_order: Optional[List[str]] = None) -> str:
    """
    Render a market snapshot as one "TICKER: key=value, ..." line per ticker.

    snapshot example: {
      "AAPL": {"mu":..., "var":..., "mdd":..., "7d_ret":..., "vol":..., "current_price":..., "price_trend":...},
      ...
    }

    Args:
        snapshot: mapping ticker -> metrics dict; a falsy metrics value is
            treated as an empty dict.
        field_order: metric keys to emit, in this order; keys a ticker lacks
            are skipped. ``None`` selects a fixed default order.

    Returns:
        The lines joined with newlines, in the snapshot's iteration order.
        A ticker that has none of the requested keys instead lists all of
        its own keys in their stored order.
    """
    # stable, interpretable default order
    order = field_order if field_order is not None else [
        "7d_ret", "vol", "mdd", "mu", "var", "current_price", "price_trend",
    ]

    def render(ticker: Any, info: Dict[str, Any]) -> str:
        fields = [f"{key}={_fmt_val(info[key])}" for key in order if key in info]
        if not fields:
            # none of the ordered keys exist: dump whatever keys the ticker has
            fields = [f"{key}={_fmt_val(val)}" for key, val in info.items()]
        return f"{ticker}: " + ", ".join(fields)

    return "\n".join(render(ticker, snapshot[ticker] or {}) for ticker in snapshot)

In [None]:
# -----------------------------
# Prompt construction
# -----------------------------
def build_step_instruction(select_item: Dict[str, Any]) -> Dict[str, str]:
    """
    Build the user message appended at the current step (full-history setting).

    The ONLY thing we add at current step:
      - task instruction
      - candidate tickers list
      - market snapshot (7-day window)
      - strict JSON output spec

    Args:
        select_item: one entry of the per-user select file; only its
            "market_snapshot" mapping (ticker -> metrics) is read here.

    Returns:
        A chat message dict {"role": "user", "content": <prompt text>}.
    """
    snapshot = select_item["market_snapshot"]
    # Candidates are the snapshot's tickers, in the snapshot's own order.
    # NOTE: {tickers} below renders via Python's list repr (single quotes).
    tickers = list(snapshot.keys())

    content = f"""
You are a personalized investment advisor.

Task:
Based on the FULL conversation above, which includes interactions between
the user and multiple advisors offering different investment perspectives,
infer the user's latent risk preference from their stated goals, emotional
reactions, and responses to these perspectives.

Then, using the current 7-day market snapshot, produce a personalized ranked
list of ALL candidate stocks from best to worst for this user.

Important:
- The conversation may include advisory viewpoints reflecting different
  principles (e.g., return-seeking, risk-averse, or balanced strategies).
- These viewpoints may be complementary or conflicting.
- Your goal is NOT to follow any advisor directly, but to infer which types
  of advice best align with THIS user's preferences as revealed through
  the conversation.
- The user's risk preference is NOT given explicitly and must be inferred
  from the conversation history.

Candidates (must include all, exactly once):
{tickers}

Current market snapshot (7-day window):
{format_market_snapshot(snapshot)}

Output format (STRICT):
Return a VALID JSON object with exactly ONE key:
- "final_rank": a list of tickers ordered from best to worst.

Format example (for structure only, NOT actual output):
{{
  "final_rank": ["TICKER_1", "TICKER_2",..., "TICKER_10"]
}}

Constraints:
- "final_rank" MUST be a permutation of the candidate list above.
- The list length MUST be exactly {len(tickers)}.
- No duplicate tickers.
- No missing tickers.
- Do NOT include any other keys.
- Do NOT include explanations or text outside the JSON.
- Do NOT use markdown or code fences.
"""

    return {"role": "user", "content": content}

def messages_to_prompt_text(messages: List[Dict[str, str]]) -> str:
    """
    Flatten chat messages into one prompt string for non-chat inference pipelines.

    Each message becomes a "[ROLE]" header line followed by its content (role
    upper-cased, content whitespace-stripped; missing or None fields become
    empty strings). Messages are separated by a blank line. Keeping both the
    chat form and this flat form is convenient for HF users.
    """
    return "\n\n".join(
        "[{}]\n{}".format(
            (msg.get("role") or "").upper(),
            (msg.get("content") or "").strip(),
        )
        for msg in messages
    )


In [None]:
# -----------------------------
# Labels
# -----------------------------
def build_labels(select_item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collect the ground-truth labels for one step.

    Reads the "rank" of the utility, momentum and safety advisors from
    select_item["advisor_panel"] and the user's choice from
    select_item["user_turn"]["choice"]; a missing key raises KeyError.
    """
    panel = select_item["advisor_panel"]

    def rank_of(advisor: str) -> Any:
        return panel[advisor]["rank"]

    labels: Dict[str, Any] = {
        "utility_rank": rank_of("utility_advisor"),
        "momentum_rank": rank_of("momentum_advisor"),
        "safety_rank": rank_of("safety_advisor"),
    }
    labels["user_choice"] = select_item["user_turn"]["choice"]
    return labels

In [None]:
# -----------------------------
# Main dataset builder
# -----------------------------
def build_eval_records_for_user(
    user_id: str,
    history_path: str,
    select_path: str,
    *,
    store_prompt_text: bool = True,
    keep_marker_msg: bool = True,
) -> List[Dict[str, Any]]:
    """
    Build one full-history evaluation record per step for a single user.

    A record's "messages" is, in order: the onboarding messages, optionally a
    canonical marker system message, the post-onboarding messages of steps
    1..t-1, and finally the step-t instruction with the market snapshot.

    Args:
        user_id: identifier used in each record's "id" and "user_id".
        history_path: JSON file with the user's full conversation history.
        select_path: JSON file with one entry per step (step, date, snapshot, labels).
        store_prompt_text: also store a flattened "prompt" string per record.
        keep_marker_msg: insert the marker system message after onboarding.

    Returns:
        Records in the order of the select file.

    Raises:
        ValueError: if the history has no onboarding marker.
    """
    history = load_json(history_path)
    steps = load_json(select_path)
    onboarding, post = split_onboarding(history)

    # Messages shared by every step of this user (the marker dict object is reused).
    prefix: List[Dict[str, str]] = list(onboarding)
    if keep_marker_msg:
        prefix.append({"role": "system", "content": MARKER})

    out: List[Dict[str, Any]] = []
    for step_item in steps:
        step = int(step_item["step"])
        # onboarding (+ marker) + steps 1..t-1 + current step instruction
        messages = prefix + slice_post_history_for_step(post, step)
        messages.append(build_step_instruction(step_item))

        record: Dict[str, Any] = {
            "id": f"{user_id}_step_{step}",
            "user_id": user_id,
            "step": step,
            "date": step_item.get("date", ""),
            "messages": messages,  # main prompt input (conversation)
            "labels": build_labels(step_item),
            "meta": {
                "candidate_tickers": list(step_item["market_snapshot"].keys()),
            },
        }
        if store_prompt_text:
            # optional convenience field for non-chat inference
            record["prompt"] = messages_to_prompt_text(messages)
        out.append(record)

    return out

In [None]:
def auto_discover_users(data_dir: str) -> List[str]:
    """
    Discover users by finding *_conversation_history.json and matching *_conversation_select.json.

    A user is returned only when both files exist directly in ``data_dir``.
    ``data_dir`` is glob-escaped, so directory names containing glob
    metacharacters (``[``, ``]``, ``*``, ``?``) are matched literally.

    Returns:
        user_ids like 'User_0', 'User_1', ... in sorted history-file path
        order (lexicographic, not numeric: 'User_10' precedes 'User_2').
    """
    suffix = "_conversation_history.json"
    pattern = os.path.join(glob.escape(data_dir), "*" + suffix)
    user_ids = []
    for hp in sorted(glob.glob(pattern)):
        # Strip only the trailing suffix (str.replace would remove every occurrence).
        user_id = os.path.basename(hp)[: -len(suffix)]
        sp = os.path.join(data_dir, f"{user_id}_conversation_select.json")
        if os.path.exists(sp):
            user_ids.append(user_id)
    return user_ids


In [None]:
def build_eval_jsonl(
    data_dir: str,
    out_jsonl_path: str,
    *,
    user_ids: Optional[List[str]] = None,
    store_prompt_text: bool = True,
    keep_marker_msg: bool = True,
) -> None:
    """
    Build the full-history evaluation set for several users and write it as JSONL.

    Args:
        data_dir: directory holding <user>_conversation_history.json and
            <user>_conversation_select.json files.
        out_jsonl_path: destination JSONL file (overwritten).
        user_ids: users to include; ``None`` auto-discovers them in data_dir.
        store_prompt_text / keep_marker_msg: forwarded to build_eval_records_for_user.

    Raises:
        ValueError: if the user list is empty.
        FileNotFoundError: if a user's history or select file is missing.
    """
    users = auto_discover_users(data_dir) if user_ids is None else user_ids
    if not users:
        raise ValueError("No users discovered. Expected files like User_0_conversation_history.json in data_dir.")

    records: List[Dict[str, Any]] = []
    for uid in users:
        history_path = os.path.join(data_dir, f"{uid}_conversation_history.json")
        select_path = os.path.join(data_dir, f"{uid}_conversation_select.json")
        for required in (history_path, select_path):
            if not os.path.exists(required):
                raise FileNotFoundError(required)

        records += build_eval_records_for_user(
            user_id=uid,
            history_path=history_path,
            select_path=select_path,
            store_prompt_text=store_prompt_text,
            keep_marker_msg=keep_marker_msg,
        )

    save_jsonl(records, out_jsonl_path)
    n_users = len(users)
    print(f"[OK] Wrote {len(records)} records to {out_jsonl_path}")
    print(f"[OK] Users: {n_users} | Steps per user: {len(records)//n_users if users else 'NA'}")

In [None]:
# === Notebook entry ===
# Build the full-history evaluation set (onboarding + past steps + current snapshot).

data_dir = "user_conversation/"   # Change to your actual path
out_jsonl_path = "evaluation/conv_finre_eval_10users.jsonl"

# None -> automatically discover every user with both conversation files
user_ids = None

build_eval_jsonl(
    data_dir=data_dir,
    out_jsonl_path=out_jsonl_path,
    user_ids=user_ids,
    store_prompt_text=True,
    keep_marker_msg=True,
)

## Build prompts without post-onboarding history (onboarding + current market snapshot only)

In [None]:
# -----------------------------
# Prompt construction (NO-HISTORY)
# -----------------------------
def build_step_instruction_no_history(select_item: Dict[str, Any]) -> Dict[str, str]:
    """
    Build the current-step user message for the NO-HISTORY setting.

    The message is meant to follow only the onboarding conversation (and the
    optional marker message). It contains the task instruction, the candidate
    tickers, the current market snapshot labelled with its step (and date,
    when non-empty), and the strict JSON output spec. No post-onboarding
    history is included.

    Args:
        select_item: one entry of the per-user select file; reads "step"
            (via int()), optional "date", and "market_snapshot".

    Returns:
        A chat message dict {"role": "user", "content": <prompt text>}.
    """
    step = int(select_item["step"])
    date = select_item.get("date", "")
    snapshot = select_item["market_snapshot"]
    candidates = list(snapshot.keys())
    # Optional " on <date>" suffix for the snapshot header; omitted when date is falsy.
    date_suffix = f" on {date}" if date else ""
    snapshot_text = format_market_snapshot(snapshot)

    content = f"""
You are a personalized investment advisor.

Task:
Based ONLY on the onboarding conversation above (user profile, goals, constraints, and risk attitude),
and the current 7-day market snapshot, produce a personalized ranked list of ALL candidate stocks
from best to worst for this user.

Important:
- Do NOT assume you have access to any post-onboarding interaction history.
- Use only the onboarding information to infer the user's preferences.

Candidates (must include all, exactly once):
{candidates}

Current market snapshot (7-day window) at step {step}{date_suffix}:
{snapshot_text}

Output format (STRICT):
Return a VALID JSON object with exactly ONE key:
- "final_rank": a list of tickers ordered from best to worst.

Format example (for structure only, NOT actual output):
{{
  "final_rank": ["TICKER_1", "TICKER_2",..., "TICKER_10"]
}}

Constraints:
- "final_rank" MUST be a permutation of the candidate list above.
- The list length MUST be exactly {len(candidates)}.
- No duplicate tickers.
- No missing tickers.
- Do NOT include any other keys.
- Do NOT include explanations or text outside the JSON.
- Do NOT use markdown or code fences.
"""
    return {"role": "user", "content": content}

In [None]:
# -----------------------------
# Main dataset builder (NO-HISTORY)
# -----------------------------
def build_eval_records_for_user_no_history(
    user_id: str,
    history_path: str,
    select_path: str,
    *,
    store_prompt_text: bool = True,
    keep_marker_msg: bool = True,
) -> List[Dict[str, Any]]:
    """
    Build one NO-HISTORY evaluation record per step for a single user.

    Each record's "messages" contains ONLY:
      - the onboarding conversation
      - (optional) the canonical marker system message
      - the current step instruction with the market snapshot
    Post-onboarding messages are discarded. split_onboarding() is still
    called, so a history without the marker raises ValueError.

    Returns:
        Records in select-file order, each tagged with
        meta["setting"] == "no_history_onboarding_plus_snapshot".
    """
    history = load_json(history_path)
    steps = load_json(select_path)
    onboarding, _ = split_onboarding(history)

    # Messages shared by every step of this user (the marker dict object is reused).
    context: List[Dict[str, str]] = list(onboarding)
    if keep_marker_msg:
        context.append({"role": "system", "content": MARKER})

    out: List[Dict[str, Any]] = []
    for step_item in steps:
        step = int(step_item["step"])
        # only the shared context + current step instruction with snapshot
        messages = context + [build_step_instruction_no_history(step_item)]

        record: Dict[str, Any] = {
            "id": f"{user_id}_step_{step}",
            "user_id": user_id,
            "step": step,
            "date": step_item.get("date", ""),
            "messages": messages,
            "labels": build_labels(step_item),
            "meta": {
                "candidate_tickers": list(step_item["market_snapshot"].keys()),
                "setting": "no_history_onboarding_plus_snapshot",
            },
        }
        if store_prompt_text:
            record["prompt"] = messages_to_prompt_text(messages)
        out.append(record)

    return out

In [None]:
def build_eval_jsonl_no_history(
    data_dir: str,
    out_jsonl_path: str,
    *,
    user_ids: Optional[List[str]] = None,
    store_prompt_text: bool = True,
    keep_marker_msg: bool = True,
) -> None:
    """
    Build the NO-HISTORY evaluation set for several users and write it as JSONL.

    Args:
        data_dir: directory holding <user>_conversation_history.json and
            <user>_conversation_select.json files.
        out_jsonl_path: destination JSONL file (overwritten).
        user_ids: users to include; ``None`` auto-discovers them in data_dir.
        store_prompt_text / keep_marker_msg: forwarded to
            build_eval_records_for_user_no_history.

    Raises:
        ValueError: if the user list is empty.
        FileNotFoundError: if a user's history or select file is missing.
    """
    users = auto_discover_users(data_dir) if user_ids is None else user_ids
    if not users:
        raise ValueError("No users discovered. Expected files like User_0_conversation_history.json in data_dir.")

    records: List[Dict[str, Any]] = []
    for uid in users:
        history_path = os.path.join(data_dir, f"{uid}_conversation_history.json")
        select_path = os.path.join(data_dir, f"{uid}_conversation_select.json")
        for required in (history_path, select_path):
            if not os.path.exists(required):
                raise FileNotFoundError(required)

        records += build_eval_records_for_user_no_history(
            user_id=uid,
            history_path=history_path,
            select_path=select_path,
            store_prompt_text=store_prompt_text,
            keep_marker_msg=keep_marker_msg,
        )

    save_jsonl(records, out_jsonl_path)
    n_users = len(users)
    print(f"[OK] Wrote {len(records)} records to {out_jsonl_path}")
    print(f"[OK] Users: {n_users} | Steps per user: {len(records)//n_users if users else 'NA'}")


In [None]:
# Build the NO-HISTORY evaluation set (onboarding + current snapshot only).
data_dir = "user_conversation/"
out_jsonl_path = "evaluation/conv_finre_eval_no_history_10users.jsonl"
user_ids = None  # None -> auto-discover users in data_dir

build_eval_jsonl_no_history(
    data_dir=data_dir,
    out_jsonl_path=out_jsonl_path,
    user_ids=user_ids,
    store_prompt_text=True,
    keep_marker_msg=True,
)