In [1]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
import string
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

In [2]:
class RocchioInformationRetrieval:
    """TF-IDF search over a book CSV with Rocchio relevance feedback.

    Each document is the concatenation of the ``Book-Title``, ``Book-Author``
    and ``description`` columns. Documents and queries are vectorized with
    TF-IDF and ranked by cosine similarity.
    """

    # Descriptions longer than this many characters are truncated in results.
    DESCRIPTION_PREVIEW_CHARS = 200

    def __init__(self, csv_path, alpha=1.0, beta=0.75, gamma=0.15):
        """
        Initialize Rocchio Information Retrieval System

        Parameters:
        - csv_path: path to the CSV file
        - alpha: weight for original query (default: 1.0)
        - beta: weight for relevant documents (default: 0.75)
        - gamma: weight for non-relevant documents (default: 0.15)

        If the CSV cannot be loaded, ``self.df`` is ``None`` and the index
        attributes (``vectorizer``, ``doc_vectors``) stay ``None`` so callers
        can detect the failure instead of hitting an AttributeError here.
        """
        self.csv_path = csv_path
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

        # Defaults for the "data could not be loaded" case.
        self.documents = []
        self.vectorizer = None
        self.doc_vectors = None

        # Load and preprocess data
        self.df = self.load_data()
        if self.df is None:
            # load_data() already printed the error; leave the index unbuilt.
            return
        self.documents = self.preprocess_documents()

        # Initialize TF-IDF vectorizer
        self.vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            lowercase=True,
            ngram_range=(1, 2)
        )

        # Fit vectorizer on documents
        self.doc_vectors = self.vectorizer.fit_transform(self.documents)

        print(f"Loaded {len(self.df)} documents")
        print(f"TF-IDF matrix shape: {self.doc_vectors.shape}")

    def load_data(self):
        """Read the CSV at ``self.csv_path``.

        Missing values in the three text columns are replaced with empty
        strings. Returns the DataFrame, or ``None`` (after printing the error)
        if reading or column access fails.
        """
        try:
            df = pd.read_csv(self.csv_path)
            # Handle missing values
            for column in ('Book-Title', 'Book-Author', 'description'):
                df[column] = df[column].fillna('')
            return df
        except Exception as e:
            print(f"Error loading data: {e}")
            return None

    def preprocess_text(self, text):
        """Lowercase ``text``, strip punctuation and collapse whitespace.

        Returns an empty string for missing or empty input.
        """
        if pd.isna(text) or text == '':
            return ''

        # Convert to lowercase
        text = str(text).lower()

        # Remove punctuation
        text = text.translate(str.maketrans('', '', string.punctuation))

        # Remove extra whitespace
        return ' '.join(text.split())

    def preprocess_documents(self):
        """Build one preprocessed text per row from title, author and description."""
        # zip over the columns avoids the per-row Series construction of iterrows().
        columns = zip(
            self.df['Book-Title'],
            self.df['Book-Author'],
            self.df['description'],
        )
        return [
            self.preprocess_text(f"{title} {author} {description}")
            for title, author, description in columns
        ]

    def _require_index(self):
        """Raise a clear error when the TF-IDF index was never built."""
        if self.vectorizer is None or self.doc_vectors is None:
            raise RuntimeError(
                "Search index is not available because the data could not be loaded."
            )

    def _build_results(self, similarities, top_k):
        """Return result dicts for the ``top_k`` highest-scoring documents.

        Each dict has the keys ``index``, ``title``, ``author``,
        ``description`` (truncated with ``...`` when long) and ``score``.
        """
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for idx in top_indices:
            row = self.df.iloc[idx]
            description = row['description']
            # Measure and slice the string form so a non-string cell cannot
            # break the truncation.
            description_text = str(description)
            if len(description_text) > self.DESCRIPTION_PREVIEW_CHARS:
                description = description_text[:self.DESCRIPTION_PREVIEW_CHARS] + '...'
            results.append({
                'index': int(idx),
                'title': row['Book-Title'],
                'author': row['Book-Author'],
                'description': description,
                'score': similarities[idx]
            })
        return results

    def search(self, query, top_k=10):
        """Initial search without relevance feedback.

        Returns ``(results, query_vector)`` where ``results`` is the list of
        result dicts and ``query_vector`` is the TF-IDF vector of the query.
        Raises RuntimeError if the data could not be loaded.
        """
        self._require_index()

        # Preprocess query and transform it to a TF-IDF vector
        processed_query = self.preprocess_text(query)
        query_vector = self.vectorizer.transform([processed_query])

        # Calculate cosine similarity against every document
        similarities = cosine_similarity(query_vector, self.doc_vectors).flatten()

        return self._build_results(similarities, top_k), query_vector

    @staticmethod
    def _to_dense_row(vector):
        """Convert a sparse/matrix/array row vector to a (1, n) float ndarray."""
        if hasattr(vector, 'toarray'):
            vector = vector.toarray()
        return np.asarray(vector, dtype=float).reshape(1, -1)

    def _centroid(self, doc_indices):
        """Mean TF-IDF vector of the given document rows as a (1, n) ndarray."""
        rows = self.doc_vectors[list(doc_indices)]
        return self._to_dense_row(rows.mean(axis=0))

    def rocchio_feedback(self, original_query_vector, relevant_docs, non_relevant_docs):
        """Apply Rocchio algorithm for query expansion.

        Computes ``alpha * q + beta * mean(relevant) - gamma * mean(non_relevant)``
        and returns it as a dense ``(1, n_features)`` ndarray. Either document
        list may be empty or ``None``, in which case its term is skipped.
        """
        # Work on plain ndarrays: adding a dense centroid to a sparse query
        # yields an np.matrix, which newer scikit-learn releases refuse.
        new_query_vector = self.alpha * self._to_dense_row(original_query_vector)

        # Add relevant documents
        if relevant_docs is not None and len(relevant_docs) > 0:
            new_query_vector = new_query_vector + self.beta * self._centroid(relevant_docs)

        # Subtract non-relevant documents
        if non_relevant_docs is not None and len(non_relevant_docs) > 0:
            new_query_vector = new_query_vector - self.gamma * self._centroid(non_relevant_docs)

        return new_query_vector

    def search_with_feedback(self, query, relevant_docs=None, non_relevant_docs=None, top_k=10):
        """Search with Rocchio relevance feedback.

        ``relevant_docs`` / ``non_relevant_docs`` are row positions into the
        document matrix. Without any feedback the plain search results are
        returned. Returns the list of result dicts only.
        """
        # Initial search
        initial_results, original_query_vector = self.search(query, top_k)

        has_relevant = relevant_docs is not None and len(relevant_docs) > 0
        has_non_relevant = non_relevant_docs is not None and len(non_relevant_docs) > 0

        # If no feedback provided, return initial results
        if not has_relevant and not has_non_relevant:
            return initial_results

        # Apply Rocchio feedback
        new_query_vector = self.rocchio_feedback(
            original_query_vector,
            relevant_docs,
            non_relevant_docs
        )

        # Re-rank every document against the adjusted query
        similarities = cosine_similarity(new_query_vector, self.doc_vectors).flatten()

        return self._build_results(similarities, top_k)

In [3]:
class EvaluationMetrics:
    """Ranking evaluation metrics with binary relevance.

    Every method takes ``retrieved_docs`` as a ranked sequence of document ids
    and ``relevant_docs`` as any iterable of relevant ids (list, tuple or set).
    Duplicate ids in ``relevant_docs`` are counted once.
    """

    @staticmethod
    def precision_at_k(retrieved_docs, relevant_docs, k=5):
        """Calculate Precision@K: relevant hits in the top ``k`` divided by ``k``.

        Returns 0.0 when ``k`` is not positive.
        """
        if k <= 0:
            return 0.0

        hits = len(set(retrieved_docs[:k]) & set(relevant_docs))
        return hits / k

    @staticmethod
    def recall_at_k(retrieved_docs, relevant_docs, k=5):
        """Calculate Recall@K: relevant hits in the top ``k`` divided by the
        number of distinct relevant documents.

        Returns 0.0 when there are no relevant documents.
        """
        relevant = set(relevant_docs)
        if not relevant:
            return 0.0

        # max(k, 0) keeps a negative k from turning into "all but the last".
        hits = len(set(retrieved_docs[:max(k, 0)]) & relevant)
        return hits / len(relevant)

    @staticmethod
    def average_precision(retrieved_docs, relevant_docs):
        """Calculate Average Precision over the full ``retrieved_docs`` ranking.

        Sums precision at each rank holding a relevant document and divides by
        the number of distinct relevant documents. Returns 0.0 when there are
        no relevant documents or no hits.
        """
        relevant = set(relevant_docs)
        if not relevant:
            return 0.0

        precision_sum = 0.0
        relevant_count = 0
        for rank, doc in enumerate(retrieved_docs, start=1):
            if doc in relevant:
                relevant_count += 1
                precision_sum += relevant_count / rank

        return precision_sum / len(relevant)

    @staticmethod
    def map_at_k(queries_results, k=5):
        """Calculate Mean Average Precision@K.

        ``queries_results`` is a sequence of ``(retrieved_docs, relevant_docs)``
        pairs; each ranking is cut to its top ``k`` before averaging.
        Returns 0.0 for an empty sequence.
        """
        if not queries_results:
            return 0.0

        ap_scores = [
            EvaluationMetrics.average_precision(retrieved_docs[:k], relevant_docs)
            for retrieved_docs, relevant_docs in queries_results
        ]
        return sum(ap_scores) / len(ap_scores)

    @staticmethod
    def dcg_at_k(retrieved_docs, relevant_docs, k=5):
        """Calculate Discounted Cumulative Gain@K with binary gains."""
        relevant = set(relevant_docs)
        dcg = 0.0
        for i, doc in enumerate(retrieved_docs[:max(k, 0)]):
            if doc in relevant:
                # Gain is 1 for a relevant doc; i+2 because log2(1) = 0
                dcg += 1.0 / np.log2(i + 2)
        return dcg

    @staticmethod
    def ndcg_at_k(retrieved_docs, relevant_docs, k=5):
        """Calculate Normalized Discounted Cumulative Gain@K.

        The ideal ranking places every relevant document first, so the ideal
        DCG depends only on how many distinct relevant documents fit in the
        top ``k``. Returns 0.0 when there are no relevant documents or the
        ideal DCG is zero.
        """
        relevant = set(relevant_docs)
        if not relevant:
            return 0.0

        # Calculate DCG
        dcg = EvaluationMetrics.dcg_at_k(retrieved_docs, relevant, k)

        # Calculate IDCG from the count alone, so relevant_docs need not be
        # sliceable and duplicates cannot inflate the ideal score.
        ideal_hits = min(len(relevant), max(k, 0))
        idcg = sum(1.0 / np.log2(rank + 2) for rank in range(ideal_hits))

        if idcg == 0:
            return 0.0

        return dcg / idcg


In [None]:
def _print_results(results, show_description=False):
    """Print a numbered result list, optionally including each description."""
    print("-" * 80)
    for i, result in enumerate(results, 1):
        print(f"{i}. Title: {result['title']}")
        print(f"   Author: {result['author']}")
        print(f"   Score: {result['score']:.4f}")
        if show_description:
            print(f"   Description: {result['description']}")
        print("-" * 80)


def _prompt_query():
    """Ask for a search query; print a hint and return '' when it is blank."""
    query = input("\nEnter your search query: ").strip()
    if not query:
        print("Please enter a valid query.")
    return query


def _parse_feedback_selection(raw, results, label):
    """Map comma-separated 1-based result numbers to document indices.

    Blank entries (e.g. a trailing comma) and numbers outside the result list
    are ignored, and repeated numbers are kept once. A non-numeric entry
    prints an error mentioning ``label`` and yields an empty selection.
    """
    tokens = [token.strip() for token in raw.split(',')]
    try:
        positions = [int(token) for token in tokens if token]
    except ValueError:
        print(f"Invalid input for {label} documents.")
        return []

    chosen = [
        results[position - 1]['index']
        for position in positions
        if 1 <= position <= len(results)
    ]
    # dict.fromkeys drops duplicates while preserving the order entered.
    return list(dict.fromkeys(chosen))


def _run_plain_search(ir_system):
    """Menu option 1: search without feedback and print the top 10."""
    query = _prompt_query()
    if not query:
        return

    print(f"\nSearching for: '{query}'")
    results, _ = ir_system.search(query, top_k=10)

    print("\nTop 10 Results:")
    _print_results(results, show_description=True)


def _run_feedback_search(ir_system):
    """Menu option 2: search, collect relevance feedback, re-rank with Rocchio."""
    query = _prompt_query()
    if not query:
        return

    # Initial search
    print(f"\nInitial search for: '{query}'")
    initial_results, _ = ir_system.search(query, top_k=10)

    print("\nInitial Top 10 Results:")
    _print_results(initial_results)

    # Get relevance feedback
    print("\nProvide relevance feedback:")
    relevant_input = input("Enter relevant document numbers (comma-separated, e.g., 1,3,5): ").strip()
    non_relevant_input = input("Enter non-relevant document numbers (comma-separated, e.g., 2,4): ").strip()

    relevant_docs = _parse_feedback_selection(relevant_input, initial_results, "relevant")
    non_relevant_docs = _parse_feedback_selection(non_relevant_input, initial_results, "non-relevant")

    if not relevant_docs and not non_relevant_docs:
        print("No feedback provided.")
        return

    # Search with feedback
    print("\nApplying Rocchio feedback...")
    feedback_results = ir_system.search_with_feedback(
        query, relevant_docs, non_relevant_docs, top_k=10
    )

    print("\nResults after Rocchio feedback:")
    _print_results(feedback_results)


def _run_evaluation(ir_system):
    """Menu option 3: print MAP/NDCG/Precision/Recall@5 for demo queries."""
    print("\nEvaluating system performance...")
    print("Note: This is a demo evaluation with sample queries.")

    # Sample queries for evaluation; the relevant document indices are
    # placeholders, not real relevance judgments.
    sample_queries = [
        ("python programming", [100, 150, 200]),
        ("machine learning", [50, 75, 125]),
        ("data science", [25, 175, 225])
    ]

    metrics = EvaluationMetrics()
    queries_results = []
    for query, relevant_docs in sample_queries:
        results, _ = ir_system.search(query, top_k=10)
        retrieved_docs = [r['index'] for r in results]
        queries_results.append((retrieved_docs, relevant_docs))

    # Calculate metrics
    map_5 = metrics.map_at_k(queries_results, k=5)

    # Average the per-query metrics across queries
    precision_scores = [metrics.precision_at_k(ret, rel, k=5) for ret, rel in queries_results]
    recall_scores = [metrics.recall_at_k(ret, rel, k=5) for ret, rel in queries_results]
    ndcg_scores = [metrics.ndcg_at_k(ret, rel, k=5) for ret, rel in queries_results]

    avg_precision = sum(precision_scores) / len(precision_scores)
    avg_recall = sum(recall_scores) / len(recall_scores)
    avg_ndcg = sum(ndcg_scores) / len(ndcg_scores)

    print("\nEvaluation Results:")
    print("-" * 40)
    print(f"MAP@5:        {map_5:.4f}")
    print(f"NDCG@5:       {avg_ndcg:.4f}")
    print(f"Precision@5:  {avg_precision:.4f}")
    print(f"Recall@5:     {avg_recall:.4f}")
    print("-" * 40)


def main(csv_path=None):
    """Run the interactive console menu for the retrieval system.

    Parameters:
    - csv_path: path to the books CSV; when omitted, the original hard-coded
      dataset location is used.

    Loops on a four-option menu (plain search, search with feedback,
    evaluation, exit) until the user selects exit.
    """
    # Path to the dataset
    if csv_path is None:
        csv_path = "D:/IT DEL/Semester 8 (FINAL)/STBI/Information-Retrieval-Books-search-engine-main/Information-Retrieval-Books-search-engine-main/data_books_updated.csv"

    # Initialize IR system
    print("Initializing Information Retrieval System...")
    ir_system = RocchioInformationRetrieval(csv_path)

    if ir_system.df is None:
        print("Failed to load data. Please check the file path.")
        return

    print("\n" + "="*60)
    print("INFORMATION RETRIEVAL SYSTEM WITH ROCCHIO ALGORITHM")
    print("="*60)

    actions = {
        '1': _run_plain_search,
        '2': _run_feedback_search,
        '3': _run_evaluation,
    }

    while True:
        print("\nOptions:")
        print("1. Search without feedback")
        print("2. Search with relevance feedback") 
        print("3. Evaluate system performance")
        print("4. Exit")

        choice = input("\nSelect option (1-4): ").strip()

        if choice == '4':
            print("\nThank you for using the Information Retrieval System!")
            break

        action = actions.get(choice)
        if action is None:
            print("Invalid option. Please select 1-4.")
            continue

        action(ir_system)

# Start the interactive menu only when this code runs as the main module.
if __name__ == "__main__":
    main()

Initializing Information Retrieval System...
Loaded 58477 documents
TF-IDF matrix shape: (58477, 5000)

INFORMATION RETRIEVAL SYSTEM WITH ROCCHIO ALGORITHM

Options:
1. Search without feedback
2. Search with relevance feedback
3. Evaluate system performance
4. Exit



Select option (1-4):  1

Enter your search query:  love of my life



Searching for: 'love of my life'

Top 10 Results:
--------------------------------------------------------------------------------
1. Title: Love Life
   Author: Rob Lowe
   Score: 0.5785
   Description: Rob Lowe is back with stories he only tells his best friends.When Rob Lowe's first book was published in 2011, he received the kind of rapturous reviews that writers dream of and rocketed to the top o...
--------------------------------------------------------------------------------
2. Title: Chris-In-The-Morning: Love, Life, and the Whole Karmic Enchilada
   Author: Louis Chunovic
   Score: 0.5243
   Description: Chris-In-The-Morning is a book about love, life, and the whole karmic enchilada. It's about a man named Chris who is struggling with his relationships and his career. He's trying to figure out what he...
--------------------------------------------------------------------------------
3. Title: Expressions of Faith: Inspirational Stories of Life and Love
   Author: Guidepost


Select option (1-4):  2

Enter your search query:  love of my life



Initial search for: 'love of my life'

Initial Top 10 Results:
--------------------------------------------------------------------------------
1. Title: Love Life
   Author: Rob Lowe
   Score: 0.5785
--------------------------------------------------------------------------------
2. Title: Chris-In-The-Morning: Love, Life, and the Whole Karmic Enchilada
   Author: Louis Chunovic
   Score: 0.5243
--------------------------------------------------------------------------------
3. Title: Expressions of Faith: Inspirational Stories of Life and Love
   Author: Guideposts Magazine
   Score: 0.4261
--------------------------------------------------------------------------------
4. Title: You, My Love: A Diary in Verse
   Author: Richard Atwood (Goodreads Author)
   Score: 0.4089
--------------------------------------------------------------------------------
5. Title: Love Your Life: Living Happy, Healthy and Whole
   Author: Victoria Osteen
   Score: 0.4049
--------------------------------

Enter relevant document numbers (comma-separated, e.g., 1,3,5):  1000
