<a href="https://colab.research.google.com/github/Amna-Ghulam-Nabie/Assignment-3/blob/master/IRAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import json
import pickle
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple
import re
from pathlib import Path

# Core NLP imports
try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    import nltk
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer
    from nltk.tokenize import word_tokenize
except ImportError:
    print("Installing required packages...")
    import subprocess
    subprocess.check_call(["pip", "install", "scikit-learn", "nltk", "rank-bm25"])
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    import nltk
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer
    from nltk.tokenize import word_tokenize

# Download NLTK data
try:
    nltk.data.find('corpora/stopwords')
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('tokenizers/punkt_tab') # Add this line to check for punkt_tab
except LookupError:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('punkt_tab', quiet=True) # Add this line to download punkt_tab


class TextPreprocessor:
    """Handles text preprocessing and normalization"""

    def __init__(self):
        self.stemmer = PorterStemmer()
        self.stop_words = set(stopwords.words('english'))

    def preprocess(self, text: str, stem: bool = True, remove_stopwords: bool = True) -> str:
        """Preprocess text with tokenization, lowercasing, stemming, and stopword removal"""
        # Lowercase
        text = text.lower()

        # Remove special characters but keep spaces
        text = re.sub(r'[^a-z0-9\s]', ' ', text)

        # Tokenize
        tokens = word_tokenize(text)

        # Remove stopwords
        if remove_stopwords:
            tokens = [t for t in tokens if t not in self.stop_words]

        # Stem
        if stem:
            tokens = [self.stemmer.stem(t) for t in tokens]

        return ' '.join(tokens)

    def tokenize(self, text: str) -> List[str]:
        """Tokenize and return list of tokens"""
        processed = self.preprocess(text)
        return processed.split()


class BM25Retriever:
    """BM25 ranking algorithm implementation"""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.corpus_size = 0
        self.avgdl = 0
        self.doc_freqs = []
        self.idf = {}
        self.doc_len = []
        self.tokenized_corpus = []

    def fit(self, corpus: List[str]):
        """Build BM25 index from corpus"""
        preprocessor = TextPreprocessor()
        self.tokenized_corpus = [preprocessor.tokenize(doc) for doc in corpus]
        self.corpus_size = len(corpus)
        self.doc_len = [len(doc) for doc in self.tokenized_corpus]
        self.avgdl = sum(self.doc_len) / self.corpus_size

        # Calculate document frequencies
        df = defaultdict(int)
        for doc in self.tokenized_corpus:
            for word in set(doc):
                df[word] += 1

        # Calculate IDF
        self.idf = {}
        for word, freq in df.items():
            self.idf[word] = np.log((self.corpus_size - freq + 0.5) / (freq + 0.5) + 1)

    def score(self, query: str, doc_id: int) -> float:
        """Calculate BM25 score for a document given a query"""
        preprocessor = TextPreprocessor()
        query_tokens = preprocessor.tokenize(query)
        doc = self.tokenized_corpus[doc_id]
        doc_len = self.doc_len[doc_id]

        score = 0.0
        for token in query_tokens:
            if token not in self.idf:
                continue

            # Term frequency in document
            tf = doc.count(token)

            # BM25 formula
            numerator = tf * (self.k1 + 1)
            denominator = tf + self.k1 * (1 - self.b + self.b * (doc_len / self.avgdl))
            score += self.idf[token] * (numerator / denominator)

        return score

    def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Search and return top-k documents"""
        scores = [(i, self.score(query, i)) for i in range(self.corpus_size)]
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]


class BooleanRetriever:
    """Boolean retrieval with inverted index"""

    def __init__(self):
        self.inverted_index = defaultdict(set)
        self.doc_count = 0

    def fit(self, corpus: List[str]):
        """Build inverted index"""
        preprocessor = TextPreprocessor()
        self.doc_count = len(corpus)

        for doc_id, doc in enumerate(corpus):
            tokens = preprocessor.tokenize(doc)
            for token in set(tokens):
                self.inverted_index[token].add(doc_id)

    def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Boolean AND search"""
        preprocessor = TextPreprocessor()
        query_tokens = preprocessor.tokenize(query)

        if not query_tokens:
            return []

        # Start with documents containing first term
        result_docs = self.inverted_index.get(query_tokens[0], set()).copy()

        # AND with remaining terms
        for token in query_tokens[1:]:
            result_docs &= self.inverted_index.get(token, set())

        # Score by number of matching terms (for ranking)
        scores = []
        for doc_id in result_docs:
            score = len(query_tokens)  # All terms matched
            scores.append((doc_id, float(score)))

        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]


class TfidfRetriever:
    """TF-IDF based retrieval using cosine similarity"""

    def __init__(self, max_features: int = 10000):
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            stop_words='english',
            ngram_range=(1, 2)
        )
        self.tfidf_matrix = None

    def fit(self, corpus: List[str]):
        """Build TF-IDF matrix"""
        self.tfidf_matrix = self.vectorizer.fit_transform(corpus)

    def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Search using cosine similarity"""
        query_vec = self.vectorizer.transform([query])
        similarities = cosine_similarity(query_vec, self.tfidf_matrix).flatten()

        # Get top-k indices
        top_indices = np.argsort(similarities)[::-1][:top_k]
        scores = [(int(idx), float(similarities[idx])) for idx in top_indices]

        return scores


class HybridRetriever:
    """Combines multiple retrieval strategies"""

    def __init__(self, weights: Dict[str, float] = None):
        self.weights = weights or {'tfidf': 0.4, 'bm25': 0.5, 'boolean': 0.1}
        self.tfidf = TfidfRetriever()
        self.bm25 = BM25Retriever()
        self.boolean = BooleanRetriever()

    def fit(self, corpus: List[str]):
        """Fit all retrievers"""
        print("Building TF-IDF index...")
        self.tfidf.fit(corpus)
        print("Building BM25 index...")
        self.bm25.fit(corpus)
        print("Building Boolean index...")
        self.boolean.fit(corpus)

    def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
        """Hybrid search combining all strategies"""
        # Get results from each retriever
        tfidf_results = dict(self.tfidf.search(query, top_k=50))
        bm25_results = dict(self.bm25.search(query, top_k=50))
        boolean_results = dict(self.boolean.search(query, top_k=50))

        # Normalize scores to [0, 1]
        def normalize_scores(results):
            if not results:
                return {}
            max_score = max(results.values()) if results else 1
            if max_score == 0:
                return {k: 0 for k in results}
            return {k: v / max_score for k, v in results.items()}

        tfidf_norm = normalize_scores(tfidf_results)
        bm25_norm = normalize_scores(bm25_results)
        boolean_norm = normalize_scores(boolean_results)

        # Combine scores
        all_docs = set(tfidf_norm.keys()) | set(bm25_norm.keys()) | set(boolean_norm.keys())
        combined_scores = []

        for doc_id in all_docs:
            score = (
                self.weights['tfidf'] * tfidf_norm.get(doc_id, 0) +
                self.weights['bm25'] * bm25_norm.get(doc_id, 0) +
                self.weights['boolean'] * boolean_norm.get(doc_id, 0)
            )
            combined_scores.append((doc_id, score))

        combined_scores.sort(key=lambda x: x[1], reverse=True)
        return combined_scores[:top_k]


class IRSystem:
    """Main Information Retrieval System"""

    def __init__(self, retrieval_method: str = 'hybrid'):
        """
        Initialize IR System

        Args:
            retrieval_method: 'tfidf', 'bm25', 'boolean', or 'hybrid'
        """
        self.retrieval_method = retrieval_method
        self.documents = []
        self.doc_ids = []
        self.retriever = None
        self.is_fitted = False

    def load_documents(self, path: str):
        """Load documents from directory or file"""
        path = Path(path)

        if path.is_file():
            # Single file
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
                self.documents.append(content)
                self.doc_ids.append(path.name)

        elif path.is_dir():
            # Directory of files
            for file_path in sorted(path.glob('**/*.txt')):
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                        if content.strip():
                            self.documents.append(content)
                            self.doc_ids.append(str(file_path.relative_to(path)))
                except Exception as e:
                    print(f"Error reading {file_path}: {e}")

        print(f"Loaded {len(self.documents)} documents")

    def build_index(self):
        """Build retrieval index"""
        if not self.documents:
            raise ValueError("No documents loaded. Call load_documents() first.")

        print(f"Building index using {self.retrieval_method} method...")

        if self.retrieval_method == 'tfidf':
            self.retriever = TfidfRetriever()
        elif self.retrieval_method == 'bm25':
            self.retriever = BM25Retriever()
        elif self.retrieval_method == 'boolean':
            self.retriever = BooleanRetriever()
        elif self.retrieval_method == 'hybrid':
            self.retriever = HybridRetriever()
        else:
            raise ValueError(f"Unknown retrieval method: {self.retrieval_method}")

        self.retriever.fit(self.documents)
        self.is_fitted = True
        print("Index built successfully!")

    def search(self, query: str, top_k: int = 10) -> List[Dict]:
        """
        Search for documents matching the query

        Returns:
            List of dicts with keys: doc_id, score, content
        """
        if not self.is_fitted:
            raise ValueError("Index not built. Call build_index() first.")

        results = self.retriever.search(query, top_k=top_k)

        output = []
        for doc_idx, score in results:
            output.append({
                'rank': len(output) + 1,
                'doc_id': self.doc_ids[doc_idx],
                'score': score,
                'content': self.documents[doc_idx][:500] + '...' if len(self.documents[doc_idx]) > 500 else self.documents[doc_idx]
            })

        return output

    def save_index(self, filepath: str):
        """Save index to disk"""
        data = {
            'retrieval_method': self.retrieval_method,
            'documents': self.documents,
            'doc_ids': self.doc_ids,
            'retriever': self.retriever
        }
        with open(filepath, 'wb') as f:
            pickle.dump(data, f)
        print(f"Index saved to {filepath}")

    def load_index(self, filepath: str):
        """Load index from disk"""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)

        self.retrieval_method = data['retrieval_method']
        self.documents = data['documents']
        self.doc_ids = data['doc_ids']
        self.retriever = data['retriever']
        self.is_fitted = True
        print(f"Index loaded from {filepath}")


def main():
    """Example usage"""
    # Create a dummy 'documents' directory and files if they don't exist
    docs_dir = Path('documents')
    if not docs_dir.exists():
        docs_dir.mkdir()
        with open(docs_dir / 'doc1.txt', 'w') as f:
            f.write("This is the first document. It talks about information retrieval.")
        with open(docs_dir / 'doc2.txt', 'w') as f:
            f.write("The second document discusses search engines and ranking algorithms.")
        with open(docs_dir / 'doc3.txt', 'w') as f:
            f.write("Information technology is a vast field. Retrieval of data is crucial.")
        print("Created dummy 'documents' directory with example files.")

    # Initialize system
    ir_system = IRSystem(retrieval_method='hybrid')

    # Load documents
    # Replace 'documents' with your actual document directory
    ir_system.load_documents('documents')

    # Build index
    ir_system.build_index()

    # Save index (optional)
    ir_system.save_index('ir_index.pkl')

    # Search
    query = input("Enter search query: ")
    results = ir_system.search(query, top_k=5)

    print(f"\n{'='*80}")
    print(f"Top {len(results)} results for query: '{query}'")
    print(f"{'='*80}\n")

    for result in results:
        print(f"Rank: {result['rank']}")
        print(f"Document: {result['doc_id']}")
        print(f"Score: {result['score']:.4f}")
        print(f"Content: {result['content']}")
        print(f"{'-'*80}\n")


if __name__ == '__main__':
    main()

Created dummy 'documents' directory with example files.
Loaded 3 documents
Building index using hybrid method...
Building TF-IDF index...
Building BM25 index...
Building Boolean index...
Index built successfully!
Index saved to ir_index.pkl
Enter search query: Amna is a Student

Top 3 results for query: 'Amna is a Student'

Rank: 1
Document: doc1.txt
Score: 0.0000
Content: This is the first document. It talks about information retrieval.
--------------------------------------------------------------------------------

Rank: 2
Document: doc2.txt
Score: 0.0000
Content: The second document discusses search engines and ranking algorithms.
--------------------------------------------------------------------------------

Rank: 3
Document: doc3.txt
Score: 0.0000
Content: Information technology is a vast field. Retrieval of data is crucial.
--------------------------------------------------------------------------------



In [2]:
import os
import json
import pickle
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple
import re
from pathlib import Path

# NLP imports
try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    import nltk
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer
    from nltk.tokenize import word_tokenize
except ImportError:
    import subprocess
    subprocess.check_call(["pip", "install", "scikit-learn", "nltk", "rank-bm25"])
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    import nltk
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer
    from nltk.tokenize import word_tokenize


# Download NLTK
try:
    nltk.data.find('corpora/stopwords')
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('punkt_tab', quiet=True)


# ---------------------------------------------------------
# TEXT NORMALIZER
# ---------------------------------------------------------
class TextCleaner:
    """Handles all text cleaning operations"""

    def __init__(self):
        self.word_stemmer = PorterStemmer()
        self.stopword_list = set(stopwords.words('english'))

    def clean(self, text_input: str, apply_stem: bool = True, remove_stop: bool = True) -> str:
        text_input = text_input.lower()
        text_input = re.sub(r'[^a-z0-9\s]', ' ', text_input)
        token_list = word_tokenize(text_input)

        if remove_stop:
            token_list = [t for t in token_list if t not in self.stopword_list]

        if apply_stem:
            token_list = [self.word_stemmer.stem(t) for t in token_list]

        return ' '.join(token_list)

    def tokens(self, text_input: str) -> List[str]:
        cleaned = self.clean(text_input)
        return cleaned.split()


# ---------------------------------------------------------
# BM25 RANKER
# ---------------------------------------------------------
class BM25Ranker:
    """BM25 Ranking Implementation"""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1_val = k1
        self.b_val = b
        self.doc_total = 0
        self.avg_len = 0
        self.doc_frequencies = []
        self.idf_map = {}
        self.length_list = []
        self.token_docs = []

    def build(self, corpus_list: List[str]):
        cleaner = TextCleaner()
        self.token_docs = [cleaner.tokens(doc) for doc in corpus_list]
        self.doc_total = len(corpus_list)
        self.length_list = [len(doc) for doc in self.token_docs]
        self.avg_len = sum(self.length_list) / self.doc_total

        df_counter = defaultdict(int)
        for doc in self.token_docs:
            for word in set(doc):
                df_counter[word] += 1

        self.idf_map = {}
        for word, freq in df_counter.items():
            self.idf_map[word] = np.log((self.doc_total - freq + 0.5) /
                                        (freq + 0.5) + 1)

    def score_doc(self, query_str: str, doc_index: int) -> float:
        cleaner = TextCleaner()
        q_tokens = cleaner.tokens(query_str)
        document = self.token_docs[doc_index]
        doc_len = self.length_list[doc_index]

        score_total = 0.0

        for tok in q_tokens:
            if tok not in self.idf_map:
                continue

            tf_value = document.count(tok)
            numerator = tf_value * (self.k1_val + 1)
            denominator = tf_value + self.k1_val * (
                    1 - self.b_val + self.b_val * (doc_len / self.avg_len)
            )
            score_total += self.idf_map[tok] * (numerator / denominator)

        return score_total

    def top(self, query_str: str, top_n: int = 10):
        results = [(i, self.score_doc(query_str, i)) for i in range(self.doc_total)]
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_n]


# ---------------------------------------------------------
# BOOLEAN RANKER
# ---------------------------------------------------------
class BooleanRanker:
    """Boolean Retrieval Engine"""

    def __init__(self):
        self.inv_map = defaultdict(set)
        self.total_docs = 0

    def build(self, doc_list: List[str]):
        cleaner = TextCleaner()
        self.total_docs = len(doc_list)

        for d_id, doc in enumerate(doc_list):
            tokens = cleaner.tokens(doc)
            for word in set(tokens):
                self.inv_map[word].add(d_id)

    def top(self, query_str: str, top_n: int = 10):
        cleaner = TextCleaner()
        q_tokens = cleaner.tokens(query_str)

        if not q_tokens:
            return []

        doc_set = self.inv_map.get(q_tokens[0], set()).copy()

        for tok in q_tokens[1:]:
            doc_set &= self.inv_map.get(tok, set())

        ranked = [(idx, float(len(q_tokens))) for idx in doc_set]
        ranked.sort(key=lambda x: x[1], reverse=True)

        return ranked[:top_n]


# ---------------------------------------------------------
# TF-IDF RANKER
# ---------------------------------------------------------
class TfidfRanker:
    """TF-IDF Search Engine"""

    def __init__(self, max_feats: int = 10000):
        self.vec = TfidfVectorizer(
            max_features=max_feats,
            stop_words='english',
            ngram_range=(1, 2)
        )
        self.matrix = None

    def build(self, doc_list: List[str]):
        self.matrix = self.vec.fit_transform(doc_list)

    def top(self, query_str: str, top_n: int = 10):
        qry = self.vec.transform([query_str])
        sims = cosine_similarity(qry, self.matrix).flatten()

        idxs = np.argsort(sims)[::-1][:top_n]
        return [(int(i), float(sims[i])) for i in idxs]


# ---------------------------------------------------------
# HYBRID RANKER
# ---------------------------------------------------------
class MixedRanker:
    """Weighted Hybrid Combination Retriever"""

    def __init__(self, weight_map=None):
        self.weight_map = weight_map or {'tfidf': 0.4, 'bm25': 0.5, 'bool': 0.1}
        self.tf_module = TfidfRanker()
        self.bm_module = BM25Ranker()
        self.bool_module = BooleanRanker()

    def build(self, docs):
        print("→ Building TF-IDF model...")
        self.tf_module.build(docs)
        print("→ Building BM25 model...")
        self.bm_module.build(docs)
        print("→ Building Boolean model...")
        self.bool_module.build(docs)

    def top(self, q, k=10):
        tf_res = dict(self.tf_module.top(q, top_n=50))
        bm_res = dict(self.bm_module.top(q, top_n=50))
        bl_res = dict(self.bool_module.top(q, top_n=50))

        def norm(scores):
            if not scores:
                return {}
            m = max(scores.values())
            if m == 0:
                return {x: 0 for x in scores}
            return {d: v / m for d, v in scores.items()}

        tf_norm = norm(tf_res)
        bm_norm = norm(bm_res)
        bl_norm = norm(bl_res)

        all_ids = set(tf_norm.keys()) | set(bm_norm.keys()) | set(bl_norm.keys())
        final_scores = []

        for doc_id in all_ids:
            final_val = (
                self.weight_map['tfidf'] * tf_norm.get(doc_id, 0) +
                self.weight_map['bm25'] * bm_norm.get(doc_id, 0) +
                self.weight_map['bool'] * bl_norm.get(doc_id, 0)
            )
            final_scores.append((doc_id, final_val))

        final_scores.sort(key=lambda x: x[1], reverse=True)
        return final_scores[:k]


# ---------------------------------------------------------
# MAIN IR SYSTEM
# ---------------------------------------------------------
class RetrievalCore:
    """Main IR System Controller"""

    def __init__(self, mode: str = 'hybrid'):
        self.mode = mode
        self.raw_docs = []
        self.file_ids = []
        self.engine = None
        self.ready = False

    def ingest(self, folder_path):
        folder_path = Path(folder_path)

        if folder_path.is_file():
            content = folder_path.read_text(encoding='utf-8', errors='ignore')
            self.raw_docs.append(content)
            self.file_ids.append(folder_path.name)

        elif folder_path.is_dir():
            for f in sorted(folder_path.glob('**/*.txt')):
                try:
                    txt = f.read_text(encoding='utf-8', errors='ignore')
                    if txt.strip():
                        self.raw_docs.append(txt)
                        self.file_ids.append(str(f.relative_to(folder_path)))
                except:
                    pass

        print(f"Loaded {len(self.raw_docs)} files.")

    def create_index(self):
        if not self.raw_docs:
            raise ValueError("No documents found.")

        print(f"Using search mode: {self.mode}")

        if self.mode == 'tfidf':
            self.engine = TfidfRanker()
        elif self.mode == 'bm25':
            self.engine = BM25Ranker()
        elif self.mode == 'boolean':
            self.engine = BooleanRanker()
        else:
            self.engine = MixedRanker()

        self.engine.build(self.raw_docs)
        self.ready = True
        print("Index built successfully.")

    def query(self, q, k=10):
        if not self.ready:
            raise ValueError("Index not initialized.")

        results = self.engine.top(q, k)
        output = []

        for idx, score_val in results:
            output.append({
                "rank": len(output) + 1,
                "file": self.file_ids[idx],
                "score": score_val,
                "preview": self.raw_docs[idx][:500] + "..."
            })

        return output


# ---------------------------------------------------------
# RUNNER
# ---------------------------------------------------------
def main():
    data_path = Path("documents")
    if not data_path.exists():
        data_path.mkdir()
        (data_path / "doc1.txt").write_text("This document is about information retrieval.")
        (data_path / "doc2.txt").write_text("This text explains search engines and ranking models.")
        (data_path / "doc3.txt").write_text("Data retrieval is important in many fields.")

    system = RetrievalCore(mode="hybrid")
    system.ingest("documents")
    system.create_index()

    query_text = input("Enter your query: ")
    results = system.query(query_text, k=5)

    print("\n======================= RESULTS =======================")
    for res in results:
        print(f"Rank: {res['rank']}")
        print(f"File: {res['file']}")
        print(f"Score: {res['score']:.4f}")
        print(f"Preview: {res['preview']}")
        print("------------------------------------------------------")


if __name__ == "__main__":
    main()


Loaded 3 files.
Using search mode: hybrid
→ Building TF-IDF model...
→ Building BM25 model...
→ Building Boolean model...
Index built successfully.
Enter your query: Information Reterival

Rank: 1
File: doc1.txt
Score: 0.9000
Preview: This is the first document. It talks about information retrieval....
------------------------------------------------------
Rank: 2
File: doc3.txt
Score: 0.7069
Preview: Information technology is a vast field. Retrieval of data is crucial....
------------------------------------------------------
Rank: 3
File: doc2.txt
Score: 0.0000
Preview: The second document discusses search engines and ranking algorithms....
------------------------------------------------------
