In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Module 1: Wikipedia Ingestion

from typing import List
import spacy


# ---------- 1. Loader ----------
def load_raw_wikipedia(path: str) -> str:
    """
    Load the raw Simple Wikipedia text file.
    """
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()
    return text


# ---------- 2. Normalizer ----------
def normalize_text(text: str) -> str:
    """
    Normalize whitespace without changing semantics.
    """
    lines = text.splitlines()
    cleaned_lines = []

    for line in lines:
        line = line.strip()
        if line:
            cleaned_lines.append(line)

    return "\n".join(cleaned_lines)


# ---------- 3. Sentence Splitter ----------
def split_into_sentences(text: str, batch_size: int = 100_000) -> List[str]:
    """
    Split very large text into sentences safely by processing in chunks.
    """
    nlp = spacy.load(
        "en_core_web_sm",
        disable=["ner", "tagger", "lemmatizer"]
    )

    sentences = []

    for i in range(0, len(text), batch_size):
        text_slice = text[i:i + batch_size]
        doc = nlp(text_slice)

        for sent in doc.sents:
            sent_text = sent.text.strip()
            if sent_text:
                sentences.append(sent_text)

    return sentences

# ---------- 4. Public Interface ----------
def ingest_wikipedia(path: str) -> List[str]:
    """
    End-to-end ingestion pipeline.
    """
    raw_text = load_raw_wikipedia(path)
    normalized_text = normalize_text(raw_text)
    sentences = split_into_sentences(normalized_text)
    return sentences

In [3]:
sentences = ingest_wikipedia("/content/sample_data/AllCombined.txt")
print(len(sentences))
print(sentences[0])

402457
April
April (Apr.) is the fourth month of the year in the Julian and Gregorian calendars, and comes between March and May.


In [4]:
import pickle

sentences = ingest_wikipedia("/content/sample_data/AllCombined.txt")

with open("/content/drive/MyDrive/wiki_sentences.pkl", "wb") as f:
    pickle.dump(sentences, f)


In [5]:
with open("/content/drive/MyDrive/wiki_sentences.pkl", "rb") as f:
    sentences = pickle.load(f)

print(len(sentences))

1919487


In [6]:
from typing import List, Dict
import pickle

def count_tokens(text: str) -> int:
    """
    Approximate token count using word count.
    1 token ≈ 0.75 words (English heuristic).
    """
    return int(len(text.split()) / 0.75)

def build_chunks(
    sentences: List[str],
    min_tokens: int = 300,
    max_tokens: int = 600,
    overlap_tokens: int = 100
) -> List[Dict]:
    """
    Build sentence-aware overlapping chunks.
    """

    chunks = []
    current_sentences = []
    current_tokens = 0
    chunk_id = 0

    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)

        # If adding this sentence exceeds max_tokens, finalize chunk
        if current_tokens + sentence_tokens > max_tokens:
            if current_tokens >= min_tokens:
                chunk_text = " ".join(current_sentences)
                chunks.append({
                    "chunk_id": chunk_id,
                    "text": chunk_text,
                    "token_count": current_tokens
                })
                chunk_id += 1

            # Overlap handling
            overlap = []
            overlap_token_count = 0

            for sent in reversed(current_sentences):
                sent_tokens = count_tokens(sent)
                if overlap_token_count + sent_tokens > overlap_tokens:
                    break
                overlap.insert(0, sent)
                overlap_token_count += sent_tokens

            current_sentences = overlap
            current_tokens = overlap_token_count

        current_sentences.append(sentence)
        current_tokens += sentence_tokens

    # Add final chunk
    if current_tokens >= min_tokens:
        chunks.append({
            "chunk_id": chunk_id,
            "text": " ".join(current_sentences),
            "token_count": current_tokens
        })

    return chunks

def chunk_wikipedia_sentences(sentences: List[str]) -> List[Dict]:
    """
    Public API for Module 2.
    """
    return build_chunks(sentences)


In [7]:
chunks = chunk_wikipedia_sentences(sentences)

with open("/content/drive/MyDrive/wiki_chunks.pkl", "wb") as f:
    pickle.dump(chunks, f)

print(f"Saved {len(chunks)} chunks")

Saved 78115 chunks


In [8]:

with open("/content/drive/MyDrive/wiki_chunks.pkl", "rb") as f:
    chunks = pickle.load(f)

print(len(chunks), chunks[0]["token_count"])

78115 589


In [9]:
print(len(chunks))
print(chunks[0]["token_count"])
print(chunks[0]["text"][:300])

78115
589
April
April (Apr.) is the fourth month of the year in the Julian and Gregorian calendars, and comes between March and May. It is one of four months to have 30 days. April always begins on the same day of the week as July, and additionally, January in leap years. April always ends on the same day of 


In [10]:
!pip install sentence-transformers faiss-cpu

Collecting faiss-cpu
  Downloading faiss_cpu-1.13.2-cp310-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (7.6 kB)
Downloading faiss_cpu-1.13.2-cp310-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (23.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.8/23.8 MB[0m [31m87.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.13.2


In [11]:
from typing import List, Dict, Tuple
import pickle
import numpy as np
import faiss

from sentence_transformers import SentenceTransformer

with open("/content/drive/MyDrive/wiki_chunks.pkl", "rb") as f:
    chunks: List[Dict] = pickle.load(f)

print(f"Loaded {len(chunks)} chunks")


Loaded 78115 chunks


In [12]:
def load_embedding_model() -> SentenceTransformer:
    """
    Load sentence embedding model.
    """
    model = SentenceTransformer("intfloat/e5-large")
    return model


In [13]:
def embed_chunks(
    model: SentenceTransformer,
    chunks: List[Dict],
    batch_size: int = 64
) -> np.ndarray:
    """
    Encode chunk texts into normalized float32 embeddings.
    """

    texts = ["passage: " + c["text"] for c in chunks]

    embeddings = model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=True,
        normalize_embeddings=True
    )

    # Ensure FAISS compatibility
    embeddings = np.array(embeddings, dtype="float32")

    return embeddings

In [14]:
def build_faiss_index(embeddings: np.ndarray) -> faiss.Index:
    """
    Build a cosine-similarity FAISS index.
    """
    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(embeddings)
    return index


In [15]:
# Build everything once
embedding_model = load_embedding_model()
chunk_embeddings = embed_chunks(embedding_model, chunks)
index = build_faiss_index(chunk_embeddings)

# Save to Drive
faiss.write_index(index, "/content/drive/MyDrive/wiki_faiss.index")

with open("/content/drive/MyDrive/wiki_chunks_meta.pkl", "wb") as f:
    pickle.dump(chunks, f)

print("FAISS index and metadata saved")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/387 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/57.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/611 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

Loading weights:   0%|          | 0/391 [00:00<?, ?it/s]

BertModel LOAD REPORT from: intfloat/e5-large
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


tokenizer_config.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/201 [00:00<?, ?B/s]

Batches:   0%|          | 0/1221 [00:00<?, ?it/s]

FAISS index and metadata saved


In [16]:
index = faiss.read_index("/content/drive/MyDrive/wiki_faiss.index")

with open("/content/drive/MyDrive/wiki_chunks_meta.pkl", "rb") as f:
    chunks = pickle.load(f)

embedding_model = load_embedding_model()


Loading weights:   0%|          | 0/391 [00:00<?, ?it/s]

BertModel LOAD REPORT from: intfloat/e5-large
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


In [17]:
def retrieve_top_k(
    query: str,
    model: SentenceTransformer,
    index: faiss.Index,
    chunks: List[Dict],
    k: int = 3
) -> List[Tuple[Dict, float]]:
    """
    Retrieve top-k most relevant chunks for a query.
    """

    # Prefix required for E5 models
    query_text = "query: " + query

    query_embedding = model.encode(
        [query_text],
        normalize_embeddings=True
    )

    # Ensure float32 for FAISS
    query_embedding = np.array(query_embedding, dtype="float32")

    scores, ids = index.search(query_embedding, k)

    results = []
    for idx, score in zip(ids[0], scores[0]):
        results.append((chunks[idx], float(score)))

    return results

In [18]:
results = retrieve_top_k(
    query="What is gravity?",
    model=embedding_model,
    index=index,
    chunks=chunks,
    k=3
)

for chunk, score in results:
    print(score)
    print(chunk["text"][:200])
    print("-" * 40)


0.8558967709541321
so it seems natural for astronomers to use the term 'satellite' for these as well. Our Local Group is itself part of an even larger group, the Virgo Supercluster. There are other, even larger, groups 
----------------------------------------
0.8500293493270874
For example, a force causes an affected object to be pushed or pulled in a certain direction. This changes the object's momentum. Forces cause objects to accelerate, add to the object's overall pressu
----------------------------------------
0.8426462411880493
The velocity of an object tells you how fast the object changes position, and where it is moving. Velocity is a vector just like position: a car can move "160 kilometers per hour "west"" (100 miles pe
----------------------------------------


In [19]:
!pip install transformers torch



In [20]:
from typing import List, Dict
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def load_answerer_model(model_name: str):
    """
    Load a small deterministic language model.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)

    model.eval()
    return tokenizer, model

def build_prompt(question: str, retrieved_chunks: List[Dict]) -> str:
    """
    Build a strict rewriting prompt.
    """
    context = "\n\n".join(chunk["text"] for chunk in retrieved_chunks)

    prompt = (
        "You are an editor.\n"
        "Use ONLY the information in the given text.\n"
        "Do not add facts.\n"
        "If the text does not answer the question, say:\n"
        "\"Not enough information in the Simple Wikipedia dataset.\"\n"
        "Use simple English.\n"
        "Write at most 3 short sentences.\n\n"
        f"Question:\n{question}\n\n"
        f"Text:\n{context}\n\n"
        "Answer:\n"
    )
    return prompt

def build_prompt(question: str, retrieved_chunks: List[Dict]) -> str:
    """
    Build a strict rewriting prompt.
    """
    context = "\n\n".join(chunk["text"] for chunk in retrieved_chunks)

    prompt = (
        "You are an editor.\n"
        "Use ONLY the information in the given text.\n"
        "Do not add facts.\n"
        "If the text does not answer the question, say:\n"
        "\"Not enough information in the Simple Wikipedia dataset.\"\n"
        "Use simple English.\n"
        "Write at most 3 short sentences.\n\n"
        f"Question:\n{question}\n\n"
        f"Text:\n{context}\n\n"
        "Answer:\n"
    )
    return prompt


In [21]:
def generate_raw_answer(
    question: str,
    retrieved_chunks: List[Dict],
    tokenizer,
    model,
    max_new_tokens: int = 100
) -> str:
    """
    Generate a raw rewritten answer.
    """
    prompt = build_prompt(question, retrieved_chunks)

    inputs = tokenizer(prompt, return_tensors="pt")

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            temperature=0.0,
            top_p=1.0,
            pad_token_id=tokenizer.eos_token_id
        )

    decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Extract only the answer part
    answer = decoded.split("Answer:")[-1].strip()
    return answer

In [22]:
results = retrieve_top_k(query, model, index, chunks, k=3)
retrieved_chunks = [chunk for chunk, score in results]

NameError: name 'query' is not defined

In [None]:
raw_answer = generate_raw_answer(
    question=query,
    retrieved_chunks=retrieved_chunks,
    tokenizer=tokenizer,
    model=model
)

In [None]:
import re
def remove_parentheses(text: str) -> str:
    """
    Remove all content inside parentheses.
    """
    return re.sub(r"\([^)]*\)", "", text)
def split_sentences_simple(text: str) -> list:
    """
    Split text into sentences using punctuation.
    """
    sentences = re.split(r'(?<=[.!?]) +', text)
    return [s.strip() for s in sentences if s.strip()]
def merge_short_sentences(sentences: list, min_words: int = 5) -> list:
    """
    Merge sentences that are too short into neighbors.
    """
    merged = []
    buffer = ""

    for s in sentences:
        if len(s.split()) < min_words:
            buffer += " " + s
        else:
            if buffer:
                merged.append((buffer + " " + s).strip())
                buffer = ""
            else:
                merged.append(s)

    if buffer:
        merged.append(buffer.strip())

    return merged
def enforce_sentence_limit(sentences: list, max_sentences: int = 3) -> list:
    return sentences[:max_sentences]
def check_refusal_needed(answer: str, retrieved_chunks: list, threshold: float = 0.1) -> bool:
    """
    If too many words in answer are not in retrieved text,
    trigger refusal.
    """
    retrieved_text = " ".join(chunk["text"] for chunk in retrieved_chunks)
    retrieved_words = set(retrieved_text.lower().split())

    answer_words = answer.lower().split()
    unseen = [w for w in answer_words if w not in retrieved_words]

    if len(unseen) / max(len(answer_words), 1) > threshold:
        return True

    return False
def post_process_answer(raw_answer: str, retrieved_chunks: list) -> str:
    """
    Full deterministic cleanup pipeline.
    """

    # Step 1: Remove parentheses
    cleaned = remove_parentheses(raw_answer)

    # Step 2: Sentence split
    sentences = split_sentences_simple(cleaned)

    # Step 3: Merge short fragments
    sentences = merge_short_sentences(sentences)

    # Step 4: Enforce 3 sentence max
    sentences = enforce_sentence_limit(sentences, max_sentences=3)

    final_answer = " ".join(sentences).strip()

    # Step 5: Hallucination check
    if check_refusal_needed(final_answer, retrieved_chunks):
        return "Not enough information in the Simple Wikipedia dataset."

    # Step 6: Empty check
    if not final_answer:
        return "Not enough information in the Simple Wikipedia dataset."

    return final_answer


In [None]:
def retrieve_context(question: str, k: int = 3):
    """
    Retrieve top-k chunks and scores.
    """
    results = retrieve_top_k(
        query=question,
        model=embedding_model,
        index=index,
        chunks=chunks,
        k=k
    )

    retrieved_chunks = [chunk for chunk, score in results]
    scores = [score for chunk, score in results]

    return retrieved_chunks, scores
def answer_question(question: str, k: int = 3) -> dict:
    """
    Full RAG pipeline.
    Returns structured output for evaluation.
    """

    # Step 1: Retrieval
    retrieved_chunks, scores = retrieve_context(question, k=k)

    # Step 2: Raw generation
    raw_answer = generate_raw_answer(
        question=question,
        retrieved_chunks=retrieved_chunks,
        tokenizer=tokenizer,
        model=model
    )

    # Step 3: Post-processing
    final_answer = post_process_answer(
        raw_answer=raw_answer,
        retrieved_chunks=retrieved_chunks
    )

    return {
        "question": question,
        "retrieved_chunks": retrieved_chunks,
        "retrieval_scores": scores,
        "raw_answer": raw_answer,
        "final_answer": final_answer
    }
result = answer_question("What is gravity?")

print("FINAL ANSWER:")
print(result["final_answer"])
