In [None]:
!pip install faiss-cpu pyngrok

In [None]:
import transformers
import torch
import sentence_transformers
import faiss
import flask
import pyngrok

In [None]:
# NOTE: this is the github.com ".../tree/main/..." form of the address. The next
# cell reassigns RAW_URL to the raw.githubusercontent.com address before fetching,
# so this value is never used for a request.
RAW_URL = "https://github.com/AntoineAbouJanab/Livedrops---AntoineAouJanab/tree/main/docs/prompting/knowledge-base.md"

In [None]:
# Raw-content address of the knowledge-base markdown file.
RAW_URL = "https://raw.githubusercontent.com/AntoineAbouJanab/Livedrops---AntoineAouJanab/main/docs/prompting/knowledge-base.md"

import requests

# No extra headers are sent; add entries here if the file ever requires them.
headers = {}

# Bound the request so an unreachable host fails fast instead of hanging the cell
# (same 30 s limit that download_prompts uses).
resp = requests.get(RAW_URL, headers=headers, timeout=30)
resp.raise_for_status()  # surface HTTP errors instead of parsing an error page
markdown_text = resp.text

print("✅ Fetched markdown, length:", len(markdown_text))
print(markdown_text[:500], "...")

In [None]:
import re

import re

def parse_docs(md_text: str):
    """Split a knowledge-base markdown string into one record per document.

    A document begins at a heading of the form ``## Document N: Title`` and
    extends to the next such heading (or to the end of the text).

    Args:
        md_text: Full markdown text to split.

    Returns:
        A list of dicts in heading order, each with ``"title"`` (the heading
        title), ``"content"`` (the body with ``---`` separator lines removed
        and surrounding whitespace stripped) and ``"id"`` (``"doc"`` + N).
    """
    heading_re = re.compile(r'(?m)^##\s*Document\s+(\d+)\s*:\s*(.+?)\s*$')
    separator_re = re.compile(r'(?m)^\s*---\s*$')

    headings = list(heading_re.finditer(md_text))
    # Each body ends where the following heading starts; the last runs to EOF.
    body_ends = [h.start() for h in headings[1:]] + [len(md_text)]

    records = []
    for heading, body_end in zip(headings, body_ends):
        number = heading.group(1).strip()
        doc_title = heading.group(2).strip()
        body = md_text[heading.end():body_end].strip()
        body = separator_re.sub('', body).strip()
        records.append({"title": doc_title, "content": body, "id": f"doc{number}"})
    return records

KNOWLEDGE_BASE = parse_docs(markdown_text)

# Fail here with a clear message: with zero documents the later index-building
# cell has no embedding rows to work with and breaks far from the real cause.
if not KNOWLEDGE_BASE:
    raise ValueError("No '## Document N: Title' sections found in the fetched markdown")

print(f"✅ Parsed {len(KNOWLEDGE_BASE)} documents")
print(KNOWLEDGE_BASE)


In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

# Sentence-embedding model used for both the documents and the queries.
# A lightweight, solid baseline. Swap if you prefer another.
EMBED_MODEL_NAME = "BAAI/bge-base-en-v1.5"
# Loaded once here; later cells call embedder.encode(...) on this shared instance.
embedder = SentenceTransformer(EMBED_MODEL_NAME)

print("Embedder ready →", EMBED_MODEL_NAME)


In [None]:
import faiss
import numpy as np

# Parallel lists: position i in texts/ids/titles refers to KNOWLEDGE_BASE[i].
# Each embedded text is the document title followed by its body.
texts = [f"{d['title']}\n\n{d['content']}" for d in KNOWLEDGE_BASE]
ids = [d["id"] for d in KNOWLEDGE_BASE]
titles = [d["title"] for d in KNOWLEDGE_BASE]

embs = embedder.encode(texts, convert_to_numpy=True, show_progress_bar=True).astype("float32")
# L2-normalize each row (the epsilon guards against division by zero) so the
# inner product used by the index equals cosine similarity.
embs /= (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-12)

# d is the embedding dimensionality; all rows go into one inner-product index.
d = embs.shape[1]
faiss_index = faiss.IndexFlatIP(d)
faiss_index.add(embs)


In [None]:
import numpy as np

def mmr(query_vec, cand_vecs, lambda_mult=0.7, top_k=5):
    """Select candidates by Maximal Marginal Relevance.

    The first pick is the candidate most similar to the query. Every later
    pick maximises ``lambda_mult * sim(query, c) - (1 - lambda_mult) *
    max_sim(c, already_selected)``.

    Args:
        query_vec: 1-D query vector (callers pass an L2-normalized vector).
        cand_vecs: 2-D array with one candidate vector per row.
        lambda_mult: Weight of relevance versus redundancy.
        top_k: Maximum number of candidates to select.

    Returns:
        List of selected row indices into ``cand_vecs``, in selection order.
    """
    # Inner product with the query; equals cosine similarity for normalized inputs.
    relevance = cand_vecs @ query_vec
    remaining = list(range(cand_vecs.shape[0]))
    picked = []

    while remaining and len(picked) < top_k:
        if picked:
            # Redundancy = highest similarity to anything already chosen.
            redundancy = np.max(cand_vecs[remaining] @ cand_vecs[picked].T, axis=1)
            trade_off = lambda_mult * relevance[remaining] - (1 - lambda_mult) * redundancy
            choice = remaining[int(np.argmax(trade_off))]
        else:
            # Nothing selected yet: take the candidate closest to the query.
            choice = max(remaining, key=lambda i: relevance[i])
        picked.append(choice)
        remaining.remove(choice)
    return picked


In [None]:
from sentence_transformers import CrossEncoder

# Cross-encoder used for reranking; instantiated once when this cell runs.
# Default (light & fast):
RERANKER_NAME = "cross-encoder/ms-marco-MiniLM-L-6-v2"
# Stronger alternative:
# RERANKER_NAME = "BAAI/bge-reranker-base"
cross_encoder = CrossEncoder(RERANKER_NAME)

def rerank_with_cross_encoder(query, cand_texts):
    """Score each candidate text against the query with the cross-encoder.

    Args:
        query: Query string.
        cand_texts: Iterable of candidate passages.

    Returns:
        ``(order, scores)``: ``scores`` holds one score per candidate in input
        order (higher = more relevant); ``order`` lists candidate positions
        sorted by descending score.
    """
    scores = cross_encoder.predict([(query, passage) for passage in cand_texts])
    return np.argsort(-scores), scores


In [None]:
def decide_k_from_scores(scores, max_k=3, margin=4, ratio=0.85, min_score=-5):
    """Decide how many of the best-first ranked results to return.

    Heuristic:
      - no scores, ``max_k`` below 1, or a best score under ``min_score`` -> 0
      - a single score, or best beats second by at least ``margin`` -> 1
      - otherwise -> ``min(2, max_k)``

    Args:
        scores: Relevance scores sorted best-first.
        max_k: Upper bound on the returned count; it is never exceeded.
        margin: Absolute gap between the top two scores that makes the best
            result count as a clear winner.
        ratio: Accepted for backward compatibility only; no ratio test is
            applied by this function.
        min_score: Best-score floor below which nothing is returned. The
            default of -5 matches the previously hard-coded cut-off.

    Returns:
        0, 1 or 2 - the number of leading results to keep.
    """
    if max_k <= 0 or len(scores) == 0:
        return 0
    best = float(scores[0])
    if best < min_score:
        return 0
    if len(scores) == 1:
        return 1
    if best - float(scores[1]) >= margin:
        return 1
    return min(2, max_k)


In [None]:
def search_enhanced(query, ann_k=50, mmr_top=10, lambda_mult=0.7, use_reranker=True, max_return_k=3):
    """Retrieve the most relevant knowledge-base documents for a query.

    Pipeline: FAISS inner-product search -> MMR diversification -> optional
    cross-encoder rerank -> adaptive cut-off via ``decide_k_from_scores``.

    Args:
        query: User question.
        ann_k: Number of nearest neighbours to pull from the index (capped at
            the number of indexed documents).
        mmr_top: Number of candidates kept after MMR.
        lambda_mult: MMR relevance/redundancy trade-off.
        use_reranker: Rerank the MMR picks with the cross-encoder when true;
            otherwise order them by similarity to the query.
        max_return_k: Upper bound passed to ``decide_k_from_scores``.

    Returns:
        Best-first list of dicts with ``"id"``, ``"title"``, ``"score"``
        (cross-encoder score, or query similarity without the reranker),
        ``"preview"`` (first 200 characters, newlines flattened) and
        ``"text"`` (the full indexed text, so prompt builders are not limited
        to the truncated preview).
    """
    if not ids:
        return []  # nothing indexed -> nothing to retrieve

    # 1) ANN retrieve
    q = embedder.encode([query], convert_to_numpy=True)[0].astype("float32")
    q /= (np.linalg.norm(q) + 1e-12)
    _, idxs = faiss_index.search(np.array([q], dtype="float32"), min(ann_k, len(ids)))
    idxs = idxs[0]

    cand_vecs = embs[idxs]  # normalized
    # 2) MMR diversify among the ANN candidates
    sel = mmr(q, cand_vecs, lambda_mult=lambda_mult, top_k=min(mmr_top, len(idxs)))
    mmr_idxs = idxs[sel]
    mmr_texts = [texts[i] for i in mmr_idxs]
    mmr_titles = [titles[i] for i in mmr_idxs]
    print("MMR selected", mmr_titles)

    # 3) Optional cross-encoder rerank
    if use_reranker:
        order, ce_scores = rerank_with_cross_encoder(query, mmr_texts)
        final_idxs = [mmr_idxs[i] for i in order]
        final_scores = [float(ce_scores[i]) for i in order]
        print(f"final idxs {final_idxs}")
        print(f"final scores {final_scores}")
    else:
        # Fall back to similarity with the query after MMR
        # (cosine, since all vectors are normalized)
        sims = (embs[mmr_idxs] @ q).tolist()
        order = np.argsort(-np.array(sims))
        final_idxs = [mmr_idxs[i] for i in order]
        final_scores = [sims[i] for i in order]

    # 4) Decide how many docs to return (adaptive k)
    k = decide_k_from_scores(final_scores, max_k=max_return_k, margin=4, ratio=0.85)

    results = []
    for rank in range(min(k, len(final_idxs))):
        idx = final_idxs[rank]
        results.append({
            "id": ids[idx],
            "title": titles[idx],
            "score": float(final_scores[rank]),
            "preview": texts[idx][:200].replace("\n", " ") + "...",
            # Full text: build_final_prompt prefers 'text' over 'preview'.
            "text": texts[idx],
        })
    return results


In [None]:
# Run the retrieval once and reuse the result; calling search_enhanced twice
# repeats the embedding, search and rerank work for the same query.
demo_results = search_enhanced("How do I synchronize inventory via API with my warehouse system?")
for result in demo_results:
    print(result["title"])
print(len(demo_results))

In [None]:
# Cell 1 — fetch_prompts_from_github.py
import os, re, requests
from pathlib import Path

def to_raw_github(url: str) -> str:
    """Convert a GitHub file URL into its raw.githubusercontent.com form.

    Accepts ``github.com/<user>/<repo>/<blob|tree|raw>/<branch>/<path>`` URLs
    (with or without ``www.``). URLs that already contain
    ``raw.githubusercontent.com`` are returned unchanged.

    Args:
        url: GitHub file URL or raw URL; surrounding whitespace is ignored.

    Returns:
        The ``https://raw.githubusercontent.com/...`` URL for the file.

    Raises:
        ValueError: If the URL matches none of the accepted forms.
    """
    url = url.strip()
    if "raw.githubusercontent.com" in url:
        return url
    # The branch is taken as a single path segment, so branch names that
    # contain "/" are not supported.
    m = re.match(
        r"https?://(?:www\.)?github\.com/([^/]+)/([^/]+)/(?:blob|tree|raw)/([^/]+)/(.+)",
        url,
    )
    if not m:
        raise ValueError("Provide a valid GitHub file URL or raw URL.")
    user, repo, branch, path = m.groups()
    return f"https://raw.githubusercontent.com/{user}/{repo}/{branch}/{path}"

def download_prompts(src_url: str, save_path: str = "/content/assistant-prompts.yml") -> str:
    """Download a prompts YAML file from GitHub and save it locally.

    Args:
        src_url: GitHub file URL or raw URL of the YAML file.
        save_path: Destination path; missing parent directories are created.

    Returns:
        ``save_path``, for convenient chaining into ``load_prompts``.

    Raises:
        ValueError: If ``src_url`` is not a recognised GitHub URL.
        requests.HTTPError: If the download returns an error status.
    """
    from urllib.parse import urlparse  # local import keeps the cell self-contained

    raw_url = to_raw_github(src_url)
    headers = {}
    token = os.environ.get("GITHUB_TOKEN")  # optional: for private repos
    # to_raw_github passes through any URL that merely contains the raw host as
    # a substring, so check the real hostname before attaching credentials.
    if token and urlparse(raw_url).hostname == "raw.githubusercontent.com":
        headers["Authorization"] = f"token {token}"
    r = requests.get(raw_url, headers=headers, timeout=30)
    r.raise_for_status()
    target = Path(save_path)
    # The default lives under /content, which may not exist everywhere.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(r.text, encoding="utf-8")
    return save_path

# USAGE EXAMPLE:
# PROMPTS_URL = "https://github.com/<user>/<repo>/blob/main/assistant-prompts.yml"
# local_yaml_path = download_prompts(PROMPTS_URL)


In [None]:
# Cell 2 — load_prompts.py
import yaml
from pathlib import Path

REQUIRED_KEYS = ["base_retrieval_prompt", "multi_document_synthesis", "refusal_no_context"]

def load_prompts(yaml_path: str):
    """Read the prompts YAML file and check that every required key is present.

    Args:
        yaml_path: Path to the YAML file.

    Returns:
        The parsed YAML content (an empty dict when the file parses to nothing).

    Raises:
        FileNotFoundError: If ``yaml_path`` does not exist.
        KeyError: If any entry of ``REQUIRED_KEYS`` is absent.
    """
    prompt_file = Path(yaml_path)
    if not prompt_file.exists():
        raise FileNotFoundError(f"Prompts file not found: {yaml_path}")

    raw_yaml = prompt_file.read_text(encoding="utf-8")
    data = yaml.safe_load(raw_yaml) or {}

    missing = [required for required in REQUIRED_KEYS if required not in data]
    if missing:
        raise KeyError(f"Missing keys in prompts YAML: {missing}")
    return data


In [None]:
# Cell 3 — routing_and_prompt_builder.py
from typing import List, Dict, Any

def choose_prompt(retrieved_docs: List[Dict[str, Any]], prompts: Dict[str, str]) -> str:
    """Pick the prompt template that matches the number of retrieved documents.

    Zero documents selects the refusal prompt, exactly one the base retrieval
    prompt, and two or more the multi-document synthesis prompt.
    """
    doc_count = len(retrieved_docs)
    if doc_count == 0:
        template_key = "refusal_no_context"
    elif doc_count == 1:
        template_key = "base_retrieval_prompt"
    else:
        template_key = "multi_document_synthesis"
    return prompts[template_key]

def build_final_prompt(user_question: str, retrieved_docs: List[Dict[str, Any]], prompts: Dict[str, str]) -> str:
    """Assemble the full LLM prompt: instructions, question, context and sources.

    The instruction text comes from ``choose_prompt``. With no documents only
    the instructions and question are emitted; with one document a single
    ``### Source`` section follows; with several, a ``### Retrieved Context``
    section lists each document with its score. A ``### Sources`` list of
    de-duplicated titles (best score first, at most five) closes the prompt
    whenever documents exist.

    Args:
        user_question: The question asked by the user.
        retrieved_docs: Retrieved document dicts. The body is read from
            ``'text'``, else ``'content'``, else ``'preview'``.
        prompts: Mapping of prompt templates as expected by ``choose_prompt``.

    Returns:
        The prompt string.
    """
    instructions = choose_prompt(retrieved_docs, prompts)
    header = f"{instructions}\n\nUser question:\n{user_question}\n\n"

    def _body_of(doc: Dict[str, Any]):
        # prefer 'text', else 'content', else 'preview'
        return doc.get('text') or doc.get('content') or doc.get('preview', '')

    def _sources_block(docs: List[Dict[str, Any]], limit: int = 5) -> str:
        if not docs:
            return ""
        # best-first (by score desc), dedup titles
        ranked = sorted(docs, key=lambda doc: doc.get("score", 0.0), reverse=True)
        listed, already = [], set()
        for doc in ranked:
            label = doc.get("title") or doc.get("id") or "Untitled"
            if label not in already:
                already.add(label)
                listed.append(label)
                if len(listed) >= limit:
                    break
        if not listed:
            return ""
        bullets = "\n".join(f"- {label}" for label in listed)
        return "### Sources\n" + bullets

    doc_count = len(retrieved_docs)

    # ---- No-context refusal ----
    if doc_count == 0:
        return f"{header}{_sources_block(retrieved_docs)}"

    # ---- Single-doc ----
    if doc_count == 1:
        only = retrieved_docs[0]
        body = _body_of(only)
        source_section = (
            f"### Source\n"
            f"Title: {only.get('title','')}\n"
            f"ID: {only.get('id','')}\n\n"
            f"{body}"
        )
        return f"{header}{source_section}\n\n{_sources_block(retrieved_docs)}"

    # ---- Multi-doc ----
    def _render(position: int, doc: Dict[str, Any]) -> str:
        body = _body_of(doc)
        return (
            "---\n"
            f"[Doc {position}] Title: {doc.get('title','')}\n"
            f"ID: {doc.get('id','')}\n"
            f"Score: {float(doc.get('score',0.0)):.4f}\n\n"
            f"{body}"
        )

    rendered = [_render(pos, doc) for pos, doc in enumerate(retrieved_docs, start=1)]
    context_section = "\n".join(rendered)

    return (
        f"{header}"
        f"### Retrieved Context ({doc_count} docs)\n"
        f"{context_section}\n\n{_sources_block(retrieved_docs)}"
    )


In [None]:
# Cell 4 — example_usage.py
# End-to-end check: download the prompt templates, load them, run retrieval for
# a sample question and print the prompt that would be sent to the LLM.

# 1) Point to your GitHub file:
PROMPTS_URL = "https://github.com/AntoineAbouJanab/Livedrops---AntoineAouJanab/blob/main/docs/prompting/assistant-prompts.yml"  # <- change this
local_yaml_path = download_prompts(PROMPTS_URL)  # saved to the function's default path under /content

# 2) Load prompts (raises if a required key is missing):
prompts = load_prompts(local_yaml_path)

# 3) Placeholder for hand-written retrieval results.
#    NOTE: this list is not passed to build_final_prompt below, which uses
#    search_enhanced(user_question) instead.
retrieved_docs = [
    # Example single doc:
    # {"id":"doc2","title":"Shoplite Product Search and Filtering Features","text":"...", "score":0.12}
]

# 4) Build final prompt for the LLM from live retrieval results:
user_question = "what is my name"
final_prompt = build_final_prompt(user_question, search_enhanced(user_question), prompts)
print(final_prompt)

In [None]:
# Cell 1: Install Dependencies
!pip install -q transformers accelerate bitsandbytes sentencepiece

In [None]:
# Cell A — deps
!pip -q install --upgrade transformers accelerate bitsandbytes huggingface_hub sentencepiece
import torch, os
print("Torch:", torch.__version__)


In [None]:
# Cell B — HF login (pick ONE of the two methods)

# --- Option 1: interactive login (paste token when prompted)
from huggingface_hub import login
login()  # paste your HF token here when asked

# --- Option 2: env var + pass token to from_pretrained
# import os
# os.environ["HF_TOKEN"] = "hf_xxx...your_token..."


In [None]:
# Cell C — load model + tokenizer
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Hugging Face model id loaded below.
MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"

# T4 doesn't do bfloat16 well; stick to float16 compute
compute_dtype = torch.float16

# 4-bit NF4 quantization (fast + memory efficient on T4)
bnb_4bit = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=compute_dtype,
)

# If you used Option 2 above:
# (`os` is not imported in this cell; it comes from an earlier cell's import.)
HF_TOKEN = os.environ.get("HF_TOKEN", None)

print(f"Loading {MODEL_ID} in 4-bit NF4...")
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_ID,
    use_fast=True,
    token=HF_TOKEN,           # ignored if you logged in interactively
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    torch_dtype=compute_dtype,
    quantization_config=bnb_4bit,
    attn_implementation="sdpa",  # Llama 3.1 supports SDPA; faster than eager
    low_cpu_mem_usage=True,
    token=HF_TOKEN,              # ignored if you logged in interactively
)

# make sure pad token id is set (falls back to the tokenizer's EOS id)
if model.generation_config.pad_token_id is None:
    model.generation_config.pad_token_id = tokenizer.eos_token_id

# Inference only: switch the model to eval mode.
model.eval()
print("✅ Llama 3.1 8B Instruct is ready.")

# Report the GPU name and currently allocated memory when CUDA is available.
if torch.cuda.is_available():
    torch.cuda.synchronize()
    print("GPU:", torch.cuda.get_device_name(0))
    print(f"VRAM in use: {torch.cuda.memory_allocated(0)/1024**3:.2f} GB")


In [None]:
# Cell D — robust Llama generate (token-slicing + min_new_tokens)
def llama_generate(prompt, max_new_tokens=120, temperature=None, min_new_tokens=16):
    """Generate a reply to ``prompt`` as a single user chat turn.

    Args:
        prompt: Text of the user message.
        max_new_tokens: Upper bound on generated tokens.
        temperature: ``None`` or a value <= 0 keeps greedy decoding; a positive
            value enables sampling with ``top_p=0.95``.
        min_new_tokens: Minimum number of tokens to generate, so the model
            cannot stop immediately.

    Returns:
        The decoded continuation only (prompt tokens are sliced off by token
        count), with special tokens removed and whitespace stripped.
    """
    chat = [{"role": "user", "content": prompt}]

    # The chat template yields input ids directly (no separate tokenize step).
    prompt_ids = tokenizer.apply_chat_template(
        chat, add_generation_prompt=True, return_tensors="pt"
    ).to(model.device)

    # Stop on EOS and, when the tokenizer has a distinct one, on <|eot_id|>.
    stop_ids = [tokenizer.eos_token_id]
    try:
        eot = tokenizer.convert_tokens_to_ids("<|eot_id|>")
        if eot is not None and eot != tokenizer.eos_token_id:
            stop_ids.append(eot)
    except Exception:
        pass  # not all tokenizers have <|eot_id|>

    options = {
        "input_ids": prompt_ids,
        "max_new_tokens": max_new_tokens,
        "min_new_tokens": min_new_tokens,
        "use_cache": True,
        "eos_token_id": stop_ids[0] if len(stop_ids) <= 1 else stop_ids,
        "do_sample": False,  # greedy unless a positive temperature is given
    }
    if temperature is not None and temperature > 0:
        options = {**options, "do_sample": True, "temperature": temperature, "top_p": 0.95}

    model.eval()
    with torch.inference_mode():
        generated = model.generate(**options)

    # Keep only the tokens produced after the prompt, then decode.
    prompt_len = prompt_ids.shape[-1]
    continuation = generated[0, prompt_len:]
    return tokenizer.decode(continuation, skip_special_tokens=True).strip()


In [None]:
import time
# Smoke test: one short generation, timed with wall-clock time.
# NOTE: `resp` was previously bound to the HTTP response of the markdown fetch;
# it is rebound to the generated string here.
q = "Say hi in one short sentence."
t0 = time.time()
resp = llama_generate(q, max_new_tokens=60, min_new_tokens=12)  # small but non-zero
print(repr(resp))  # repr() shows if it's actually empty
print(f"\nElapsed: {time.time() - t0:.2f}s")


In [None]:
def generate_response(prompt, max_new_tokens=512, temperature=0.7):
    """Generate a chat completion for ``prompt`` with the loaded Llama model.

    The prompt is wrapped as a single user turn via the tokenizer's chat
    template, and only the newly generated tokens are decoded and returned.

    Args:
        prompt: Full prompt text (for RAG, the output of build_final_prompt).
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature. With the default (0.7) the model
            samples with ``top_p=0.95`` and ``repetition_penalty=1.1``; pass
            ``None`` or a value <= 0 for greedy decoding.

    Returns:
        The generated text with special tokens removed and whitespace stripped.
    """
    # Format as a single-turn chat
    messages = [{"role": "user", "content": prompt}]

    # Build input_ids directly from the chat template (more robust than char-slicing)
    input_ids = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to(model.device)

    # Llama 3.1 often uses both <|eot_id|> and EOS as terminators
    terminators = [tokenizer.eos_token_id]
    try:
        eot_id = tokenizer.convert_tokens_to_ids("<|eot_id|>")
        if eot_id is not None and eot_id != tokenizer.eos_token_id:
            terminators.append(eot_id)
    except Exception:
        pass  # best effort: not all tokenizers expose <|eot_id|>

    # Baseline generation kwargs: greedy decoding with the KV cache enabled
    gen_kwargs = dict(
        max_new_tokens=max_new_tokens,
        use_cache=True,  # KV-cache ON for speed
        eos_token_id=terminators if len(terminators) > 1 else terminators[0],
        do_sample=False,
    )

    # A positive temperature (including the default 0.7) switches to sampling
    if temperature is not None and temperature > 0:
        gen_kwargs.update(
            do_sample=True,
            temperature=float(temperature),
            top_p=0.95,
            repetition_penalty=1.1,
        )

    model.eval()
    with torch.inference_mode():
        outputs = model.generate(input_ids=input_ids, **gen_kwargs)

    # Slice off the prompt by *token* length (stable)
    gen_only = outputs[0, input_ids.shape[-1]:]
    return tokenizer.decode(gen_only, skip_special_tokens=True).strip()

# The function runs with use_cache=True; the message now says so (the previous
# text claimed use_cache=False and contained a mis-encoded check mark).
print("✅ Generation function updated (use_cache=True)")


# ==============================================================================
# ==============================================================================
# Helper: collect clean, deduped source titles from search_enhanced(...) results
# Place this right after the search_enhanced(...) definition
# ==============================================================================
def extract_doc_titles(results, unique=True, limit=5, print_debug=False):
    """Collect source titles from retrieval results, keeping their order.

    Args:
        results: List of dicts from search_enhanced(...), e.g.
            {"id": "...", "title": "...", "score": float, "preview": "..."}.
            ``None`` or an empty list yields ``[]``.
        unique: Skip titles that were already collected.
        limit: Maximum number of titles; a falsy value disables the cap.
        print_debug: Print the collected titles as a bullet list.

    Returns:
        List of titles in input order. Each entry falls back to the result's
        id, then to "Untitled", when the title is missing or empty.
    """
    if not results:
        return []

    titles_out, seen = [], set()
    # results are expected best-first already, so input order is kept
    for r in results:
        t = r.get("title") or r.get("id") or "Untitled"
        if unique and t in seen:
            continue
        seen.add(t)
        titles_out.append(t)
        if limit and len(titles_out) >= limit:
            break

    if print_debug:
        # The header previously contained a mis-encoded emoji.
        print("🔎 Sources:")
        for t in titles_out:
            print(f"- {t}")
    return titles_out


# ==============================================================================
# ==============================================================================
# CELL 7: Integrate with RAG Pipeline (returns answer + sources)
# ==============================================================================
def rag_generate(user_question, max_tokens=512):
    """Answer a question with retrieval-augmented generation.

    Retrieves documents with ``search_enhanced``, builds the prompt with
    ``build_final_prompt`` (using the module-level ``prompts``), generates the
    answer and collects up to five unique source titles.

    Args:
        user_question: The question to answer.
        max_tokens: Passed to ``generate_response`` as ``max_new_tokens``.

    Returns:
        ``(answer, sources)``: the generated text and the list of titles.
    """
    docs = search_enhanced(user_question)
    prompt_text = build_final_prompt(user_question, docs, prompts)

    # Progress/debug output
    print(f"final prompt = {prompt_text}")
    print(f" Retrieved {len(docs)} documents")
    print(" Generating response...")

    reply = generate_response(prompt_text, max_new_tokens=max_tokens)
    return reply, extract_doc_titles(docs, unique=True, limit=5)





# ==============================================================================
# Demo: run the full RAG pipeline for one question and print the answer.
# NOTE: `sources` is returned by rag_generate but is not printed here.
user_question = "how to track my order?"
print(f"Question: {user_question}\n")
answer, sources = rag_generate(user_question)
print(f"Answer:\n{answer}\n")





In [None]:
# =========================
# Flask API: /health, /ping, /chat, /generate  (NO AUTH GUARD)
# Idempotent: safe to re-run in Colab without "overwriting endpoint" or port errors
# =========================
import os
import time
import threading
from flask import Flask, request, jsonify

# Reuse existing Flask app if already created
# (looked up in the notebook globals so that re-running this cell does not
# build a second application object)
app = globals().get("app")
if app is None:
    app = Flask(__name__)
    globals()["app"] = app

# Helper to avoid double-registering the same endpoint
def _register_route(rule, endpoint, view_func, methods=("GET",)):
    """Register ``view_func`` under ``endpoint``, tolerating cell re-runs.

    On first registration the URL rule is added normally. If the endpoint
    already exists, the existing URL rule (path and methods) is kept and only
    the handler is replaced, so re-running the cell picks up edited view
    functions instead of silently serving the stale ones.

    Args:
        rule: URL path, e.g. "/health".
        endpoint: Unique endpoint name.
        view_func: Function that handles the request.
        methods: HTTP methods accepted by the rule (used on first
            registration only).
    """
    if endpoint in app.view_functions:
        # Already registered -> keep the rule, swap in the new handler
        app.view_functions[endpoint] = view_func
        return
    app.add_url_rule(rule, endpoint, view_func, methods=list(methods))

def _safe_sources(retrieved, limit=5):
    try:
        return extract_doc_titles(retrieved, unique=True, limit=limit)
    except NameError:
        seen, out = set(), []
        for r in retrieved or []:
            t = r.get("title") or r.get("id") or "Untitled"
            if t in seen:
                continue
            seen.add(t)
            out.append(t)
            if len(out) >= limit:
                break
        return out

# ---------- Routes (defined as plain functions; registered once) ----------

def health():
    """GET /health - report status plus the loaded model's name and device."""
    payload = {
        "status": "ok",
        "model": str(getattr(model.config, "name_or_path", "llm")),
        "device": str(model.device),
    }
    return jsonify(**payload), 200
_register_route("/health", "health", health, methods=("GET",))

def ping():
    """POST /ping - direct LLM completion without retrieval.

    Request JSON: ``prompt`` (or ``question``) and optional ``max_new_tokens``
    (default 160). Responds with ``answer`` and ``latency_s``; a missing
    prompt or an invalid ``max_new_tokens`` yields HTTP 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no .get(); treat it as an empty payload.
        data = {}
    prompt = data.get("prompt") or data.get("question") or ""
    if not prompt:
        return jsonify(error="Missing 'prompt'"), 400
    # Validate client input here so that bad values give a 400, not a 500.
    try:
        max_new_tokens = int(data.get("max_new_tokens", 160))
    except (TypeError, ValueError):
        return jsonify(error="'max_new_tokens' must be an integer"), 400
    if max_new_tokens < 1:
        return jsonify(error="'max_new_tokens' must be >= 1"), 400
    t0 = time.time()
    out = generate_response(prompt, max_new_tokens=max_new_tokens)
    return jsonify(answer=out, latency_s=round(time.time() - t0, 3)), 200
_register_route("/ping", "ping", ping, methods=("POST",))

def chat():
    """POST /chat - answer a question through the full RAG pipeline.

    Request JSON: ``question`` (or ``prompt``). Responds with ``answer``,
    ``sources`` (document titles) and ``latency_s``; a missing question
    yields HTTP 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no .get(); treat it as an empty payload.
        data = {}
    question = data.get("question") or data.get("prompt") or ""
    if not question:
        return jsonify(error="Missing 'question'"), 400

    t0 = time.time()
    answer, sources = rag_generate(question)  # returns (answer, sources)
    latency = round(time.time() - t0, 3)

    return jsonify(
        answer=answer,
        sources=sources,
        latency_s=latency,
    ), 200
_register_route("/chat", "chat", chat, methods=("POST",))

def generate_endpoint():
    """POST /generate - plain completion with an optional sampling temperature.

    Request JSON: ``prompt`` (or ``question``), optional ``max_new_tokens``
    (default 160) and optional ``temperature``. A missing, unparsable or
    non-positive temperature falls back to 0.3. Responds with ``text`` and
    ``latency_s``; a missing prompt or an invalid ``max_new_tokens`` yields
    HTTP 400.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no .get(); treat it as an empty payload.
        data = {}
    prompt = data.get("prompt") or data.get("question") or ""
    if not prompt:
        return jsonify({"error": "Missing 'prompt'"}), 400

    # Validate client input here so that bad values give a 400, not a 500.
    try:
        max_new_tokens = int(data.get("max_new_tokens", 160))
    except (TypeError, ValueError):
        return jsonify({"error": "'max_new_tokens' must be an integer"}), 400
    if max_new_tokens < 1:
        return jsonify({"error": "'max_new_tokens' must be >= 1"}), 400

    temperature = data.get("temperature", None)
    try:
        temperature = float(temperature) if temperature is not None else 0.3
    except Exception:
        temperature = 0.3

    t0 = time.time()
    text = generate_response(
        prompt,
        max_new_tokens=max_new_tokens,
        # zero, negative and NaN temperatures all fall back to 0.3
        temperature=temperature if (temperature and temperature > 0) else 0.3
    )
    return jsonify({"text": text, "latency_s": round(time.time() - t0, 3)}), 200
_register_route("/generate", "generate_endpoint", generate_endpoint, methods=("POST",))

# ---------- Start Flask only once ----------
def _run():
    """Serve the Flask app; blocking, so it is run on a background thread.

    Binds to all interfaces on the port given by the PORT environment
    variable (default 5000), with the debugger and reloader disabled.
    """
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 5000)),
        debug=False,
        threaded=True,
        use_reloader=False,
    )

# Report the port the server actually uses rather than a hard-coded 5000.
flask_port = int(os.environ.get("PORT", 5000))

# The flag lives in the notebook globals so re-running the cell does not start
# a second server thread on the same port.
if not globals().get("_FLASK_THREAD_RUNNING", False):
    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    globals()["_FLASK_THREAD_RUNNING"] = True
    print(f"✅ Flask started on http://127.0.0.1:{flask_port}")
else:
    print(f"ℹ️ Flask already running on http://127.0.0.1:{flask_port}")


In [None]:
import requests, json, time

# Base URL of the locally running Flask server (port 5000).
BASE = "http://127.0.0.1:5000"

# 1) health
print("GET /health ->")
print(requests.get(f"{BASE}/health").json(), "\n")

# 2) ping (direct LLM, no RAG)
print("POST /ping ->")
print(requests.post(f"{BASE}/ping", json={
    "prompt": "Say hi in one short sentence.",
    "max_new_tokens": 80
}).json(), "\n")

# 3) generate (plain completion)
# No "temperature" is sent, so the endpoint applies its own default.
print("POST /generate ->")
t0 = time.time()
res = requests.post(
    f"{BASE}/generate",
    json={
        "prompt": "Write a short welcome message for Shoplite and mention email verification.",
        "max_new_tokens": 160
    }
)
print(res.status_code, res.headers.get("content-type"))
print(res.json())
print(f"\nElapsed: {time.time()-t0:.2f}s")


In [None]:
from pyngrok import ngrok

import os
from getpass import getpass

# Read the token without echoing it, so the secret does not end up in the
# saved notebook output.
ngrok_token = getpass("Enter your ngrok token: ").strip()
ngrok.set_auth_token(ngrok_token)

# Close previous tunnels (if any)
for t in ngrok.get_tunnels():
    ngrok.disconnect(t.public_url)

# Tunnel to the same port the Flask cell binds (PORT env var, default 5000).
public_url = ngrok.connect(int(os.environ.get("PORT", 5000)), "http").public_url
print("🌐 Public URL:", public_url)
print("Try POST /chat and /ping against this URL.")
