# LLM Deployment Notebook (Colab-ready)
This notebook is self-contained for the Week 3 assignment. It uses open-source Hugging Face LLMs (Falcon-7B-Instruct by default, with a GPT-2 fallback for low-memory runtimes) and FAISS for RAG.
**Security note**: enter your ngrok token at runtime when prompted; do not hardcode secrets.

In [None]:
# Install dependencies
!pip install -q transformers accelerate bitsandbytes sentence-transformers faiss-cpu flask pyngrok uvicorn[standard]

In [None]:
# Load prompting files (knowledge base, prompts, QA) from local repo if available.
# The notebook will try several candidate directories (uploaded to Colab or mounted Drive).
import os, glob, re, yaml

# Directories probed, in order, for a `prompting/` sub-folder that holds
# knowledge-base.md, assistant-prompts.yml and ground-truth-qa.md.
candidate_dirs = [
    '/content/livedrop-JoeyMerhej/docs',
    '/content/livedrop/docs',
    '/content/docs',
    '/content/drive/MyDrive/livedrop-JoeyMerhej/docs',
    '/content/drive/MyDrive/livedrop/docs'
]

# Module-level state populated further down in this cell.
# KNOWLEDGE_BASE: list of {'id', 'title', 'content'} dicts.
KNOWLEDGE_BASE = []
# PROMPTS: prompt configuration (parsed YAML or the embedded fallback dict); None until loaded.
PROMPTS = None
# GROUND_TRUTH_QA: list of {'q', 'a'} dicts produced by parse_ground_truth_md.
GROUND_TRUTH_QA = []

def parse_knowledge_base_md(text):
    """Split knowledge-base markdown into a list of document dicts.

    Sections are expected to open with a heading of the form
    ``## Document N: Title``. Each section becomes
    ``{'id': 'docK', 'title': ..., 'content': ...}`` where K counts from 1 in
    order of appearance, the title is the stripped heading text after the
    colon, and the content is everything between this heading and the next
    one (or the end of the text), stripped.

    When no such heading is present the text is split on ``---`` separator
    lines instead: every non-empty chunk becomes a document whose title is the
    first 200 characters of its first line and whose content is the whole
    chunk.
    """
    heading_re = re.compile(r'^##\s+Document\s+\d+:\s*(.+)$', flags=re.M)
    headings = list(heading_re.finditer(text))

    if headings:
        # A section body stops where the next heading starts (or at end of text).
        body_ends = [h.start() for h in headings[1:]] + [len(text)]
        return [
            {
                'id': f'doc{num}',
                'title': heading.group(1).strip(),
                'content': text[heading.end():stop].strip(),
            }
            for num, (heading, stop) in enumerate(zip(headings, body_ends), start=1)
        ]

    # Fallback layout: chunks separated by '---' lines, first line used as title.
    chunks = (piece.strip() for piece in text.split('\n---\n'))
    return [
        {'id': f'doc{num}', 'title': chunk.splitlines()[0][:200], 'content': chunk}
        for num, chunk in enumerate((c for c in chunks if c), start=1)
    ]


def parse_ground_truth_md(text):
    """Extract question/answer pairs from ground-truth QA markdown.

    The text is split into blocks at blank-line-preceded ``### `` headings.
    A block whose heading starts with ``Q``/``q`` (e.g. ``### Q01: ...``) is
    treated as a question: the question is the heading text after the first
    colon (or the whole heading when there is no colon).

    The answer is taken from the first line starting with
    ``**Authoritative answer:**`` (case-insensitive): any text on that same
    line after the marker, plus the following lines up to the first blank line
    reached after some answer text was collected, joined with single spaces.
    Surrounding spaces and asterisks are removed from each line.

    Args:
        text: Full markdown text of the ground-truth QA file.

    Returns:
        A list of ``{'q': question, 'a': answer}`` dicts in file order; ``a``
        is ``''`` when no authoritative-answer marker is found.
    """
    marker = '**authoritative answer:**'
    blocks = re.split(r'\n\n###\s+', text)
    # The split consumes the '### ' prefix of every block except the first one,
    # so a file that opens directly with '### Q01: ...' would otherwise lose
    # its first question.
    blocks[0] = re.sub(r'\A\s*###\s+', '', blocks[0])

    items = []
    for block in blocks:
        if not block.strip():
            continue
        if not block.lower().startswith('q'):
            continue
        lines = block.splitlines()
        # First line has the form 'Q##: question'.
        heading = lines[0]
        head_parts = heading.split(':', 1)
        question = head_parts[1].strip() if len(head_parts) > 1 else heading.strip()

        answer = ''
        for pos, line in enumerate(lines):
            stripped = line.strip()
            if not stripped.lower().startswith(marker):
                continue
            collected = []
            # Keep answer text written on the marker line itself.
            inline = stripped[len(marker):].strip(' *')
            if inline:
                collected.append(inline)
            for follow in lines[pos + 1:]:
                # A blank line ends the answer once something was collected.
                if follow.strip() == '' and collected:
                    break
                collected.append(follow.strip(' *'))
            answer = ' '.join(part for part in collected if part)
            break
        items.append({'q': question, 'a': answer})
    return items

# Probe each candidate directory in order and load from the first one that
# contains prompting/knowledge-base.md. Prompts and QA files are optional:
# a missing or unreadable one is reported and skipped.
found = False
for base in candidate_dirs:
    prompting_dir = os.path.join(base, 'prompting')
    kb_path = os.path.join(prompting_dir, 'knowledge-base.md')
    prompts_path = os.path.join(prompting_dir, 'assistant-prompts.yml')
    qa_path = os.path.join(prompting_dir, 'ground-truth-qa.md')

    # Without a knowledge base this directory is of no use; try the next one.
    if not os.path.exists(kb_path):
        continue

    print('Loading knowledge base from', kb_path)
    with open(kb_path, 'r', encoding='utf-8') as f:
        kb_text = f.read()
    KNOWLEDGE_BASE = parse_knowledge_base_md(kb_text)

    # Optional: prompt configuration (YAML).
    if os.path.exists(prompts_path):
        try:
            with open(prompts_path, 'r', encoding='utf-8') as f:
                PROMPTS = yaml.safe_load(f)
        except Exception as e:
            print('Failed to load prompts YAML:', e)
        else:
            print('Loaded prompts from', prompts_path)

    # Optional: ground-truth question/answer pairs.
    if os.path.exists(qa_path):
        try:
            with open(qa_path, 'r', encoding='utf-8') as f:
                qa_text = f.read()
            GROUND_TRUTH_QA = parse_ground_truth_md(qa_text)
        except Exception as e:
            print('Failed to parse ground-truth QA:', e)
        else:
            print('Loaded ground-truth QA from', qa_path)

    found = True
    break

# Fallback: none of the candidate directories contained a knowledge-base.md.
# NOTE(review): PROMPTS is only replaced in this branch; when a KB file was found
# but the prompts YAML was missing or failed to load, PROMPTS stays None.
if not found:
    print('No prompting files found in candidate_dirs. Falling back to embedded minimal KB and prompts.')
    # Minimal embedded KB (keeps previous content)
    KNOWLEDGE_BASE = [
        {
            'id': 'doc1',
            'title': 'Shoplite User Registration Process',
            'content': 'To create a Shoplite account, users visit the registration page and provide an email, password, and basic profile information...'
        },
        {
            'id': 'doc2',
            'title': 'Shoplite Shopping Cart Features',
            'content': 'The Shoplite shopping cart allows users to add multiple items from different sellers, apply promotional codes...'
        }
    ]
    # Embedded prompt config, keyed by prompt name.
    PROMPTS = {
        'base_retrieval_prompt': {
            'role': 'You are a helpful Shoplite customer service assistant.',
            'goal': 'Provide accurate answers using only the provided Shoplite documentation.',
            'response_format': 'Answer: [Your response]\nSources: [List titles]'
        }
    }

# Report how many documents ended up in the knowledge base (loaded or embedded).
print('Knowledge base documents:', len(KNOWLEDGE_BASE))


In [None]:
# Load sentence-transformers for embeddings and build a FAISS index
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# Embed the `content` of every knowledge-base document (titles are not embedded)
# and add the vectors to a FAISS flat L2 index. Row i of the index corresponds
# to KNOWLEDGE_BASE[i].
# NOTE(review): with an empty KNOWLEDGE_BASE, `embeddings.shape[1]` presumably
# fails because there is no second dimension — confirm and guard if needed.
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
texts = [d['content'] for d in KNOWLEDGE_BASE]
embeddings = embed_model.encode(texts, convert_to_numpy=True)
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(np.array(embeddings))

def retrieve(query, k=3):
    """Return up to ``k`` knowledge-base documents closest to ``query``.

    The query is embedded with ``embed_model`` and looked up in the FAISS
    ``index``; hits are mapped back to ``KNOWLEDGE_BASE`` entries by row
    number, nearest first.

    Args:
        query: Natural-language question to search for.
        k: Maximum number of documents to return.

    Returns:
        A list of at most ``k`` document dicts. Fewer are returned when the
        index holds fewer than ``k`` vectors; an empty list when ``k`` is not
        positive or the index is empty.
    """
    # FAISS pads the result with label -1 when fewer than k vectors exist.
    # Indexing KNOWLEDGE_BASE with -1 would silently return the last document
    # as a bogus extra hit, so cap k and drop any padding labels.
    k = min(k, index.ntotal)
    if k <= 0:
        return []
    q_emb = embed_model.encode([query], convert_to_numpy=True)
    _distances, labels = index.search(q_emb, k)
    return [KNOWLEDGE_BASE[i] for i in labels[0] if i >= 0]

In [None]:
# Minimal generation using a small open-source causal LM from Hugging Face
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
import torch

# NOTE: In Colab, choose a compatible open-source HF model that fits GPU memory.
# Suggested options (pick one based on available GPU VRAM):
# - 'tiiuae/falcon-7b-instruct' (good 7B instruct model; requires careful memory handling)
# - 'meta-llama/Llama-2-7b-chat-hf' (if you have access and it fits)
# - smaller causal models (gpt2, distilgpt2, facebook/opt-125m) are safe fallbacks for low-memory runtimes

# We'll default to Falcon-7B instruct as a non-OpenAI open-source model.
# Primary model and the small fallback used when the primary cannot be loaded.
MODEL_NAME = 'tiiuae/falcon-7b-instruct'
SMALL_MODEL = 'gpt2'  # small causal fallback if the large model doesn't fit

# Use the new BitsAndBytesConfig via the `quantization_config` param to avoid deprecated args.
# On success this leaves `tokenizer`, `model` and `generator` bound to the primary model;
# on any exception they are rebound to the small fallback model instead.
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    # 8-bit quantization config (per the argument names, with fp32 CPU offload enabled).
    bnb_config = BitsAndBytesConfig(load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map='auto',
        quantization_config=bnb_config,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True,
    )
    # NOTE(review): `device_map='auto'` is passed here together with an already-loaded
    # model — confirm whether the pipeline uses it in that case.
    generator = pipeline('text-generation', model=model, tokenizer=tokenizer, device_map='auto')
except Exception as e:
    # NOTE(review): this catches every Exception (missing access, network errors, ...),
    # not only out-of-memory failures.
    print('Model load failed or OOM; trying a smaller causal model as fallback. Error:', e)
    # Fallback: load a small causal LM that fits in minimal memory (gpt2)
    tokenizer = AutoTokenizer.from_pretrained(SMALL_MODEL)
    model = AutoModelForCausalLM.from_pretrained(SMALL_MODEL)
    generator = pipeline('text-generation', model=model, tokenizer=tokenizer, device_map='auto')

def generate_answer(prompt, max_tokens=200):
    """Generate a completion for ``prompt`` with the loaded text-generation pipeline.

    Decoding is greedy (``do_sample=False``).

    Args:
        prompt: Full prompt text sent to the model.
        max_tokens: Maximum number of newly generated tokens.

    Returns:
        The generated continuation only, with surrounding whitespace removed.
    """
    outputs = generator(
        prompt,
        max_new_tokens=max_tokens,
        do_sample=False,
        # By default the pipeline returns the prompt followed by the completion.
        # That would echo the instructions and retrieved documents back to the
        # caller and let keyword scoring match on the context instead of the answer.
        return_full_text=False,
    )
    return outputs[0]['generated_text'].strip()


In [None]:
# Flask app exposing /chat, /ping, /health
from flask import Flask, request, jsonify
app = Flask(__name__)

@app.route('/health')
def health():
    """Liveness probe: always responds with ``{"status": "ok"}``."""
    status_payload = {'status': 'ok'}
    return jsonify(status_payload)

@app.route('/ping', methods=['POST'])
def ping():
    """Echo endpoint for connectivity checks.

    Responds with ``{'answer': <question>, 'sources': []}`` where
    ``<question>`` is the ``question`` field of the JSON body, or ``''`` when
    the body is missing, is not valid JSON, or is not a JSON object.
    """
    # Depending on the Flask version, `request.json` rejects a missing or
    # malformed JSON body with an error response before an `or {}` fallback can
    # apply; get_json(silent=True) returns None in those cases instead.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Covers None as well as valid JSON that is not an object (list, string, ...).
        data = {}
    q = data.get('question', '')
    return jsonify({'answer': q, 'sources': []})

@app.route('/chat', methods=['POST'])
def chat():
    """Answer a question using retrieved knowledge-base documents (RAG).

    Expects a JSON object body with a non-empty string ``question``.

    Returns:
        200 with ``{'answer': ..., 'sources': [titles]}`` on success;
        400 with ``{'error': ...}`` when no usable question was supplied;
        500 with ``{'error': ...}`` when retrieval or generation fails.
    """
    # get_json(silent=True) returns None for a missing or malformed JSON body,
    # so such requests reach the 400 response below instead of a framework error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    q = data.get('question', '')
    # Reject non-string and whitespace-only questions up front.
    if not isinstance(q, str) or not q.strip():
        return jsonify({'error': 'no question provided'}), 400
    q = q.strip()

    # Retrieve top-k documents
    try:
        docs = retrieve(q, k=3)
    except Exception as e:
        return jsonify({'error': f'retrieval failed: {e}'}), 500
    context = "\n\n".join([f"Title: {d['title']}\n{d['content']}" for d in docs])

    # Build a safe prompt that instructs the model to use only the provided context
    prompt = (
        "You are a helpful Shoplite assistant. Answer the question using only the information in the provided documents. "
        "If the answer is not present, say you don't know.\n\n" +
        f"{context}\n\nQuestion: {q}\n\nAnswer:"
    )

    # Generate answer using the loaded open-source model
    try:
        answer = generate_answer(prompt, max_tokens=200)
    except Exception as e:
        return jsonify({'error': f'generation failed: {e}'}), 500

    return jsonify({'answer': answer, 'sources': [d['title'] for d in docs]})

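# Note: this cell only defines the app; it does not start a server, so the ngrok
# tunnel below has nothing to forward to until one is running on port 8080.
# Flask is a WSGI app. In Colab, either serve it from a background thread, e.g.:
#   import threading
#   threading.Thread(target=lambda: app.run(host='0.0.0.0', port=8080, use_reloader=False), daemon=True).start()
# or save this cell as a Python file (e.g. notebook_app.py) and run uvicorn against it in WSGI mode:
#   !uvicorn notebook_app:app --interface wsgi --host 0.0.0.0 --port 8080 &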


In [None]:
# ngrok tunnel setup - interactive token input (do NOT hardcode tokens)
from getpass import getpass

from pyngrok import conf, ngrok

print("When prompted, paste your ngrok auth token. It will only be kept in memory for this session.")
# getpass hides the typed token. input() would echo it into the cell output,
# which is stored with the notebook when it is saved or shared.
ngrok_token = getpass("Enter your ngrok token: ").strip()
if not ngrok_token:
    raise ValueError('An ngrok auth token is required to open the tunnel.')
# Keep the token in the in-process pyngrok config only. ngrok.set_auth_token()
# would write it to the ngrok config file on disk, contradicting the message above.
conf.get_default().auth_token = ngrok_token
# Forward public traffic to the local server expected on port 8080.
public_url = ngrok.connect(8080).public_url
print('ngrok tunnel created at', public_url)

# Usage note: copy `public_url` and use it to call the endpoints from your local machine.
# When testing is finished, close the tunnel:
# ngrok.disconnect(public_url)
# ngrok.kill()


In [None]:
# End-to-end smoke tests: run 5 ground-truth QA checks and print pass/fail
import time, re

# Prompt configuration used by run_smoke_tests when PROMPTS is empty or None.
# Keys read by build_prompt: 'role', 'goal', 'response_format'.
DEFAULT_PROMPT = {
    'role': 'You are a helpful Shoplite customer service assistant.',
    'goal': 'Provide accurate answers using only the provided Shoplite documentation.',
    'response_format': 'Answer: [Your response]\nSources: [List titles]'
}

def build_prompt(prompt_cfg, docs, user_query, max_context_chars=2000):
    """Assemble the full RAG prompt sent to the model.

    Layout: role, goal, a fixed "use only these documents" instruction, the
    retrieved documents, the question and finally the response-format hint,
    separated by blank lines.

    Args:
        prompt_cfg: Mapping with optional 'role', 'goal' and 'response_format'
            entries; missing entries are treated as empty strings.
        docs: Iterable of dicts with 'title' and 'content' keys.
        user_query: The question to ask.
        max_context_chars: Per-document character limit. A rendered document
            longer than this is cut and suffixed with " ... [TRUNCATED]".

    Returns:
        The prompt as a single string.
    """
    role = prompt_cfg.get('role', '')
    goal = prompt_cfg.get('goal', '')
    response_format = prompt_cfg.get('response_format', '')

    def render(doc):
        # One document block: title line followed by its body, truncated if too long.
        rendered = f"Title: {doc['title']}\n{doc['content']}"
        if len(rendered) <= max_context_chars:
            return rendered
        return rendered[:max_context_chars] + " ... [TRUNCATED]"

    context = "\n\n".join(render(doc) for doc in docs)

    header = f"{role}\n\nGoal: {goal}\n\n"
    instruction = "Use ONLY the following documents to answer. If the answer isn't present, say you don't know.\n\n"
    tail = f"{context}\n\nQuestion: {user_query}\n\n{response_format}"
    return header + instruction + tail


def score_answer(answer, authoritative):
    """Score a generated answer against the authoritative one by keyword overlap.

    Keywords are the words longer than three characters in the lower-cased
    authoritative answer (repeats included). A keyword counts as matched when
    it occurs anywhere in the lower-cased answer as a substring.

    Args:
        answer: Generated answer text (``None`` is treated as empty).
        authoritative: Expected answer text (``None`` is treated as empty).

    Returns:
        ``(passed, score)``. With keywords, ``score`` is the matched fraction
        and ``passed`` requires at least one match and a score of at least
        0.33. Without keywords, ``score`` is 0 and ``passed`` is true when the
        stripped answer is longer than 20 characters.
    """
    expected_text = (authoritative or '').lower()
    given_text = (answer or '').lower()

    keywords = [word for word in re.findall(r"\w+", expected_text) if len(word) > 3]
    if not keywords:
        # Nothing to compare against: accept any reasonably long answer.
        return len(given_text.strip()) > 20, 0

    hits = [kw for kw in keywords if kw in given_text]
    ratio = len(hits) / len(keywords)
    return bool(hits) and ratio >= 0.33, ratio


def run_smoke_tests(n=5, k=3):
    """Run the first ``n`` ground-truth QA items end to end and print a report.

    For each item the question is sent through retrieval, prompt building and
    generation; the answer is then scored against the authoritative answer
    with ``score_answer``. A summary line and per-question details (pass flag,
    score, generation latency, answer and expected snippets) are printed.

    Args:
        n: Maximum number of QA items to run.
        k: Number of documents to retrieve per question.

    Returns:
        None. Results are printed only.
    """
    active_prompts = PROMPTS or DEFAULT_PROMPT
    # Use the 'base_retrieval_prompt' entry when present, else treat the config as flat.
    prompt_cfg = active_prompts.get('base_retrieval_prompt', active_prompts)
    tests = GROUND_TRUTH_QA if GROUND_TRUTH_QA else []
    if not tests:
        print('No ground-truth QA loaded (GROUND_TRUTH_QA empty). Aborting smoke tests.')
        return
    # Clamp so a negative n reports 0 tests rather than a negative total.
    total = max(0, min(n, len(tests)))
    passed = 0
    results = []
    for item in tests[:total]:
        q = item.get('q') or item.get('question') or ''
        # A missing or None answer becomes '' so slicing below cannot fail.
        authoritative = item.get('a') or ''
        docs = retrieve(q, k=k)
        prompt = build_prompt(prompt_cfg, docs, q)
        # perf_counter is monotonic, so the latency is immune to wall-clock adjustments.
        t0 = time.perf_counter()
        try:
            ans = generate_answer(prompt, max_tokens=150)
            generation_failed = False
        except Exception as e:
            ans = f'ERROR: generation failed: {e}'
            generation_failed = True
        elapsed = (time.perf_counter() - t0) * 1000
        if generation_failed:
            # Never score the error text: with an empty expected answer it
            # would pass the "non-empty answer" rule and hide the failure.
            ok, score = False, 0
        else:
            ok, score = score_answer(ans, authoritative)
        results.append({'q': q, 'passed': ok, 'score': score, 'latency_ms': int(elapsed), 'answer_snippet': ans[:300], 'expected_snippet': authoritative[:300]})
        if ok:
            passed += 1
    # Summary
    print(f"Smoke test results: {passed}/{total} passed")
    for r in results:
        print('---')
        print('Q:', r['q'])
        print('Pass:', r['passed'], 'Score:', round(r['score'], 2), 'Latency(ms):', r['latency_ms'])
        print('Answer snippet:', r['answer_snippet'])
        print('Expected snippet:', r['expected_snippet'])

# Run tests when this cell executes
run_smoke_tests(n=5, k=3)
