# KK2 – Retrieval‑Augmented QA (English)

This notebook implements a compact RAG pipeline without external API keys.
Pipeline: load → chunk → vectorize (TF‑IDF / BM25 / optional SBERT) → retrieve → answer → evaluate.


In [None]:
# Imports & setup
import os, re, json, math, string
from pathlib import Path
from typing import List, Tuple, Dict
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi

# Optional semantic model.
# sentence-transformers is treated as an optional dependency: if importing it
# or constructing the model fails for any reason, SBERT stays None and
# USE_SBERT stays False, so the rest of the notebook runs on TF-IDF + BM25 only.
SBERT_MODEL_NAME = 'all-MiniLM-L6-v2'
USE_SBERT = False
SBERT = None
try:
    from sentence_transformers import SentenceTransformer
    SBERT = SentenceTransformer(SBERT_MODEL_NAME)
    USE_SBERT = True
except Exception as exc:
    # Deliberate best-effort fallback, but report the cause so a missing
    # package or a failed model load is not silently hidden from the reader.
    print(f'SBERT unavailable ({type(exc).__name__}: {exc})')
print('SBERT enabled:', USE_SBERT)


## 1) Load data & make chunks

In [None]:
# Handbook text used as the retrieval corpus (path is relative to the working directory).
DATA_PATH = Path('data/handbook_en.txt')
raw_text = DATA_PATH.read_text(encoding='utf-8')
# Fail fast with a clear message: an empty corpus produces no chunks, and the
# indexes built further down cannot be fitted on an empty document list.
if not raw_text.strip():
    raise ValueError(f'{DATA_PATH} is empty; nothing to index.')

def chunk_text(text: str, chunk_size: int = 120, overlap: int = 20) -> List[str]:
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Consecutive chunks start ``chunk_size - overlap`` words apart (at least one
    word apart, so an ``overlap`` >= ``chunk_size`` still makes progress).
    Chunking stops as soon as a chunk reaches the end of the text, so no
    trailing chunk is emitted that is wholly contained in the previous one.

    Args:
        text: Source text; it is split on any whitespace.
        chunk_size: Maximum number of words per chunk. Must be positive.
        overlap: Number of words shared between neighbouring chunks. Must be
            non-negative.

    Returns:
        The chunks in document order, each a single-space-joined string.
        An empty or whitespace-only ``text`` yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is negative.
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be a positive integer')
    if overlap < 0:
        raise ValueError('overlap must be non-negative')

    words = text.split()
    step = max(1, chunk_size - overlap)
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(' '.join(words[start:start + chunk_size]))
        # This window already covers the last word; any further window would
        # only repeat words that are part of this chunk.
        if start + chunk_size >= len(words):
            break
    return chunks

# Cut the handbook into overlapping word windows, then preview the first rows
# as a table whose chunk_id column is simply the position of each chunk.
chunks = chunk_text(raw_text, chunk_size=120, overlap=20)
pd.DataFrame({'text': chunks}).rename_axis(index='chunk_id').reset_index().head()


## 2) Vectorization: TF‑IDF, BM25, (optional) SBERT

In [None]:
# Sparse lexical index: TF-IDF over unigrams and bigrams; min_df=1 keeps every term.
tfidf = TfidfVectorizer(min_df=1, ngram_range=(1, 2))
X_tfidf = tfidf.fit_transform(chunks)

# BM25 index over plain whitespace tokens (case and punctuation are left as-is;
# retrieve() splits the query the same way).
tokenized = list(map(str.split, chunks))
bm25 = BM25Okapi(tokenized)

# Dense index: SBERT embeddings (requested normalized) when the model loaded, else None.
X_sbert = SBERT.encode(chunks, normalize_embeddings=True) if USE_SBERT else None

# One-line summary of what was built.
print('TF‑IDF:', X_tfidf.shape, '| BM25 docs:', len(tokenized), '| SBERT:', X_sbert.shape if X_sbert is not None else None)


## 3) Retrieval functions

In [None]:
from sklearn.preprocessing import normalize
def retrieve(query: str, top_k: int = 3, w_tfidf: float = 1.0, w_bm25: float = 1.0, use_sbert: bool = USE_SBERT, w_sbert: float = 1.0):
    """Rank the indexed chunks for ``query`` by a weighted sum of retriever scores.

    Uses the module-level indexes ``tfidf``/``X_tfidf``, ``bm25`` and, when
    enabled, ``SBERT``/``X_sbert``, together with the ``chunks`` list.

    Args:
        query: Question text; surrounding whitespace is stripped first.
        top_k: Number of best-scoring chunks to return.
        w_tfidf: Weight of the TF-IDF cosine similarity.
        w_bm25: Weight of the BM25 score (after division by its maximum).
        use_sbert: Whether to add the SBERT term; ignored when ``X_sbert`` is None.
        w_sbert: Weight of the SBERT dot-product score.

    Returns:
        A list of ``(chunk_id, score, chunk_text)`` tuples, highest score first.
    """
    text = query.strip()
    total = np.zeros(len(chunks), dtype=float)

    # Lexical similarity: cosine between the query's TF-IDF vector and every chunk.
    tfidf_sims = cosine_similarity(tfidf.transform([text]), X_tfidf)[0]
    total += w_tfidf * tfidf_sims

    # BM25 over whitespace tokens; divide by the best score (when positive) so
    # the top chunk contributes exactly w_bm25.
    lexical = np.array(bm25.get_scores(text.split()))
    peak = lexical.max()
    if peak > 0:
        lexical = lexical / peak
    total += w_bm25 * lexical

    # Optional dense term: dot product of chunk embeddings with the query embedding.
    if use_sbert and X_sbert is not None:
        query_vec = SBERT.encode([text], normalize_embeddings=True)[0]
        total += w_sbert * (X_sbert @ query_vec)

    # Negating the scores turns argsort's ascending order into best-first.
    ranked = np.argsort(-total)[:top_k]
    results = []
    for pos in ranked:
        chunk_id = int(pos)
        results.append((chunk_id, float(total[pos]), chunks[chunk_id]))
    return results

# Smoke test: display the top-ranked (chunk_id, score, text) tuples for a
# sample question using the default weights.
retrieve('Where should IT incidents be reported?')


## 4) Simple answer generation from retrieved context

In [None]:
def generate_answer(query: str, top_k=3, min_score=0.05, **weights):
    """Build an extractive answer for ``query`` from the retrieved chunks.

    Args:
        query: Question text, passed through to ``retrieve``.
        top_k: Number of chunks requested from ``retrieve``.
        min_score: Retrieved chunks scoring below this value are discarded.
        **weights: Extra keyword arguments forwarded unchanged to ``retrieve``.

    Returns:
        A dict with keys ``'query'``, ``'answer'`` and ``'sources'``. The answer
        is a fixed header followed by the kept chunks, one per line; ``'sources'``
        lists their chunk ids. When no chunk reaches ``min_score`` the answer is
        a fixed fallback sentence and ``'sources'`` is empty.
    """
    source_ids, passages = [], []
    for chunk_id, score, passage in retrieve(query, top_k=top_k, **weights):
        if score >= min_score:
            source_ids.append(int(chunk_id))
            passages.append(passage)

    # Nothing cleared the threshold: say so instead of quoting weak matches.
    if not passages:
        return {'query': query, 'answer': "I couldn't find a direct answer in the material.", 'sources': []}

    context = '\n'.join(passages)
    return {'query': query, 'answer': f"Based on the handbook:\n{context}", 'sources': source_ids}

# Example: display the full result dict (query, answer, sources) for one question.
generate_answer('When is a medical certificate required for sick leave?')


## 5) Ask questions

In [None]:
# Sample questions to run through the full retrieve-then-answer pipeline.
questions = [
    'Where should IT incidents be reported?',
    'When is a medical certificate required for sick leave?',
    'What is the mileage reimbursement?',
]
for q in questions:
    res = generate_answer(q, top_k=3)
    # One print call, three lines: the question (after a blank line), the
    # answer text, and the ids of the chunks it was built from.
    print(f'\nQ: {q}', res['answer'], f"Sources chunk ids: {res['sources']}", sep='\n')


## 6) Evaluation: EM and token‑level F1

In [None]:
from collections import Counter

def normalize_text(s: str) -> str:
    """Canonicalise ``s`` for answer comparison.

    Lower-cases the text, removes every character that is neither alphanumeric
    nor whitespace (characters are dropped, not replaced by a space), and
    collapses each run of whitespace into a single space with no leading or
    trailing space.
    """
    def _keep(ch: str) -> bool:
        return ch.isalnum() or ch.isspace()

    cleaned = ''.join(filter(_keep, s.lower()))
    return ' '.join(cleaned.split())

def f1_score(pred: str, gold: str) -> float:
    """Token-level F1 between a predicted and a reference answer.

    Both strings are passed through ``normalize_text`` and split on whitespace.
    Overlap is counted as a multiset intersection, so a repeated token only
    matches as many times as it occurs on both sides.

    Returns:
        1.0 when both token lists are empty, 0.0 when no token is shared,
        otherwise the harmonic mean of token precision and recall.
    """
    pred_tokens = normalize_text(pred).split()
    gold_tokens = normalize_text(gold).split()
    if not (pred_tokens or gold_tokens):
        return 1.0

    overlap = sum((Counter(pred_tokens) & Counter(gold_tokens)).values())
    if overlap == 0:
        return 0.0

    precision = overlap / len(pred_tokens)
    recall = overlap / len(gold_tokens)
    return 2 * precision * recall / (precision + recall)

def exact_match(pred: str, gold: str) -> int:
    """Return 1 if ``pred`` and ``gold`` are equal after ``normalize_text``, else 0."""
    matches = normalize_text(pred) == normalize_text(gold)
    return 1 if matches else 0

def evaluate(eval_path='eval/eval_set.jsonl'):
    """Score ``generate_answer`` on a JSON Lines evaluation set.

    Each non-blank line of the file must be a JSON object with a ``'question'``
    and an ``'answer'`` key. The prediction is the complete answer string from
    ``generate_answer`` (its header line plus the retrieved chunks), so exact
    match is a very strict measure here and F1 precision is limited by the
    amount of quoted context.

    Args:
        eval_path: Path to the JSONL file, read as UTF-8.

    Returns:
        ``{'EM': mean exact match, 'F1': mean token F1}`` over all examples.
        Both values are NaN when the file contains no examples.
    """
    ems, f1s = [], []
    with open(eval_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. an extra newline at the end of
                # the file) instead of failing inside json.loads.
                continue
            ex = json.loads(line)
            q, gold = ex['question'], ex['answer']
            # The whole generated answer is scored; no sentence is extracted from it.
            pred = generate_answer(q)['answer']
            ems.append(exact_match(pred, gold))
            f1s.append(f1_score(pred, gold))
    if not ems:
        # Same NaN result np.mean would give for an empty list, minus the warning.
        return {'EM': float('nan'), 'F1': float('nan')}
    return {'EM': float(np.mean(ems)), 'F1': float(np.mean(f1s))}

# Run the evaluation on the default eval file; the cell displays the EM/F1 dict.
evaluate()
