In [11]:
# Third-party packages used by this notebook. `%pip` installs into the running kernel's environment.
# `from dotenv import load_dotenv` is provided by the `python-dotenv` distribution; the PyPI project
# literally named `dotenv` is a different package.
# If no `faiss-gpu` wheel exists for this Python version, `faiss-cpu` is sufficient: only a CPU
# `IndexFlatL2` is used below.
%pip install sentence_transformers
%pip install faiss-gpu
%pip install rank_bm25
%pip install python-dotenv
%pip install boto3 scikit-learn



In [12]:
import boto3
import numpy as np
import json
import pandas as pd
import time
import os
from sentence_transformers import SentenceTransformer
import faiss
import hashlib
from rank_bm25 import BM25Okapi
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pickle
from sentence_transformers import CrossEncoder
import requests
from sklearn.metrics.pairwise import cosine_similarity 
from dotenv import load_dotenv
import os
import numpy as np
import faiss
import hashlib
import json
import requests
import pickle
import pandas as pd
from typing import List, Dict, Any, Tuple, Optional, Union
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
import logging

In [25]:
# Read settings (AWS keys and, presumably, the `inference_profile` id used later) from the
# dotenv-formatted file "Credentials" into the process environment.
load_dotenv("Credentials")

# Module-level Bedrock runtime client in us-east-2. os.getenv returns None for a missing
# variable, in which case boto3 resolves credentials through its default chain.
bedrock_client = boto3.client(
    "bedrock-runtime",
    region_name="us-east-2",
    aws_access_key_id=os.getenv("aws_access_key_id"),
    aws_secret_access_key=os.getenv("aws_secret_access_key"),
)

import pickle
import faiss
import logging
import pandas as pd
import numpy as np
import os
import hashlib
import time
import requests
import json
import boto3
from typing import List, Dict, Tuple, Union
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi

# Configure root logging at INFO with timestamps (basicConfig is a no-op if the root
# logger already has handlers), then obtain this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

class DocumentPreprocessor:
    """Class for handling document preprocessing tasks"""

    @staticmethod
    def standardize_chunks(chunks: List[Dict]) -> List[Dict]:
        """Return shallow copies of ``chunks`` normalised to a common structure.

        Every returned chunk has:
          * ``text`` -- unwrapped one level if stored as ``{"text": ...}``; a missing
            text becomes ``""``.
          * ``chunk_id`` -- kept if present, otherwise derived deterministically from the
            MD5 of ``str(text)`` reduced modulo 10,000,000 (so collisions are possible).
          * ``metadata`` -- kept if present, otherwise (including when it is ``None``)
            an empty dict.

        The input dicts are not modified, but only a shallow copy is made, so nested
        objects such as an existing metadata dict are shared with the originals.

        Args:
            chunks: Iterable of chunk dictionaries.

        Returns:
            A new list with one standardised dict per input chunk, in input order.
        """
        standardized_chunks = []
        for chunk in chunks:
            # Copy so the caller's dict is left untouched.
            new_chunk = chunk.copy()

            text = new_chunk.get('text', '')
            # Some sources nest the text as {"text": "..."}; unwrap one level.
            if isinstance(text, dict) and 'text' in text:
                text = text['text']
            new_chunk['text'] = text

            if 'chunk_id' not in new_chunk:
                new_chunk['chunk_id'] = int(hashlib.md5(str(text).encode()).hexdigest(), 16) % 10000000

            # Callers index into metadata and unpack it with **, so None must become {} too.
            if new_chunk.get('metadata') is None:
                new_chunk['metadata'] = {}

            standardized_chunks.append(new_chunk)

        return standardized_chunks

class TextRetriever:
    """Class for text-based retrieval methods"""

    @staticmethod
    def build_tfidf_vectorizer(chunks: List[Dict]) -> Tuple[TfidfVectorizer, np.ndarray]:
        """Create and fit a TF-IDF vectorizer over the chunk texts.

        Chunks are standardised first so nested ``{"text": ...}`` values are unwrapped.
        The vectorizer uses English stop words and 1- to 3-grams. It drops terms found in
        fewer than 2 documents or in more than 90% of them, and applies sublinear TF scaling.
        On very small corpora these thresholds can leave no terms, which makes
        scikit-learn raise ``ValueError``.

        Args:
            chunks: Chunk dictionaries; row ``i`` of the matrix corresponds to ``chunks[i]``.

        Returns:
            ``(vectorizer, tfidf_matrix)``. ``fit_transform`` returns a SciPy sparse
            matrix; the ``np.ndarray`` annotation is kept for compatibility.
        """
        chunks = DocumentPreprocessor.standardize_chunks(chunks)

        corpus = [chunk['text'] for chunk in chunks]

        vectorizer = TfidfVectorizer(
            stop_words='english',
            ngram_range=(1, 3),
            min_df=2,
            max_df=0.9,
            sublinear_tf=True
        )

        tfidf_matrix = vectorizer.fit_transform(corpus)

        return vectorizer, tfidf_matrix

    @staticmethod
    def retrieve_with_tfidf(
        query: str,
        chunks: List[Dict],
        vectorizer: TfidfVectorizer,
        tfidf_matrix: np.ndarray,
        top_n: int = 50,
        min_score: float = 0.0
    ) -> List[Dict]:
        """Retrieve the chunks most similar to ``query`` by TF-IDF cosine similarity.

        Args:
            query: Free-text query.
            chunks: The chunk list, in the same order the vectorizer/matrix were built from.
            vectorizer: Fitted vectorizer from ``build_tfidf_vectorizer``.
            tfidf_matrix: Document-term matrix from ``build_tfidf_vectorizer``.
            top_n: Maximum number of results.
            min_score: Only chunks whose similarity is strictly greater than this are
                returned. With the default 0.0, chunks sharing no term with the query are
                dropped. They would otherwise fill the list in arbitrary corpus order and
                be rewarded by rank fusion as if relevant.

        Returns:
            Result dicts (``metadata``, ``text``, ``chunk_id``, ``score``,
            ``retrieval_method="tfidf"``) in descending score order, ties in corpus order.
            Any exception is logged and yields an empty list.
        """
        logger.info(f"TF-IDF retrieval for query: {query}")

        chunks = DocumentPreprocessor.standardize_chunks(chunks)

        try:
            query_vec = vectorizer.transform([query])

            cosine_sim = cosine_similarity(query_vec, tfidf_matrix).flatten()

            # Stable descending sort: ties keep corpus order, matching sorted(..., reverse=True),
            # without materialising a Python list of (index, score) pairs for the whole corpus.
            ranked = np.argsort(-cosine_sim, kind='stable')[:top_n]

            results = []
            for idx in ranked:
                score = float(cosine_sim[idx])
                if score <= min_score:
                    # Scores are sorted descending, so no later entry can qualify.
                    break
                chunk = chunks[int(idx)]
                results.append({
                    "metadata": chunk["metadata"],
                    "text": chunk["text"],
                    "chunk_id": chunk["chunk_id"],
                    "score": score,
                    "retrieval_method": "tfidf"
                })

            return results

        except Exception as e:
            logger.error(f"Error in TF-IDF retrieval: {e}")
            return []

def get_titan_embeddings(text, dimensions=512, raise_on_error=False):
    """Embed ``text`` with Amazon Titan Text Embeddings V2 through Bedrock.

    A ``bedrock-runtime`` client (region us-east-2) is created on each call, using
    credentials from the ``aws_access_key_id`` / ``aws_secret_access_key`` environment
    variables.

    Args:
        text: Text sent as ``inputText``.
        dimensions: Requested embedding length.
        raise_on_error: If True, log and re-raise any failure instead of returning the
            random fallback vector. Use it when the caller has its own fallback (e.g. a
            text-only search), so it is not silently handed a meaningless vector.

    Returns:
        ``np.float32`` array built from the response's ``embedding`` field. It is requested
        with ``normalize: True``. On failure, unless ``raise_on_error`` is set, returns a
        random unit-norm vector of length ``dimensions`` with no semantic meaning.
    """
    logger.info("Generating embedding for query")

    try:
        bedrock_client = boto3.client(
            service_name="bedrock-runtime",
            region_name="us-east-2",
            aws_access_key_id=os.getenv("aws_access_key_id"),
            aws_secret_access_key=os.getenv("aws_secret_access_key")
        )
        body = json.dumps({
            "inputText": text,
            "dimensions": dimensions,
            "normalize": True
        })

        response = bedrock_client.invoke_model(
            modelId="amazon.titan-embed-text-v2:0",
            contentType="application/json",
            accept="*/*",
            body=body
        )

        response_body = json.loads(response['body'].read())
        return np.array(response_body['embedding'], dtype=np.float32)

    except Exception as e:
        logger.error(f"Error generating Titan embeddings: {e}")
        if raise_on_error:
            raise
        # Backward-compatible fallback: keeps callers running, but nearest-neighbour
        # results computed from this vector are effectively random.
        logger.warning("Returning a random fallback embedding; vector search results will not be meaningful")
        embedding = np.random.rand(dimensions).astype('float32')
        embedding = embedding / np.linalg.norm(embedding)
        return embedding

class VectorRetriever:
    """Class for vector-based retrieval methods"""

    @staticmethod
    def create_faiss_index(embeddings: np.ndarray) -> faiss.IndexFlatL2:
        """Build an exact (brute-force) L2 FAISS index over ``embeddings``.

        Args:
            embeddings: Array-like of shape (n_vectors, dim); converted to float32.

        Returns:
            A ``faiss.IndexFlatL2`` holding every vector; FAISS assigns ids 0..n_vectors-1
            in input order.

        Raises:
            ValueError: if ``embeddings`` is not two-dimensional.
        """
        logger.info(f"Creating FAISS index with {len(embeddings)} embeddings")

        # FAISS works on C-contiguous float32 matrices.
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)

        if embeddings.ndim != 2:
            raise ValueError(f"Expected 2D array, got {embeddings.ndim}D")

        index = faiss.IndexFlatL2(embeddings.shape[1])
        index.add(embeddings)

        return index

    @staticmethod
    def retrieve_with_embeddings(
        query_embedding: np.ndarray,
        faiss_index: faiss.IndexFlatL2,
        faiss_index_to_chunk: Dict[int, Dict],
        top_n: int = 20
    ) -> List[Dict]:
        """Return the ``top_n`` chunks nearest to ``query_embedding`` in ``faiss_index``.

        Args:
            query_embedding: One query vector (any array-like reshapeable to (1, dim)).
            faiss_index: Index built by ``create_faiss_index``.
            faiss_index_to_chunk: Maps FAISS row id -> flattened chunk dict containing at
                least ``text`` and ``chunk_id``. Ids missing from the map are skipped.
            top_n: Number of neighbours requested.

        Returns:
            Result dicts ordered as FAISS returns them (nearest first). ``metadata`` is a
            copy of the mapped chunk dict; ``score`` is ``1 / (1 + L2 distance)``. Any
            exception is logged and yields an empty list.
        """
        logger.info("Vector retrieval with embedding")

        try:
            query_2d = np.asarray(query_embedding, dtype=np.float32).reshape(1, -1)

            distances, indices = faiss_index.search(query_2d, top_n)

            results = []
            for distance, idx in zip(distances[0], indices[0]):
                # FAISS pads with -1 when fewer than top_n vectors are available.
                if idx < 0:
                    continue
                entry = faiss_index_to_chunk.get(int(idx))
                if entry is None:
                    continue

                # Copy so a query never mutates the shared id -> chunk mapping.
                entry = dict(entry)
                if isinstance(entry.get('text'), dict) and 'text' in entry['text']:
                    entry['text'] = entry['text']['text']

                results.append({
                    "metadata": entry,
                    "text": entry["text"],
                    "chunk_id": entry["chunk_id"],
                    "score": float(1 / (1 + distance)),
                    "retrieval_method": "vector"
                })

            return results

        except Exception as e:
            logger.error(f"Error in vector retrieval: {e}")
            return []

class HybridRetriever:
    """Class for hybrid retrieval combining multiple methods"""

    @staticmethod
    def _result_uid(res: Dict):
        """Identity deciding whether two results describe the same item.

        Preference order: ``metadata["item_id"]``; else
        ``"{restaurant_id}_{menu_item}"`` from metadata; else the result's ``chunk_id``.
        """
        meta = res.get("metadata") or {}
        if 'item_id' in meta:
            return meta["item_id"]
        if 'restaurant_id' in meta and 'menu_item' in meta:
            return f"{meta['restaurant_id']}_{meta['menu_item']}"
        return res['chunk_id']

    @staticmethod
    def _deduplicate_results(results: List[Dict]) -> List[Dict]:
        """Keep the first result for each identity (see ``_result_uid``), preserving order."""
        seen = set()
        deduplicated = []

        for res in results:
            uid = HybridRetriever._result_uid(res)
            if uid not in seen:
                seen.add(uid)
                deduplicated.append(res)

        return deduplicated

    @staticmethod
    def reciprocal_rank_fusion(results_list: List[List[Dict]], k: float = 60.0) -> List[Dict]:
        """Fuse several ranked result lists with Reciprocal Rank Fusion (RRF).

        score(d) = sum over lists L containing d of 1 / (k + rank_L(d)), where rank is
        1-based and only d's best (first) position in each list counts. Items are
        identified with the same key used for deduplication, so votes for different
        chunks of the same item are combined.

        Args:
            results_list: Result lists, each ordered best-first.
            k: RRF smoothing constant.

        Returns:
            New dicts (inputs are not mutated), one per distinct item, using the first
            occurrence's fields. Each has ``rrf_score``, ``original_score`` (the
            occurrence's former ``score``) and ``score`` set to the fused score, sorted
            by fused score descending. Returns an empty list if there are no results.
        """
        if not results_list or all(not results for results in results_list):
            return []

        fused_scores = {}
        for result_list in results_list:
            seen_in_list = set()
            for rank, item in enumerate(result_list, start=1):
                uid = HybridRetriever._result_uid(item)
                # Only the best rank of an item within one list contributes.
                if uid in seen_in_list:
                    continue
                seen_in_list.add(uid)
                fused_scores[uid] = fused_scores.get(uid, 0.0) + 1.0 / (k + rank)

        all_results = [item for results in results_list for item in results]

        fused = []
        for item in HybridRetriever._deduplicate_results(all_results):
            rrf_score = fused_scores[HybridRetriever._result_uid(item)]
            out = dict(item)
            out['rrf_score'] = rrf_score
            out['original_score'] = item.get('score')
            out['score'] = rrf_score
            fused.append(out)

        return sorted(fused, key=lambda x: x['rrf_score'], reverse=True)

class LLMResponseGenerator:
    """Class for generating responses using LLM through AWS Bedrock"""

    @staticmethod
    def _join_field(value) -> str:
        """Render a list-like metadata field (e.g. cuisine types) as "a, b, c".

        A plain string is returned unchanged, because ``', '.join`` on a string would
        split it into single characters. Elements are converted with ``str``; a
        non-iterable value is rendered with ``str``.
        """
        if isinstance(value, str):
            return value
        try:
            return ', '.join(str(v) for v in value)
        except TypeError:
            return str(value)

    @staticmethod
    def _price_label(meta: Dict, default: str) -> str:
        """Capitalised ``price_tier`` from ``meta``, or ``default`` when absent or None."""
        tier = meta.get('price_tier')
        if tier is None:
            tier = default
        return str(tier).capitalize()

    @staticmethod
    def format_context(results: List[Dict]) -> str:
        """Format retrieval results into a numbered plain-text context block for the LLM.

        Each result contributes "[n] restaurant: item", then description, price, optional
        cuisine and ingredient lines, and location. Fields are read from
        ``result["metadata"]``, with placeholder text for missing ones.
        """
        parts = ["Here are some relevant restaurant items:\n\n"]

        for i, res in enumerate(results):
            meta = res.get('metadata') or {}
            parts.append(f"[{i+1}] {meta.get('restaurant_name', 'Unknown')}: {meta.get('menu_item', 'Unknown Item')}\n")
            parts.append(f"Description: {meta.get('menu_description', 'No description available')}\n")
            parts.append(f"Price: {LLMResponseGenerator._price_label(meta, 'unknown')}\n")

            if meta.get('cuisine_types'):
                parts.append(f"Cuisine: {LLMResponseGenerator._join_field(meta['cuisine_types'])}\n")

            if meta.get('ingredients'):
                parts.append(f"Ingredients: {LLMResponseGenerator._join_field(meta['ingredients'])}\n")

            parts.append(f"Location: {meta.get('location', 'Unknown')}\n\n")

        return ''.join(parts)

    @staticmethod
    def call_bedrock_claude(prompt: str) -> Optional[str]:
        """Send ``prompt`` to a Claude model on Bedrock and return the generated text.

        The model id is read from the ``inference_profile`` environment variable and
        the client is created in us-east-2 from the ``aws_access_key_id`` /
        ``aws_secret_access_key`` environment variables.

        Returns:
            The stripped text of the first content block (possibly ``""``), or ``None``
            if any error occurs (the error is logged).
        """
        try:
            bedrock_client = boto3.client(
                service_name = "bedrock-runtime",
                region_name="us-east-2",
                aws_access_key_id = os.getenv("aws_access_key_id"),
                aws_secret_access_key = os.getenv("aws_secret_access_key")
            )

            # Anthropic Messages API payload for Bedrock invoke_model.
            payload = {
                "modelId": os.getenv("inference_profile"),
                "body": json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 500,
                    "temperature": 0.3,
                    "system": """You are a helpful restaurant recommendation assistant. 
                    Provide concise, helpful recommendations based on the retrieved information.
                    Focus on menu items, price, location, and cuisine types that best match the query.""",
                    "messages": [{
                        "role": "user",
                        "content": [{
                            "type": "text",
                            "text": prompt
                        }]
                    }]
                })
            }

            response = bedrock_client.invoke_model(**payload)

            result = json.loads(response["body"].read().decode("utf-8"))
            generated_text = result.get("content", [{}])[0].get("text", "").strip()

            return generated_text

        except Exception as e:
            logger.error(f"Error calling AWS Bedrock Claude: {e}")
            return None

    @staticmethod
    def simple_fallback_response(query: str, results: List[Dict], max_items: int = 3) -> str:
        """Build a templated answer from the top ``max_items`` results without an LLM.

        Returns a "nothing found" message when ``results`` is empty.
        """
        if not results:
            return f"I couldn't find any restaurant recommendations for '{query}'. Could you provide more details or try a different search?"

        parts = [f"Here are some restaurant recommendations for '{query}':\n\n"]

        for i, result in enumerate(results[:max_items]):
            meta = result.get('metadata') or {}
            parts.append(
                f"{i+1}. {meta.get('restaurant_name', 'Unknown')}\n"
                f"   - Menu Item: {meta.get('menu_item', 'N/A')}\n"
                f"   - Price: {LLMResponseGenerator._price_label(meta, '')}\n"
            )

            if meta.get('cuisine_types'):
                parts.append(f"   - Cuisine: {LLMResponseGenerator._join_field(meta['cuisine_types'])}\n")

            if meta.get('ingredients'):
                parts.append(f"   - Ingredients: {LLMResponseGenerator._join_field(meta['ingredients'])}\n")

            parts.append(f"   - Location: {meta.get('location', 'Unknown')}\n\n")

        return ''.join(parts)

    @staticmethod
    def generate_response(query: str, results: List[Dict]) -> str:
        """Answer ``query`` from ``results`` via Claude, falling back to a template.

        The template (``simple_fallback_response``) is used when there are no results,
        when the LLM call fails (returns ``None``), or when it returns empty text.
        """
        logger.info(f"Generating response for query: {query}")

        if not results:
            # Delegating keeps the "nothing found" wording in one place.
            return LLMResponseGenerator.simple_fallback_response(query, results)

        context = LLMResponseGenerator.format_context(results)
        prompt = (
            f"Based on the following restaurant information, provide a helpful response to this query: '{query}'\n\n"
            f"{context}\n"
            f"Provide a concise recommendation that highlights the best matches for the query. "
            f"Include details about the food, price, and location."
        )

        llm_response = LLMResponseGenerator.call_bedrock_claude(prompt)

        # None means the call failed; "" means an empty completion. Neither is usable.
        if not llm_response:
            logger.warning("LLM call failed, using fallback response generator")
            return LLMResponseGenerator.simple_fallback_response(query, results)

        return llm_response

class RestaurantRecommender:
    """Hybrid (TF-IDF + FAISS vector) restaurant search with LLM-generated answers."""

    def __init__(self, chunks: List[Dict], embeddings: np.ndarray):
        """Build every retrieval structure from ``chunks`` and their ``embeddings``.

        Args:
            chunks: Chunk dicts; standardised on the way in.
            embeddings: One vector per chunk, in the same order as ``chunks``. FAISS row
                ``i`` is mapped back to chunk ``i``.

        Raises:
            ValueError: if the number of embeddings differs from the number of chunks,
                or (from ``VectorRetriever.create_faiss_index``) if they are not 2-D.
        """
        logger.info("Initializing Restaurant Recommender")

        self.chunks = DocumentPreprocessor.standardize_chunks(chunks)

        # FAISS ids are row positions. A length mismatch would attach vectors to the
        # wrong chunks (or silently drop results), so fail fast instead.
        if len(embeddings) != len(self.chunks):
            raise ValueError(
                f"Got {len(self.chunks)} chunks but {len(embeddings)} embeddings; "
                "they must correspond one-to-one"
            )

        self.vectorizer, self.tfidf_matrix = TextRetriever.build_tfidf_vectorizer(self.chunks)

        self.faiss_index = VectorRetriever.create_faiss_index(embeddings)

        # FAISS row id -> flattened chunk (metadata fields plus text and chunk_id).
        self.faiss_index_to_chunk = {
            i: {
                **(chunk["metadata"] or {}),
                "text": chunk["text"],
                "chunk_id": chunk["chunk_id"]
            } for i, chunk in enumerate(self.chunks)
        }

    def save_components(self, save_dir: str = 'saved_objects'):
        """Persist the fitted components to ``save_dir`` (created if missing).

        Writes ``tfidf_vectorizer.pkl``, ``tfidf_matrix.pkl``, ``faiss_index.bin``,
        ``faiss_index_to_chunk.pkl`` and ``processed_chunks.pkl``. The .pkl files are
        pickles; only load them back from a trusted location.
        """
        os.makedirs(save_dir, exist_ok=True)

        vectorizer_path = os.path.join(save_dir, 'tfidf_vectorizer.pkl')
        tfidf_matrix_path = os.path.join(save_dir, 'tfidf_matrix.pkl')
        faiss_index_path = os.path.join(save_dir, 'faiss_index.bin')
        faiss_mapping_path = os.path.join(save_dir, 'faiss_index_to_chunk.pkl')
        chunks_save_path = os.path.join(save_dir, 'processed_chunks.pkl')

        logger.info(f"Saving TF-IDF vectorizer to {vectorizer_path}")
        with open(vectorizer_path, 'wb') as f:
            pickle.dump(self.vectorizer, f)

        logger.info(f"Saving TF-IDF matrix to {tfidf_matrix_path}")
        with open(tfidf_matrix_path, 'wb') as f:
            pickle.dump(self.tfidf_matrix, f)

        logger.info(f"Saving FAISS index to {faiss_index_path}")
        faiss.write_index(self.faiss_index, faiss_index_path)

        logger.info(f"Saving FAISS index mapping to {faiss_mapping_path}")
        with open(faiss_mapping_path, 'wb') as f:
            pickle.dump(self.faiss_index_to_chunk, f)

        logger.info(f"Saving processed chunks to {chunks_save_path}")
        with open(chunks_save_path, 'wb') as f:
            pickle.dump(self.chunks, f)

        logger.info("All components saved successfully!")

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Hybrid search: TF-IDF (top 50) and Titan-embedding FAISS search (top 20), fused by RRF.

        If embedding or vector search raises, only the TF-IDF results are used.

        Returns:
            At most ``top_k`` result dicts, highest ``score`` first.
        """
        logger.info(f"Searching for: {query}")

        tfidf_results = TextRetriever.retrieve_with_tfidf(
            query, self.chunks, self.vectorizer, self.tfidf_matrix, 50
        )

        try:
            query_embedding = get_titan_embeddings(query)

            vector_results = VectorRetriever.retrieve_with_embeddings(
                query_embedding,
                self.faiss_index,
                self.faiss_index_to_chunk,
                20
            )

            results_list = [tfidf_results]
            if vector_results:
                results_list.append(vector_results)

            combined = HybridRetriever.reciprocal_rank_fusion(results_list)

        except Exception as e:
            logger.error(f"Error in vector search: {e}")
            combined = tfidf_results

        return sorted(combined, key=lambda x: x.get("score", 0), reverse=True)[:top_k]

    def generate_recommendation(self, query: str, top_k: int = 5) -> dict:
        """Search for ``query`` and produce an answer from the top ``top_k`` results.

        Returns:
            Dict with ``query``, ``response`` (LLM or template text), ``results``,
            ``execution_time`` (seconds, wall clock) and ``result_count``.
        """
        logger.info(f"Generating recommendation for: {query}")

        # perf_counter is monotonic and intended for measuring durations.
        start_time = time.perf_counter()

        results = self.search(query, top_k=top_k)

        response = LLMResponseGenerator.generate_response(query, results)

        execution_time = time.perf_counter() - start_time

        return {
            "query": query,
            "response": response,
            "results": results,
            "execution_time": execution_time,
            "result_count": len(results)
        }

try:
    # Inputs: the pre-computed 5000-chunk sample and its embeddings; outputs go to save_dir.
    chunks_path, embeddings_path = 'chunks_5000.pkl', 'embeddings_5000.pkl'
    save_dir = 'saved_objects'
    os.makedirs(save_dir, exist_ok=True)

    start = time.time()

    # These are trusted local pickles; pickle.load can execute code from untrusted files.
    logger.info(f"Loading chunks from {chunks_path}")
    with open(chunks_path, 'rb') as chunks_file:
        chunks = pickle.load(chunks_file)

    logger.info(f"Loading embeddings from {embeddings_path}")
    with open(embeddings_path, 'rb') as embeddings_file:
        embeddings_data = pickle.load(embeddings_file)
    embeddings = embeddings_data

    # Build the TF-IDF + FAISS recommender and persist its fitted parts.
    logger.info("Creating restaurant recommender")
    recommender = RestaurantRecommender(chunks, embeddings)

    logger.info("Saving all components")
    recommender.save_components(save_dir)

    # Smoke test with one query, logging the answer and the ranked hits.
    test_query = "spicy vegetarian food in san francisco"
    logger.info(f"Testing recommendation for query: {test_query}")
    recommendation = recommender.generate_recommendation(test_query, top_k=3)

    logger.info(f"Recommendation for '{test_query}':")
    logger.info(f"Response: {recommendation['response']}")
    logger.info(
        f"Found {recommendation['result_count']} results "
        f"in {recommendation['execution_time']:.2f} seconds"
    )

    for i, res in enumerate(recommendation['results']):
        result_meta = res['metadata']
        logger.info(
            f"Result {i+1}: {result_meta.get('restaurant_name', 'Unknown')} - "
            f"{result_meta.get('menu_item', 'Unknown')} - "
            f"Score: {res['score']:.4f}"
        )

    end = time.time()
    logger.info(f"Total processing time: {(end - start):.2f} seconds")
    logger.info("Process completed successfully!")

except Exception as exc:
    logger.error(f"Error in main process: {exc}", exc_info=True)

2025-04-28 05:47:20,821 - __main__ - INFO - Loading chunks from chunks_5000.pkl
2025-04-28 05:47:21,215 - __main__ - INFO - Loading embeddings from embeddings_5000.pkl
2025-04-28 05:47:21,225 - __main__ - INFO - Creating restaurant recommender
2025-04-28 05:47:21,226 - __main__ - INFO - Initializing Restaurant Recommender
2025-04-28 05:47:23,099 - __main__ - INFO - Creating FAISS index with 5000 embeddings
2025-04-28 05:47:23,134 - __main__ - INFO - Saving all components
2025-04-28 05:47:23,135 - __main__ - INFO - Saving TF-IDF vectorizer to saved_objects/tfidf_vectorizer.pkl
2025-04-28 05:47:23,173 - __main__ - INFO - Saving TF-IDF matrix to saved_objects/tfidf_matrix.pkl
2025-04-28 05:47:23,203 - __main__ - INFO - Saving FAISS index to saved_objects/faiss_index.bin
2025-04-28 05:47:23,243 - __main__ - INFO - Saving FAISS index mapping to saved_objects/faiss_index_to_chunk.pkl
2025-04-28 05:47:23,281 - __main__ - INFO - Saving processed chunks to saved_objects/processed_chunks.pkl
202