<a href="https://colab.research.google.com/github/RahafSobh/RahafSobh/blob/main/hw1_informationRetrival.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import re, math, json, random, collections, pathlib, string, os, sys, time
from dataclasses import dataclass
from typing import List, Dict, Tuple, Iterable, Set
from collections import Counter, defaultdict

In [2]:
ARTIFACTS_DIR = pathlib.Path("artifacts")
ARTIFACTS_DIR.mkdir(exist_ok=True)

def normalize_text(s: str) -> str:
    s = s.lower()
    s = s.replace("’","'").replace("‘","'").replace("—","-").replace("–","-")
    s = re.sub(r"[^a-z0-9'\s-]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def tokenize(s: str) -> List[str]:
    s = normalize_text(s)
    tokens = re.findall(r"[a-z0-9]+(?:'[a-z0-9]+)?", s)
    return tokens

print("Ready. Artifacts:", ARTIFACTS_DIR.resolve())


Ready. Artifacts: /content/artifacts


In [3]:
#@title Porter Stemmer (from scratch)
vowels = set("aeiou")
def is_consonant(word, i):
    ch = word[i]
    if ch in vowels: return False
    if ch == 'y':
        return i == 0 or not is_consonant(word, i-1)
    return True

def measure(word):
    prev_c = None
    m = 0
    for i,ch in enumerate(word):
        c = is_consonant(word, i)
        if prev_c is None:
            prev_c = c
        elif prev_c and not c:
            m += 1
            prev_c = False
        else:
            prev_c = c
    return m

def contains_vowel(stem):
    return any(not is_consonant(stem, i) for i in range(len(stem)))

def ends_with_double_consonant(word):
    return len(word) >= 2 and word[-1] == word[-2] and is_consonant(word, len(word)-1)

def ends_cvc(word):
    if len(word) < 3: return False
    c1 = is_consonant(word, -1)
    v = not is_consonant(word, -2)
    c0 = is_consonant(word, -3)
    if c0 and v and c1:
        return word[-1] not in "wxy"
    return False

def porter_stem(w: str) -> str:
    word = w
    if len(word) <= 2: return word

    # Step 1a
    if word.endswith("sses"):
        word = word[:-2]
    elif word.endswith("ies"):
        word = word[:-2]
    elif word.endswith("ss"):
        pass
    elif word.endswith("s"):
        word = word[:-1]

    # Step 1b
    if word.endswith("eed"):
        stem = word[:-3]
        if measure(stem) > 0:
            word = stem + "ee"
    else:
        did = False
        if word.endswith("ed"):
            stem = word[:-2]
            if contains_vowel(stem):
                word = stem; did = True
        elif word.endswith("ing"):
            stem = word[:-3]
            if contains_vowel(stem):
                word = stem; did = True
        if did:
            if word.endswith(("at","bl","iz")):
                word += "e"
            elif ends_with_double_consonant(word) and word[-1] not in "lsz":
                word = word[:-1]
            elif measure(word) == 1 and ends_cvc(word):
                word += "e"

    # Step 1c
    if word.endswith("y"):
        stem = word[:-1]
        if contains_vowel(stem):
            word = stem + "i"

    # Step 2
    step2 = {
        "ational":"ate", "tional":"tion", "enci":"ence", "anci":"ance", "izer":"ize",
        "abli":"able", "alli":"al", "entli":"ent", "eli":"e", "ousli":"ous",
        "ization":"ize", "ation":"ate", "ator":"ate", "alism":"al", "iveness":"ive",
        "fulness":"ful", "ousness":"ous", "aliti":"al", "iviti":"ive", "biliti":"ble", "logi":"log"
    }
    for suf, rep in step2.items():
        if word.endswith(suf) and measure(word[:-len(suf)]) > 0:
            word = word[:-len(suf)] + rep
            break

    # Step 3
    step3 = {
        "icate":"ic","ative":"", "alize":"al","iciti":"ic","ical":"ic",
        "ful":"", "ness":""
    }
    for suf, rep in step3.items():
        if word.endswith(suf) and measure(word[:-len(suf)]) > 0:
            word = word[:-len(suf)] + rep
            break

    # Step 4
    step4 = ("al","ance","ence","er","ic","able","ible","ant","ement","ment","ent",
             "ion","ou","ism","ate","iti","ous","ive","ize")
    for suf in step4:
        if word.endswith(suf):
            stem = word[:-len(suf)]
            if suf == "ion":
                if measure(stem) > 1 and (stem.endswith("s") or stem.endswith("t")):
                    word = stem
                    break
            else:
                if measure(stem) > 1:
                    word = stem
                    break

    # Step 5
    if word.endswith("e"):
        stem = word[:-1]
        m = measure(stem)
        if m > 1 or (m == 1 and not ends_cvc(stem)):
            word = stem
    if measure(word) > 1 and ends_with_double_consonant(word) and word.endswith("l"):
        word = word[:-1]

    return word

# quick demo
for w in ["caresses","ponies","ties","caress","cats","feed","agreed","plastered","bled",
          "motoring","sing","conflated","troubled","size","hopping","tanned","falling",
          "hissing","fizzed","fairly","happiness","relational"]:
    print(w, "->", porter_stem(w))

caresses -> caress
ponies -> poni
ties -> ti
caress -> caress
cats -> cat
feed -> feed
agreed -> agreed
plastered -> plaster
bled -> bled
motoring -> motor
sing -> sing
conflated -> conflat
troubled -> troubl
size -> size
hopping -> hop
tanned -> tan
falling -> fall
hissing -> hiss
fizzed -> fizz
fairly -> fairli
happiness -> happi
relational -> relat


In [4]:
#@title Build Inverted Index for a folder of .txt files
DOCS_DIR = "docs"  #@param {type:"string"}
USE_PORTER_STEMMER = True  #@param {type:"boolean"}
USE_STOPWORDS = True  #@param {type:"boolean"}

DEFAULT_STOPWORDS = set("""
the of and to a in that is it for on as with was were be by at are from or an this which
""".split())

def preprocess_tokens(text: str) -> List[str]:
    toks = tokenize(text)
    if USE_STOPWORDS:
        toks = [t for t in toks if t not in DEFAULT_STOPWORDS]
    if USE_PORTER_STEMMER:
        toks = [porter_stem(t) for t in toks]
    return toks

@dataclass
class InvertedIndex:
    postings: Dict[str, Dict[str, int]]  # term -> doc_id -> tf
    df: Dict[str, int]
    idf: Dict[str, float]
    doc_lengths: Dict[str, float]  # for cosine normalization
    vocab: Set[str]

def build_inverted_index(root: str) -> InvertedIndex:
    postings = defaultdict(lambda: defaultdict(int))
    doc_lengths = {}
    vocab = set()
    doc_files = []
    root_path = pathlib.Path(root)
    if not root_path.exists():
        root_path.mkdir(parents=True, exist_ok=True)
        (root_path/"doc1.txt").write_text("Cats like milk. Cats chase mice.")
        (root_path/"doc2.txt").write_text("Dogs chase cats and love bones.")
    for p in root_path.glob("*.txt"):
        doc_files.append(p)

    for p in doc_files:
        text = p.read_text(encoding="utf-8", errors="ignore")
        toks = preprocess_tokens(text)
        for t in toks:
            postings[t][p.name] += 1
            vocab.add(t)

    N = len(doc_files)
    df = {t: len(docs) for t,docs in postings.items()}
    idf = {t: math.log((N + 1) / (df_t + 1)) + 1.0 for t,df_t in df.items()}  # smoothed
    # compute doc vector lengths
    for p in doc_files:
        name = p.name
        s = 0.0
        for t, d in postings.items():
            tf = d.get(name, 0)
            if tf:
                s += (tf * idf[t])**2
        doc_lengths[name] = math.sqrt(s) if s>0 else 1.0

    idx = InvertedIndex(dict(postings), df, idf, doc_lengths, vocab)
    (ARTIFACTS_DIR / "inverted_index.json").write_text(json.dumps({
        "postings": {k: dict(v) for k,v in idx.postings.items()},
        "df": idx.df, "idf": idx.idf, "doc_lengths": idx.doc_lengths
    }, indent=2))
    print(f"Indexed {N} docs, vocab size={len(vocab)}; saved to artifacts/inverted_index.json")
    return idx

index = build_inverted_index(DOCS_DIR)
print("Sample vocab:", sorted(list(index.vocab))[:20])

Indexed 2 docs, vocab size=8; saved to artifacts/inverted_index.json
Sample vocab: ['bone', 'cat', 'chase', 'dog', 'like', 'love', 'mice', 'milk']


In [5]:
#@title Retrieval functions + demo search
QUERY_USE_BOOLEAN = False  #@param {type:"boolean"}
BOOLEAN_MODE = "AND"  #@param ["AND","OR"]
TOP_K = 5  #@param {type:"integer"}

def preprocess_query(q: str) -> List[str]:
    toks = preprocess_tokens(q)
    return toks

def search_vector_space(idx: InvertedIndex, query: str, k=5) -> List[Tuple[str, float]]:
    q_toks = preprocess_query(query)
    if not q_toks: return []
    q_tf = Counter(q_toks)
    q_vec = {}
    for t, tf in q_tf.items():
        if t in idx.idf:
            q_vec[t] = tf * idx.idf[t]
    q_norm = math.sqrt(sum(v*v for v in q_vec.values())) or 1.0

    scores = defaultdict(float)
    for t, q_w in q_vec.items():
        if t not in idx.postings: continue
        for doc, tf in idx.postings[t].items():
            scores[doc] += q_w * (tf * idx.idf[t])

    results = []
    for doc, num in scores.items():
        denom = (idx.doc_lengths.get(doc,1.0) * q_norm) or 1.0
        results.append((doc, num/denom))
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:k]

def search_boolean(idx: InvertedIndex, query: str, mode="AND") -> List[str]:
    q_toks = preprocess_query(query)
    if not q_toks: return []
    sets = []
    for t in q_toks:
        docs = set(idx.postings.get(t,{}).keys())
        sets.append(docs)
    if not sets: return []
    if mode == "AND":
        s = set.intersection(*sets) if sets else set()
    else:
        s = set.union(*sets) if sets else set()
    return sorted(s)

query = "cats chase"  #@param {type:"string"}
if QUERY_USE_BOOLEAN:
    print("Boolean", BOOLEAN_MODE, "→", search_boolean(index, query, BOOLEAN_MODE))
else:
    print("Vector space results:")
    for doc,score in search_vector_space(index, query, TOP_K):
        print(f"{doc:20s}  score={score:.4f}")

Vector space results:
doc1.txt              score=0.6418
doc2.txt              score=0.5023


In [7]:
# =========================
# Option 4 — Generate Stop-List (Top-50) from 10 random English Wikipedia pages

import time
from collections import Counter
import pathlib
import requests
import random

ARTIFACTS_DIR = pathlib.Path("artifacts"); ARTIFACTS_DIR.mkdir(exist_ok=True)

N_PAGES = 10
SLEEP_BETWEEN = 0.6
MAX_ATTEMPTS = 50
USE_PORTER_IN_STOPLIST = False
FALLBACK_TO_LOCAL_DOCS = True   # if Wikipedia blocked, derive top-50 from your local docs

UA = "IR-Student-Stoplist/1.0 (contact: you@example.com)"
session = requests.Session()
session.headers.update({"User-Agent": UA, "Accept": "application/json"})

def fetch_random_text_rest():

    r = session.get("https://en.wikipedia.org/api/rest_v1/page/random/summary", timeout=15)
    r.raise_for_status()
    j = r.json()

    txt = (j.get("extract") or "") + "\n" + (j.get("description") or "")
    return txt.strip()

def fetch_random_text_action():

    r = session.get("https://en.wikipedia.org/w/api.php", params={
        "action": "query", "format": "json", "list": "random", "rnnamespace": 0, "rnlimit": 1
    }, timeout=15)
    r.raise_for_status()
    pageid = r.json()["query"]["random"][0]["id"]
    r2 = session.get("https://en.wikipedia.org/w/api.php", params={
        "action": "query", "format": "json", "prop": "extracts", "explaintext": 1, "pageids": pageid
    }, timeout=15)
    r2.raise_for_status()
    page = r2.json()["query"]["pages"][str(pageid)]
    return page.get("extract", "").strip()

def fetch_random_wikipedia_plaintext():
    # Try REST first; if it fails, try Action API
    try:
        txt = fetch_random_text_rest()
        if txt: return txt
    except Exception as e:
        pass
    # fallback to Action API
    txt = fetch_random_text_action()
    return txt

texts = []
attempts = 0
while len(texts) < N_PAGES and attempts < MAX_ATTEMPTS:
    attempts += 1
    try:
        t = fetch_random_wikipedia_plaintext()
        if t and len(t.split()) > 50:
            texts.append(t)
            print(f"Fetched page {len(texts)}/{N_PAGES}, words ~{len(t.split())}")
        else:
            print("Got very short text, retrying…")
    except requests.HTTPError as e:

        print(f"HTTP error ({e.response.status_code}), backing off…")
        time.sleep(1.5 + random.random())
    except Exception as e:
        print("Error fetching page:", repr(e))
    time.sleep(SLEEP_BETWEEN + random.random()*0.4)

if len(texts) < N_PAGES:
    print(f"\nFetched only {len(texts)}/{N_PAGES} pages after {attempts} attempts.")
    if FALLBACK_TO_LOCAL_DOCS:
        print("Falling back to building a stop-list from local docs/*.txt")
        import glob, io
        docs_text = []
        for p in glob.glob("docs/*.txt"):
            try:
                with open(p, "r", encoding="utf-8", errors="ignore") as f:
                    docs_text.append(f.read())
            except:
                pass
        if not docs_text:
            raise RuntimeError("No Wikipedia text and no local docs found. Add some files to docs/ or re-run later.")
        texts = docs_text


all_tokens = []
for t in texts:
    toks = tokenize(t)
    if USE_PORTER_IN_STOPLIST:
        toks = [porter_stem(x) for x in toks]
    all_tokens.extend(toks)

counts = Counter(all_tokens)
top50 = counts.most_common(50)

print("\nTop 50 words:")
for w,c in top50:
    print(f"{w:20s} {c}")

stoplist_path = ARTIFACTS_DIR / "top_50_stoplist.txt"
with open(stoplist_path, "w", encoding="utf-8") as f:
    for w,_ in top50:
        f.write(w+"\n")
print("\nSaved:", stoplist_path.resolve())


Fetched page 1/10, words ~67
Fetched page 2/10, words ~70
Fetched page 3/10, words ~52
Got very short text, retrying…
Got very short text, retrying…
Got very short text, retrying…
Fetched page 4/10, words ~61
Got very short text, retrying…
Got very short text, retrying…
Fetched page 5/10, words ~118
Fetched page 6/10, words ~67
Got very short text, retrying…
Fetched page 7/10, words ~62
Got very short text, retrying…
Fetched page 8/10, words ~102
Got very short text, retrying…
Got very short text, retrying…
Got very short text, retrying…
Fetched page 9/10, words ~66
Fetched page 10/10, words ~83

Top 50 words:
the                  67
and                  25
in                   23
of                   23
is                   15
a                    15
by                   14
as                   9
with                 8
to                   7
video                7
an                   6
game                 6
was                  6
who                  5
it                   5
on     