In [1]:
!pip -q install --upgrade transformers sentence-transformers faiss-cpu pypdf pillow accelerate

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.1/40.1 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.6/11.6 MB[0m [31m29.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m486.6/486.6 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m17.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m322.5/322.5 kB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import os, math, textwrap, json
from pathlib import Path
from typing import List, Tuple
import torch
import faiss
from pypdf import PdfReader
from PIL import Image

from sentence_transformers import SentenceTransformer
from transformers import (
    AutoTokenizer, AutoModelForCausalLM, pipeline,
    BlipProcessor, BlipForConditionalGeneration
)

# Pick the torch device once: CUDA when torch reports a GPU, otherwise CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", DEVICE)

Using device: cpu


In [3]:
from google.colab import files
# Opens the Colab upload widget; `up` is keyed by uploaded file name.
up = files.upload()  # choose your PDF
# Take the first uploaded name ending in ".pdf" (case-insensitive); None when no PDF was uploaded.
# NOTE(review): assumes Colab saved the upload under /content — confirm if the working directory differs.
PDF_PATH = next((f"/content/{name}" for name in up.keys() if name.lower().endswith(".pdf")), None)
print("PDF_PATH =", PDF_PATH)

Saving MINI-Cooper-Cooper-S-2007-2010-factory-repair-manual (1).pdf to MINI-Cooper-Cooper-S-2007-2010-factory-repair-manual (1).pdf
PDF_PATH = /content/MINI-Cooper-Cooper-S-2007-2010-factory-repair-manual (1).pdf


In [4]:
# Cell — Ingest PDF -> chunks -> embeddings -> FAISS
import re
from pathlib import Path
from pypdf import PdfReader
import torch, faiss, numpy as np
from sentence_transformers import SentenceTransformer

print("Using PDF_PATH:", PDF_PATH)

# Quick mode so big manuals index fast (flip to False later for full parsing).
# These three globals are read by extract_pages() when it builds its page range.
FAST_MODE   = True     # set False to parse every page
PAGE_STRIDE = 3        # take every 3rd page in FAST_MODE
MAX_PAGES   = 180      # FAST_MODE scans page indices below this bound, i.e. at most MAX_PAGES / PAGE_STRIDE pages

# --- 1) Extract text (page-aware) ---
def extract_pages(pdf_path: str):
    """Extract whitespace-normalised text from a PDF, one record per page.

    Args:
        pdf_path: Path to the PDF. May be None/empty or point to a missing file,
            in which case a small built-in sample page is returned so the rest
            of the pipeline can still run.

    Returns:
        A list of ``{"page": <1-based page number>, "text": <str>}`` dicts.
        Pages that yield no text are skipped; if nothing at all is extracted a
        single placeholder page is returned.

    Reads the module-level FAST_MODE, PAGE_STRIDE and MAX_PAGES settings to
    decide which pages to visit.
    """
    # Validate the argument that was actually passed in, not the PDF_PATH global,
    # so the function behaves correctly for any caller-supplied path.
    if not pdf_path or not Path(pdf_path).exists():
        # tiny fallback so the pipeline still works
        return [{"page": 1, "text":
            "Radio receiver removal: Disconnect battery negative lead. "
            "Unscrew fasteners, pull unit forward, disconnect wiring harness and antenna lead. "
            "Install in reverse order; observe ESD precautions. Torque to spec."
        }]
    reader = PdfReader(pdf_path)
    total = len(reader.pages)
    # FAST_MODE samples every PAGE_STRIDE-th page among the first MAX_PAGES page indices.
    idxs = range(total) if not FAST_MODE else range(0, min(total, MAX_PAGES), PAGE_STRIDE)
    pages = []
    for i in idxs:
        t = (reader.pages[i].extract_text() or "").strip()
        t = re.sub(r"\s+", " ", t)  # collapse newlines/tabs/runs of spaces
        if t:
            pages.append({"page": i+1, "text": t})
    if not pages:  # final fallback
        pages = [{"page": 1, "text": "No text extracted; please try FAST_MODE=False."}]
    return pages

# --- 2) Chunk with overlap ---
def chunk_pages(pages, max_chars=900, overlap=150):
    """Split each page's text into fixed-size character windows with overlap.

    Args:
        pages: Iterable of ``{"page": ..., "text": ...}`` dicts.
        max_chars: Maximum characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks of the same page;
            must be smaller than ``max_chars``.

    Returns:
        A list of ``{"page": ..., "text": ...}`` chunk dicts in page order.

    Raises:
        ValueError: If ``max_chars <= 0`` or ``overlap >= max_chars``. With such
            values the window would never advance and the loop would not end.
    """
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if overlap >= max_chars:
        raise ValueError("overlap must be smaller than max_chars")

    chunks = []
    for pg in pages:
        t = pg["text"]
        start = 0
        while start < len(t):
            end = min(len(t), start + max_chars)
            chunks.append({"page": pg["page"], "text": t[start:end]})
            if end == len(t):
                break
            # Step back by `overlap` so neighbouring chunks share context.
            start = max(0, end - overlap)
    return chunks

# Run ingestion: page records -> overlapping character chunks.
PAGES  = extract_pages(PDF_PATH)
CHUNKS = chunk_pages(PAGES)
print(f"Pages parsed: {len(PAGES)} | Chunks: {len(CHUNKS)}")

# --- 3) Embeddings + FAISS (cosine via normalized inner product) ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=DEVICE)

# `texts` is positionally aligned with CHUNKS and therefore with the FAISS row ids below.
texts = [c["text"] for c in CHUNKS]
# Embeddings are L2-normalised here, so the inner product used by the index equals cosine similarity.
embs  = embed_model.encode(texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=True)

index = faiss.IndexFlatIP(embs.shape[1])
index.add(embs)

# DOCS[i] describes FAISS row i: its id, source page number and chunk text.
DOCS = [{"id": i, "page": CHUNKS[i]["page"], "text": texts[i]} for i in range(len(texts))]
print("FAISS index size:", index.ntotal)

# --- 4) Handy peek + quick search to show progress ---
def peek(i=0):
    """Return a one-line preview ("[p.N] first 400 chars…") of DOCS[i].

    `i` is clamped into the valid index range of the module-level DOCS list.
    """
    last = len(DOCS) - 1
    position = max(0, i)
    position = min(position, last)
    doc = DOCS[position]
    page, preview = doc['page'], doc['text'][:400]
    return f"[p.{page}] {preview}…"

def search_preview(query, k=3):
    """Print the top-`k` FAISS hits for `query` as "#rank score=… [p.N] text…" lines.

    Uses the module-level `embed_model`, `index` and `DOCS`. The query is
    embedded with normalisation, matching how the indexed chunks were embedded.
    Slots that FAISS fills with -1 (returned when fewer than `k` vectors exist)
    are skipped instead of being looked up as DOCS[-1].
    """
    q = embed_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    scores, idxs = index.search(q, k)
    rank = 0
    for j, s in zip(idxs[0], scores[0]):
        j = int(j)
        if j < 0 or j >= len(DOCS):
            continue  # padding slot or id without a matching document
        rank += 1
        d = DOCS[j]
        print(f"#{rank} score={s:.3f} [p.{d['page']}] {d['text'][:160]}…")

# Smoke check: show the first indexed chunk and remind the reader how to run a sample search.
print("peek(0):", peek(0))
print("Try: search_preview('radio receiver removal')")


Using PDF_PATH: /content/MINI-Cooper-Cooper-S-2007-2010-factory-repair-manual (1).pdf
Pages parsed: 60 | Chunks: 112


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/4 [00:00<?, ?it/s]

FAISS index size: 112
peek(0): [p.1] 2007 ACCESSORIES & EQUIPMENT Audio, Navigation and Anti-Theft - Repair Instructions - Cooper 11 MONO RADIO 65 11 030 REMOVING AND INSTALLING/REPLACING RADIO RECEIVER (BUILT-IN UNIT) Necessary preliminary tasks:  Remove cover strips for front center console Release screws (1) and pull radio receiver back. Remove holder (1) for radio wiring harness. Fig. 1: Radio Receiver Mounting Screws And Remova…
Try: search_preview('radio receiver removal')


In [5]:
# TinyLlama for text-to-text over retrieved manual chunks
!pip -q install --upgrade "transformers>=4.46.1" accelerate

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import numpy as np

# Sanity: make sure the retriever objects exist (from your previous cell)
assert "embed_model" in globals(), "Run the PDF indexing cell first to build embed_model."
assert "index" in globals(), "Run the PDF indexing cell first to build the FAISS index."
assert "DOCS" in globals() and len(DOCS)>0, "Run the PDF indexing cell first to populate DOCS."

# Half precision only when a GPU is present; CPU stays in float32.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE  = torch.float16 if DEVICE=="cuda" else torch.float32

tok = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# avoid pad_token warning
if tok.pad_token_id is None and tok.eos_token_id is not None:
    tok.pad_token_id = tok.eos_token_id

# NOTE: this binds the global name `llm` to a raw Hugging Face model. The later
# helper _call_llm() also looks up a global called `llm` and calls `.invoke()` on it.
llm = AutoModelForCausalLM.from_pretrained(
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    torch_dtype=DTYPE,
    device_map="auto"        # let Accelerate place weights
)

# IMPORTANT: no device=... here since device_map="auto" is used
gen = pipeline("text-generation", model=llm, tokenizer=tok)

# System instructions inserted into every prompt built by answer() in this cell.
SYSTEM = (
    "You are a repair assistant. Use ONLY the provided context to answer. "
    "Start with SAFETY notes if any. Give a short, numbered procedure. "
    "If context is insufficient, say what is missing and stop. "
    "End with 'Sources:' listing the chunk ranks and pages you used."
)

def _retrieve(query: str, k: int = 4):
    qvec = embed_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    scores, idxs = index.search(qvec, k)
    hits = []
    for r in range(min(k, len(DOCS))):
        j = int(idxs[0][r])
        d = DOCS[j]
        hits.append({
            "rank": r+1,
            "score": float(scores[0][r]),
            "page": d.get("page", None),
            "text": d["text"]
        })
    return hits

def answer(question: str, k: int = 4, max_new_tokens: int = 320) -> dict:
    """Answer `question` with TinyLlama, grounded on the top-`k` retrieved chunks.

    Args:
        question: The user's question.
        k: Number of chunks to retrieve via _retrieve().
        max_new_tokens: Generation budget passed to the pipeline.

    Returns:
        ``{"answer": <str>, "citations": [{"rank", "page", "score"}, ...]}``.

    The prompt is rendered with the tokenizer's own chat template rather than a
    hand-written Llama-2 ``[INST]``/``<<SYS>>`` wrapper, so the model sees the
    format its tokenizer declares. ``return_full_text=False`` makes the pipeline
    return only the completion, so no string splitting is needed to drop the
    echoed prompt.
    """
    hits = _retrieve(question, k=k)
    context = "\n\n".join([f"[Chunk {h['rank']}] [p.{h['page']}] {h['text']}" for h in hits]) or "(no context)"
    messages = [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": f"Question: {question}\nContext:\n{context}"},
    ]
    prompt = tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    out = gen(
        prompt,
        max_new_tokens=max_new_tokens,
        do_sample=False,          # greedy decoding for repeatable answers
        return_full_text=False,   # completion only, without the prompt
    )[0]["generated_text"].strip()
    cites = [{"rank":h["rank"], "page":h["page"], "score":round(h["score"],3)} for h in hits]
    return {"answer": out, "citations": cites}


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/608 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Device set to use cpu


In [8]:
# Cell 1: imports
import re
from typing import List, Dict, Any


In [9]:
# Cell 2: retrieval adapters
def _default_retrieve(query: str, k: int = 6) -> List[Dict[str, Any]]:
    """
    Tries common retrieval objects you might already have:
      - retriever.get_relevant_documents(query)
      - vectorstore.similarity_search(query, k)
      - retrieve(query, k)
    Returns list of dicts: {"text", "page", "source"}.
    """
    if 'retriever' in globals():
        docs = globals()['retriever'].get_relevant_documents(query)
        out = []
        for d in docs[:k]:
            meta = getattr(d, "metadata", {}) or {}
            out.append({
                "text": getattr(d, "page_content", str(d)),
                "page": meta.get("page") or meta.get("page_number") or meta.get("pageno"),
                "source": meta.get("source") or meta.get("file") or meta.get("title") or meta.get("document_id"),
            })
        return out

    if 'vectorstore' in globals():
        docs = globals()['vectorstore'].similarity_search(query, k=k)
        out = []
        for d in docs:
            meta = getattr(d, "metadata", {}) or {}
            out.append({
                "text": d.page_content,
                "page": meta.get("page") or meta.get("page_number"),
                "source": meta.get("source") or meta.get("title"),
            })
        return out

    if 'retrieve' in globals() and callable(globals()['retrieve']):
        docs = globals()['retrieve'](query, k=k)
        out = []
        for d in docs:
            if isinstance(d, str):
                out.append({"text": d, "page": None, "source": None})
            elif isinstance(d, dict):
                out.append({
                    "text": d.get("text") or d.get("page_content") or d.get("content") or "",
                    "page": d.get("page") or d.get("page_number"),
                    "source": d.get("source") or d.get("title"),
                })
        return out

    raise RuntimeError(
        "No retriever found. Define `retriever`, `vectorstore`, or a `retrieve(query, k)` function."
    )


In [10]:
# Cell 3: llm caller + helpers

# Strict system rules.
# Sent as the "system" message by answer(); the numbered RULES are what the
# first _build_user_prompt() refers to when it says "following the RULES".
_SYS_STRICT = (
    "You are a cautious automotive repair assistant.\n"
    "RULES:\n"
    "1) Use ONLY the provided context. If missing info, say what is missing and stop.\n"
    "2) Start with SAFETY notes if any.\n"
    "3) If steps are asked, give a SHORT numbered procedure (3–10 steps max).\n"
    "4) If tools/cautions are asked, list concise bullets.\n"
    "5) Do NOT paste raw chunks, headers like '[Chunk ...]', or page banners. Summarize.\n"
    "6) Keep it under ~8 lines unless essential.\n"
)

def _call_llm(messages: List[Dict[str, str]]) -> str:
    """
    Calls your chat model via:
      - OpenAI `client.chat.completions.create`, or
      - LangChain chat model in `chat` or `llm` with .invoke(messages)
    """
    if 'client' in globals():
        resp = client.chat.completions.create(
            model = globals().get("MODEL_NAME", "gpt-4o-mini"),
            messages = messages,
            temperature = 0.2,
        )
        return resp.choices[0].message.content.strip()

    if 'chat' in globals():
        out = globals()['chat'].invoke(messages)
        return getattr(out, "content", str(out)).strip()

    if 'llm' in globals():
        out = globals()['llm'].invoke(messages)
        return getattr(out, "content", str(out)).strip()

    raise RuntimeError("No chat model found: define `client`, or a LangChain chat model `chat`/`llm`.")

def _strip_sys_leak(text: str) -> str:
    text = re.sub(r"<<SYS>>.*?(?=\n|$)", "", text, flags=re.DOTALL)
    text = re.sub(r"^System:\s*", "", text, flags=re.IGNORECASE)
    return text.strip()

def _looks_like_raw_chunk(text: str) -> bool:
    if re.search(r"^\s*\[Chunk\s*\d+\]", text, flags=re.IGNORECASE|re.MULTILINE):
        return True
    if re.search(r"ACCESSORIES\s*&\s*EQUIPMENT", text):
        return True
    if len(text) > 600 and text.count("\n") <= 2:
        return True
    return False

def _build_user_prompt(query: str, contexts: List[Dict[str, Any]]) -> str:
    joined = "\n\n---\n\n".join([c["text"] for c in contexts if c.get("text")])
    return (
        f"User question:\n{query}\n\n"
        f"Context (snippets from the manual):\n{joined}\n\n"
        "Write the answer following the RULES. If data is insufficient, say what pages/sections are missing."
    )

def _make_citations(contexts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    seen, cites = set(), []
    for c in contexts:
        page = c.get("page"); src = c.get("source")
        if page is None and src is None:
            continue
        key = (page, src)
        if key in seen:
            continue
        seen.add(key)
        cites.append({"page": page, "source": src})
    return cites


In [37]:
# Safe, self-contained answer(): no NameError on 'cleaned'
def answer(query: str, k: int = 6):
    """Retrieve context for `query`, ask the chat model, and post-process the reply.

    Returns ``{"answer": <str>, "citations": <list>}``. When retrieval yields
    nothing, a fixed "No context found" answer with no citations is returned.
    A global `_finalize_answer` helper, if one exists, is applied to the reply;
    any exception it raises is ignored and the unfinalized text is used.
    """
    contexts = _default_retrieve(query, k=k)
    if not contexts:
        return {"answer": "No context found for this query.", "citations": []}

    # System rules first, then the question with its context.
    prompt_for_user = _build_user_prompt(query, contexts)
    messages = [
        {"role": "system", "content": _SYS_STRICT},
        {"role": "user", "content": prompt_for_user},
    ]

    reply = _strip_sys_leak(_call_llm(messages))

    # Optional clean-up hook, looked up at call time so it can be defined later.
    polished = reply
    if '_finalize_answer' in globals():
        try:
            polished = globals()['_finalize_answer'](reply)
        except Exception:
            polished = reply

    # If the model echoed manual text, strip a leading "[Chunk ...]" header and cap the length.
    if _looks_like_raw_chunk(polished):
        without_header = re.sub(
            r"^\s*\[Chunk.*?\]\s*", "", polished, flags=re.IGNORECASE|re.DOTALL
        )
        preface = (
            "I have the relevant pages, but the text appears to be raw chunks. "
            "Here’s a concise summary based on the context:\n\n"
        )
        polished = preface + without_header[:800]

    return {"answer": polished.strip(), "citations": _make_citations(contexts)}


In [13]:
# Cell A: quick probe for likely objects
# Lists every global whose (lower-cased) name contains one of the keywords below,
# mapped to its type name, to discover which retrieval objects already exist.
suspects = {}
for name, obj in list(globals().items()):  # list(): snapshot, because the loop itself binds new globals
    lname = name.lower()
    if any(key in lname for key in ["retriever", "vectorstore", "faiss", "chroma", "db", "index", "store", "vs"]):
        suspects[name] = type(obj).__name__
suspects


{'faiss': 'module', 'index': 'IndexFlatIP'}

In [16]:
# Cell B2: adapter for a raw faiss.IndexFlat* named `index`
# Tries common embedders automatically and builds a LangChain-like retriever.

import numpy as np
import types

# --- 1) Query embedding helper (tries several common objects) ---
def _embed_query_auto(text: str) -> np.ndarray:
    # OpenAI / LangChain-style
    if 'embeddings' in globals():
        emb_obj = globals()['embeddings']
        if hasattr(emb_obj, 'embed_query'):
            v = emb_obj.embed_query(text)
            return np.array(v, dtype=np.float32)
        if hasattr(emb_obj, 'encode'):
            v = emb_obj.encode([text])[0]
            return np.array(v, dtype=np.float32)

    # SentenceTransformers-like
    for name in ['embedding_model', 'encoder', 'st_model', 'model']:
        if name in globals():
            m = globals()[name]
            if hasattr(m, 'encode'):
                v = m.encode([text])[0]
                return np.array(v, dtype=np.float32)

    raise RuntimeError(
        "No embedding function found. Provide an object named `embeddings` with .embed_query(), "
        "or a SentenceTransformers-like model with .encode()."
    )

# --- 2) Try to discover your docs + metadatas lists in globals ---
def _discover_docs():
    # Try common variable names
    text_candidates = ['texts', 'docs', 'documents', 'pages', 'corpus']
    meta_candidates = ['metadatas', 'metadata', 'metas', 'infos']

    texts = None
    metas = None

    for n in text_candidates:
        if n in globals():
            texts = globals()[n]
            break
    for n in meta_candidates:
        if n in globals():
            metas = globals()[n]
            break

    # Fallback: if you already have a single list of dicts
    if texts is None and 'docs_list' in globals():
        return globals()['docs_list']

    if texts is None:
        raise RuntimeError(
            "Could not find your documents. Define a list `texts` (or `docs`/`documents`) "
            "and optional `metadatas` aligned with the FAISS index order."
        )

    if metas is None:
        metas = [None] * len(texts)

    if len(texts) != len(metas):
        raise RuntimeError("Length mismatch: texts and metadatas must have the same length.")

    out = []
    for t, m in zip(texts, metas):
        if isinstance(m, dict):
            page = m.get("page") or m.get("page_number")
            src  = m.get("source") or m.get("title")
        else:
            page = None
            src  = None
        out.append({"text": str(t), "page": page, "source": src})
    return out

# Normalised documents ({"text", "page", "source"} dicts) used by the retrievers below.
docs_list = _discover_docs()

# --- 3) Build a minimal retriever around your FAISS index ---
# Availability probe only: on any import failure `faiss` is bound to None instead of raising.
try:
    import faiss  # just to check availability
except Exception:
    faiss = None

def _l2_normalize(x: np.ndarray) -> np.ndarray:
    n = np.linalg.norm(x, axis=-1, keepdims=True) + 1e-12
    return x / n

# Detect if inner-product index (cosine) and normalize accordingly.
# The check is by class name: True when a global `index` exists and its type name ends in "FlatIP".
_is_ip = 'index' in globals() and type(globals()['index']).__name__.lower().endswith('flatip')

class _RetrievedDoc:
    """Minimal stand-in for a LangChain Document: `.page_content` plus `.metadata`."""

    def __init__(self, d):
        self.page_content = d["text"]
        self.metadata = {"page": d.get("page"), "source": d.get("source")}


class RawFaissRetriever:
    """LangChain-like retriever over a raw FAISS index and an aligned document list.

    Args:
        faiss_index: Object exposing ``search(query_matrix, k)`` that returns
            ``(scores, ids)``.
        docs: List of ``{"text", "page", "source"}`` dicts; position i must
            correspond to FAISS row id i.
        k: Number of neighbours requested per query.
        normalize_for_ip: When True and the index is an inner-product flat
            index, the query vector is L2-normalised so scores are cosine
            similarities.
    """

    def __init__(self, faiss_index, docs, k=6, normalize_for_ip=True):
        self.index = faiss_index
        self.docs = docs
        self.k = k
        self.normalize_for_ip = normalize_for_ip
        # Decide from the index this instance actually wraps, not from whatever
        # global `index` existed when the cell ran.
        self._is_ip = type(faiss_index).__name__.lower().endswith('flatip')

    def get_relevant_documents(self, query: str):
        """Return Document-like hits for `query`, best first; ids outside `docs` are dropped."""
        q = _embed_query_auto(query).astype('float32')
        if self.normalize_for_ip and self._is_ip:
            q = _l2_normalize(q)
        q = q.reshape(1, -1)  # search expects a (n_queries, dim) matrix
        _, ids = self.index.search(q, self.k)
        # -1 marks "no neighbour"; the range check also covers ids without a matching doc.
        return [
            _RetrievedDoc(self.docs[int(i)])
            for i in ids[0]
            if 0 <= i < len(self.docs)
        ]

# Build global `retriever` so answer() can use it
# (_default_retrieve() checks for a global named `retriever` before any other back-end).
if 'index' not in globals():
    raise RuntimeError("Expected your FAISS object to be named `index` (e.g., faiss.IndexFlatIP).")

retriever = RawFaissRetriever(faiss_index=index, docs=docs_list, k=6, normalize_for_ip=True)

# Bare string: displayed as the cell's output to confirm the binding.
"OK: RawFaissRetriever bound as `retriever`"


'OK: RawFaissRetriever bound as `retriever`'

In [25]:
# Cell E0: Lexical TF-IDF fallback retriever (no embeddings required)

import re, math
from collections import Counter

# Ensure docs_list exists
if 'docs_list' not in globals():
    # Try to reuse the helper from B2 if available
    if '_discover_docs' in globals():
        docs_list = _discover_docs()
    else:
        raise RuntimeError("Need docs_list or a _discover_docs() helper to gather texts/metadatas.")

# Build a tiny TF-IDF index (once)
# _LEX is the lazily filled cache: _ensure_built() adds "idf", "doc_vecs" and
# "doc_norms" and flips "built" to True on first use.
_LEX = {"built": False}

def _tokenize(text: str):
    # simple word tokenizer; drop very short tokens
    return [w for w in re.findall(r"\b\w+\b", str(text).lower()) if len(w) > 2]

def _build_lex_index(docs):
    N = len(docs)
    df = Counter()
    docs_tokens = []

    for d in docs:
        toks = _tokenize(d["text"])
        docs_tokens.append(toks)
        df.update(set(toks))

    # smooth IDF
    idf = {w: math.log((N + 1) / (df[w] + 1)) + 1.0 for w in df}

    # document TF-IDF vectors (sparse)
    doc_vecs, doc_norms = [], []
    for toks in docs_tokens:
        tf = Counter(toks)
        L = max(len(toks), 1)
        vec = {w: (tf[w] / L) * idf.get(w, 0.0) for w in tf}
        doc_vecs.append(vec)
        doc_norms.append(math.sqrt(sum(v*v for v in vec.values())) + 1e-12)

    return {"idf": idf, "doc_vecs": doc_vecs, "doc_norms": doc_norms}

def _ensure_built():
    if not _LEX["built"]:
        built = _build_lex_index(docs_list)
        _LEX.update(built)
        _LEX["built"] = True

def _cosine_sparse(a: dict, b: dict, norm_a: float, norm_b: float) -> float:
    # a and b are {token: weight}
    if not a or not b:
        return 0.0
    common = set(a).intersection(b)
    dot = sum(a[t] * b[t] for t in common)
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def retrieve(query: str, k: int = 6):
    """
    Lexical retrieval: returns list of dicts with keys 'text', 'page', 'source'

    Documents are ranked by TF-IDF cosine similarity to the query and the `k`
    best are returned, best first. A query with no usable tokens returns [].
    """
    _ensure_built()

    query_tokens = _tokenize(query)
    if not query_tokens:
        return []

    # query vector (TF-IDF), weighted the same way as the document vectors
    counts = Counter(query_tokens)
    n_tokens = max(len(query_tokens), 1)
    idf = _LEX["idf"]
    qvec = {term: (count / n_tokens) * idf.get(term, 0.0) for term, count in counts.items()}
    qnorm = math.sqrt(sum(weight * weight for weight in qvec.values())) + 1e-12

    scored = [
        (_cosine_sparse(qvec, vec, qnorm, _LEX["doc_norms"][i]), i)
        for i, vec in enumerate(_LEX["doc_vecs"])
    ]
    # Stable sort: documents with equal scores keep their original order.
    scored.sort(key=lambda pair: pair[0], reverse=True)

    results = []
    for _, doc_id in scored[:k]:
        doc = docs_list[doc_id]
        results.append({"text": doc["text"], "page": doc.get("page"), "source": doc.get("source")})
    return results

# Make sure answer() uses this fallback instead of the FAISS retriever:
# _default_retrieve() prefers a global `retriever`, so removing that name lets it
# fall through to the `retrieve()` function defined in this cell.
globals().pop("retriever", None)

# Bare string: displayed as the cell's output.
"OK: Lexical fallback active (embeddings not required)"


'OK: Lexical fallback active (embeddings not required)'

In [29]:
# Cell G2a: TinyLlama (open)
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Use half precision only when a GPU is available. This mirrors the DTYPE choice
# made when the same checkpoint is loaded earlier in the notebook, and avoids
# forcing float16 weights onto a CPU-only runtime.
model_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",   # let Accelerate place the weights
    torch_dtype=model_dtype,
)




In [30]:
# NOTE(review): `HFChatShim` is not defined in any cell visible in this notebook,
# so this line fails on a fresh kernel unless the class is defined first.
# The resulting global `chat` is what _call_llm() invokes via `.invoke(messages)`.
chat = HFChatShim(model=model, tokenizer=tokenizer, max_new_tokens=300, temperature=0.2, top_p=0.95)


In [31]:
# Smoke test of answer() on two sample questions.
# Retrieval goes through _default_retrieve(); the lexical-fallback cell removes the
# global `retriever`, in which case the `retrieve()` function is used instead.
res1 = answer("How do I remove the radio receiver safely?")
print("\n=== ANSWER 1 ===\n", res1["answer"], "\nCitations:", res1["citations"])

res2 = answer("List required tools and cautions before removing the center console.")
print("\n=== ANSWER 2 ===\n", res2["answer"], "\nCitations:", res2["citations"])



=== ANSWER 1 ===
 I have the relevant pages, but the text appears to be raw chunks. Here’s a concise summary based on the context:

User question:
How do I remove the radio receiver safely?

Context (snippets from the manual):
2007 ACCESSORIES & EQUIPMENT Audio, Navigation and Anti-Theft - Repair Instructions - Cooper 11 MONO RADIO 65 11 030 REMOVING AND INSTALLING/REPLACING RADIO RECEIVER (BUILT-IN UNIT) Necessary preliminary tasks:  Remove cover strips for front center console Release screws (1) and pull radio receiver back. Remove holder (1) for radio wiring harness. Fig. 1: Radio Receiver Mounting Screws And Removal Direction Courtesy of BMW OF NORTH AMERICA, INC. Unfasten plug connection (2) and disconnect. Disconnect antenna plug (3) and remove radio receiver. 2008 MINI Cooper 2007 ACCESSORIES & EQUIPMENT Audio, Navigation and Anti-Theft - Repair Instructions - Cooper 2008 MINI Cooper 2007 ACCESSORIES & EQUIPMEN 
Citations: []


This is a friendly reminder - the current text generation call will exceed the model's predefined maximum length (2048). Depending on the model, you may observe exceptions, performance degradation, or nothing at all.



=== ANSWER 2 ===
 I have the relevant pages, but the text appears to be raw chunks. Here’s a concise summary based on the context:

User question: List required tools and cautions before removing the center console.

Context (snippets from the manual):
Cornering Detection Shifting up in corners is prevented above a defined lateral acceleration. The radius of the bend is detected from the differences in speed of the inside and outside wheels. An accurate enough calculation of the lateral acceleration is made by taking the travelling speed into consideration. Identical front wheel tire circumferences are a prerequisite for this calculation.

---

2007 ACCESSORIES & EQUIPMENT Audio, Navigation and Anti-Theft - Repair Instructions - Cooper 11 MONO RADIO 65 11 030 REMOVING AND INSTALLING/REPLACING RADIO RECEIVER (BUILT-IN UNIT) Necessary preliminary tasks:

1. Remove cover strips for front center console
2. Remove holder (1 
Citations: []


In [32]:
# H1: stricter system rules + concise, non-echo user prompt with context tags
MAX_CTX_CHARS = 1600          # keep total context short
PER_CHUNK_LIMIT = 400         # trim each chunk

def _trim(txt, limit=PER_CHUNK_LIMIT):
    txt = str(txt).strip()
    return (txt[:limit] + " …") if len(txt) > limit else txt

# Replace your old _SYS_STRICT with this
# Rebinding the global is enough: answer() reads _SYS_STRICT each time it is called.
# The rules point the model at the <CONTEXT>…</CONTEXT> tags emitted by the
# _build_user_prompt() defined in this cell.
_SYS_STRICT = (
    "You are an automotive repair assistant.\n"
    "Use ONLY the text inside <CONTEXT>…</CONTEXT>.\n"
    "Start with SAFETY if present.\n"
    "Return ONLY the final answer as short numbered steps or bullets.\n"
    "Do NOT repeat the question. Do NOT mention or quote the context.\n"
    "If info is missing, reply exactly: 'Missing: <what is missing>'."
)

# Replace your _build_user_prompt with this
def _build_user_prompt(query, contexts):
    # trim each chunk and cap total
    trimmed = []
    total = 0
    for c in contexts:
        t = _trim(c.get("text",""))
        if not t:
            continue
        if total + len(t) > MAX_CTX_CHARS:
            break
        trimmed.append(t)
        total += len(t)

    joined = "\n\n---\n\n".join(trimmed)
    return (
        f"Question: {query}\n"
        f"<CONTEXT>\n{joined}\n</CONTEXT>\n"
        "Answer using the RULES. Do not echo the question or the context tags."
    )


In [33]:
# H2: clean the final text further if the model still echoes
def _finalize_answer(text: str) -> str:
    t = text.strip()

    # Drop any 'User question:' or 'Context' sections if they slipped in
    t = re.sub(r"(?is)\bUser question:\b.*?(?=\n\n|$)", "", t)
    t = re.sub(r"(?is)\bContext\s*\(.*?\):.*?(?=\n\n|$)", "", t)
    t = re.sub(r"(?is)</?CONTEXT>", "", t)
    t = re.sub(r"(?is)\bQuestion:\b.*?(?=\n\n|$)", "", t)

    # If nothing looks like steps, keep the first ~12 lines only
    lines = [ln.rstrip() for ln in t.splitlines()]
    lines = [ln for ln in lines if ln.strip()]
    return "\n".join(lines[:12]).strip()

# No manual wiring is needed: the current answer() looks up `_finalize_answer` in
# globals() on every call and applies it to the reply after `_strip_sys_leak`.


In [38]:
# Re-run the two sample questions with k=4 retrieved chunks each and print answer plus citations.
res1 = answer("How do I remove the radio receiver safely?", k=4)
print("\n=== ANSWER 1 ===\n", res1["answer"], "\nCitations:", res1["citations"])

res2 = answer("List required tools and cautions before removing the center console.", k=4)
print("\n=== ANSWER 2 ===\n", res2["answer"], "\nCitations:", res2["citations"])



=== ANSWER 1 ===
 To remove the radio receiver safely, follow these steps:
1. Remove cover strips for the front center console.
2. Remove the radio receiver holder (1) for the radio wiring harness.
3. Remove the radio receiver from the mounting screws (1) and pull it back.
4. Remove the holder (1) for the radio wiring harness.
Note: Ensure that the radio receiver is removed safely and securely to avoid any damage to the vehicle. 
Citations: []

=== ANSWER 2 ===
 Question: List the required tools and cautions before removing the center console.
Answer:
1. Remove cover strips for front center console
2. Remove holder (1) for radio wiring harness
3. Remove screws (1) and pull radio receiver back
4. Remove holder (1) for radio wiring harness
Cautions:
1. Do not drain fluid before the transmission has cooled down
2. Insert special tool 00 2 271 and screw down
3. If necessary, release cable holder (1)
4. Unfasten screws (2 and 4)
5. Release nuts (3) and remove carrier 
Citations: []
