## Setup and Imports Cell

In [6]:
# Additional imports needed
import os
from dotenv import load_dotenv
import pandas as pd
import numpy as np
from IPython.display import display, HTML

# Load environment variables before the os.getenv lookups that build `config`
# (presumably from a local .env file — confirm one exists where this notebook runs)
load_dotenv()

# Get API keys and config.
# Every value is None when the corresponding environment variable is unset.
config = {
    "weaviate_cluster_url": os.getenv("WEAVIATE_CLUSTER_URL"),
    "weaviate_api_key": os.getenv("WEAVIATE_API_KEY"),
    "tavily_api_key": os.getenv("TAVILY_API_KEY"),
    # RAGSystem reads config.get("openai_api_key"); without this entry the
    # OpenAI-backed components would be created with an empty key.
    "openai_api_key": os.getenv("OPENAI_API_KEY"),
    "product_data_path": os.getenv("PRODUCT_DATA_PATH")
}

# Surface unset settings early. Only the setting names are printed, never the values.
missing_settings = [name for name, value in config.items() if not value]
if missing_settings:
    print(f"Warning: missing configuration values: {', '.join(missing_settings)}")

## Data loading and preprocessing

In [7]:
import pandas as pd
from bs4 import BeautifulSoup

# Load the provided XLSX file
file_path = "Assignment-product-db.xlsx"  # Update the path if necessary

# Open the workbook in a context manager so the underlying file handle is
# released as soon as the first sheet has been read.
with pd.ExcelFile(file_path) as xls:
    df = pd.read_excel(xls, sheet_name=xls.sheet_names[0])

# Selecting relevant columns
columns_to_keep = [
    "Title", "D", "Type", "Tags", "Category: Name", "Variant Price",
    "Metafield: custom.key_ingredients_list [list.single_line_text_field]",
    "Metafield: custom.popular [list.single_line_text_field]"
]

# Keep only the necessary columns and give them clearer names.
# rename() returns a new frame, so the raw `df` is left untouched.
df_cleaned = df[columns_to_keep].rename(columns={
    "Title": "Product Name",
    "D": "Description",
    "Category: Name": "Category",
    "Variant Price": "Price",
    "Metafield: custom.key_ingredients_list [list.single_line_text_field]": "Key Ingredients",
    "Metafield: custom.popular [list.single_line_text_field]": "On Sale"
})

def clean_html(text):
    """Return ``text`` with HTML markup removed and outer whitespace trimmed.

    Values that are not strings (for example missing cells) are handed back
    exactly as they were received.
    """
    if not isinstance(text, str):
        return text
    soup = BeautifulSoup(text, "html.parser")
    plain_text = soup.get_text()
    return plain_text.strip()

# Apply cleaning function to remove HTML tags from descriptions
df_cleaned["Description"] = df_cleaned["Description"].apply(clean_html)

# Rows worth keeping: description present and not blank after cleaning
has_description = (
    df_cleaned["Description"].notna()
    & df_cleaned["Description"].str.strip().ne("")
)

# Filter and de-duplicate in one chained expression. This builds a new frame
# instead of calling drop_duplicates(inplace=True) on a filtered slice, which
# can trigger SettingWithCopyWarning.
df_cleaned = df_cleaned[has_description].drop_duplicates()

# Save cleaned dataset (optional)
df_cleaned.to_excel("Cleaned_Product_Data.xlsx", index=False)

# Display cleaned dataset preview (rich table rendering instead of plain text)
print("Number of products after cleaning:", len(df_cleaned))
display(df_cleaned.head())


Number of products after cleaning: 2644
                                   Product Name  \
0                           Melaglow Rich Cream   
1                 New Follihair Tablet (Bottle)   
2  Episoft AC Moisturiser with Sunscreen SPF 30   
4               Neutriderm Hair Enhancer Lotion   
5        Epique Intensive Cellular Repair Serum   

                                         Description             Type  \
0  Melaglow Rich Cream is a powerful depigmentati...       Skin Cream   
1  New Follihair Tablet (Bottle) is a comprehensi...  Hair Supplement   
2  Episoft AC Moisturiser with Sunscreen SPF 30 i...        Sunscreen   
4  Neutriderm Hair Enhancer Lotion, powered by sa...      Hair Lotion   
5  Epique Intensive Cellular Repair Serum is infu...       Skin Serum   

                                                Tags   Category   Price  \
0  Abbott, Age spots, Ageing, AHA-BHA Range, Anti...  Skin Care   389.0   
1  Abbott, Abbott Healthcare, Amino Acids, daily ...  Hair Care 

In [8]:
import pandas as pd
import json
import ast
from typing import Dict, List, Any, Optional
import numpy as np

class DataProcessor:
    """
    Class for processing skincare product data from XLSX file

    Typical use is ``load_data()`` -> ``clean_data()`` -> ``create_documents()``.
    Each later step runs the earlier ones itself when they were skipped.
    """

    # Columns whose cells hold several values and are converted to lists of strings
    LIST_COLUMNS = ('Tags', 'Key Ingredients', 'On Sale')

    def __init__(self, file_path: str):
        """
        Initialize the DataProcessor with the path to the XLSX file

        Args:
            file_path: Path to the XLSX file containing product data
        """
        self.file_path = file_path
        self.df = None
        self.processed_data = None
        # True once clean_data() has normalised the frame currently in self.df
        self._is_clean = False

    def load_data(self) -> pd.DataFrame:
        """
        Load data from XLSX file

        Returns:
            Pandas DataFrame containing the loaded data
        """
        self.df = pd.read_excel(self.file_path)
        # Freshly loaded data has not been cleaned yet
        self._is_clean = False
        print(f"Loaded {len(self.df)} products")
        return self.df

    def clean_data(self) -> pd.DataFrame:
        """
        Clean the loaded data (loading it first when necessary)

        Missing cells become empty strings, 'Price' becomes numeric (NaN when
        it cannot be parsed) and every column in LIST_COLUMNS becomes a list
        of strings.

        Returns:
            Cleaned pandas DataFrame
        """
        if self.df is None:
            self.load_data()

        # Handle NaN values
        self.df = self.df.fillna('')

        # Convert price to numeric; blanks and junk turn into NaN
        self.df['Price'] = pd.to_numeric(self.df['Price'], errors='coerce')

        # Process tags and ingredients as lists if they're strings
        for col in self.LIST_COLUMNS:
            self.df[col] = self.df[col].apply(self._parse_list_field)

        self._is_clean = True
        print("Data cleaning completed")
        return self.df

    @staticmethod
    def _clean_items(items) -> List[str]:
        """Convert items to stripped strings, dropping None and blank entries."""
        cleaned = []
        for item in items:
            if item is None:
                continue
            text = str(item).strip()
            if text:
                cleaned.append(text)
        return cleaned

    def _parse_list_field(self, field_value: Any) -> List[str]:
        """
        Parse a multi-value cell into a list of strings

        Accepted inputs:
          * a list/tuple              -> its items as stripped strings
          * a list literal string     -> e.g. '["a", "b"]' becomes ['a', 'b']
          * a comma-separated string  -> e.g. 'a, b' becomes ['a', 'b']
          * '', None or NaN           -> []
          * any other scalar          -> [str(value)]

        Args:
            field_value: Value to parse, could be string, list or other

        Returns:
            List of strings
        """
        if isinstance(field_value, (list, tuple)):
            return self._clean_items(field_value)

        if isinstance(field_value, str):
            text = field_value.strip()
            if not text:
                return []
            try:
                # Try to parse as a Python/JSON-style literal first
                parsed = ast.literal_eval(text)
            except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
                # Not a literal: treat it as comma-separated values so that
                # each tag/ingredient becomes its own list entry
                return self._clean_items(text.split(','))
            if isinstance(parsed, (list, tuple)):
                return self._clean_items(parsed)
            return [str(parsed)]

        # Non-string scalars: missing markers mean "no values"
        try:
            is_missing = bool(pd.isna(field_value))
        except (TypeError, ValueError):
            is_missing = False
        return [] if is_missing else [str(field_value)]

    def create_documents(self) -> List[Dict[str, Any]]:
        """
        Convert DataFrame to a list of document dictionaries for vector db

        Cleaning is triggered automatically when the current frame has not
        been cleaned yet, so list columns are always real lists here.

        Returns:
            List of document dictionaries, each with a "text" field (used for
            embeddings) and a "metadata" dictionary
        """
        if self.df is None or len(self.df) == 0 or not getattr(self, '_is_clean', False):
            self.clean_data()

        documents = []

        for _, row in self.df.iterrows():
            price = row['Price']

            # Create the text field by combining relevant product info.
            # Built line by line so source-code indentation does not leak
            # into the text that gets embedded and stored.
            text_for_embedding = "\n".join([
                f"Product: {row['Product Name']}",
                f"Description: {row['Description']}",
                f"Type: {row['Type']}",
                f"Category: {row['Category']}",
                f"Key Ingredients: {', '.join(row['Key Ingredients'])}",
            ]).strip()

            # Create the document with both text and metadata
            document = {
                "text": text_for_embedding,
                "metadata": {
                    "product_name": row['Product Name'],
                    "description": row['Description'],
                    "type": row['Type'],
                    "tags": row['Tags'],
                    "category": row['Category'],
                    # A missing price is reported as None rather than NaN
                    "price": None if pd.isna(price) else float(price),
                    "key_ingredients": row['Key Ingredients'],
                    "on_sale": row['On Sale']
                }
            }

            documents.append(document)

        self.processed_data = documents
        print(f"Created {len(documents)} document objects")
        return documents

## Vector database operations

In [9]:
import os
import weaviate
from weaviate.classes.config import Property, DataType
from langchain_weaviate import WeaviateVectorStore
from sentence_transformers import SentenceTransformer
from langchain.embeddings.base import Embeddings
from langchain.schema import Document
from typing import List, Dict, Any, Optional
import tqdm
import random
import numpy as np

# Create a custom embeddings class using SentenceTransformer
class BGEEmbeddings(Embeddings):
    """Embeddings implementation that delegates to a SentenceTransformer BGE model."""

    def __init__(self, model_name: str = "BAAI/bge-large-en"):
        """Load the SentenceTransformer model to embed with.

        Args:
            model_name: Name of the BGE model to use

        Raises:
            Exception: whatever SentenceTransformer raises while loading; the
                error is printed first and then re-raised.
        """
        try:
            self.model = SentenceTransformer(model_name)
            self.model_name = model_name
        except Exception as load_error:
            print(f"Error loading BGE model: {load_error}")
            raise

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Encode several texts in one model call.

        Args:
            texts: List of text strings to embed

        Returns:
            One embedding (a list of floats) per input text, in input order
        """
        try:
            return self.model.encode(texts).tolist()
        except Exception as encode_error:
            print(f"Error generating embeddings with BGE: {encode_error}")
            raise

    def embed_query(self, text: str) -> List[float]:
        """Encode a single query string.

        Args:
            text: Query text to embed

        Returns:
            The embedding of ``text`` as a list of floats
        """
        try:
            # The model is given a one-element batch; keep only its first row
            first_row = self.model.encode([text])[0]
            return first_row.tolist()
        except Exception as encode_error:
            print(f"Error generating query embedding with BGE: {encode_error}")
            raise

class VectorStore:
    """
    Class for handling vector database operations with Weaviate using BGE embeddings
    """
    def __init__(self, cluster_url: str, api_key: str):
        """
        Remember the Weaviate credentials and build the embedding model.

        No Weaviate client is created here: ``self.client`` stays None until
        connect() is called.

        Args:
            cluster_url: Weaviate cluster URL
            api_key: Weaviate API key
        """
        # Connection settings
        self.cluster_url, self.api_key = cluster_url, api_key
        # Name of the Weaviate collection used by every operation
        self.index_name = "SkinCareProducts"
        # Handles that are filled in later
        self.client = None
        self.vector_store = None
        # BGE model used to embed both documents and queries
        self.embeddings = BGEEmbeddings()
        
    def connect(self) -> None:
        """
        Open a client for the Weaviate Cloud cluster and store it on ``self.client``.

        Raises:
            Exception: any failure is printed and then re-raised unchanged.
        """
        try:
            # API-key authentication only; no additional headers are sent
            api_key_auth = weaviate.auth.AuthApiKey(api_key=self.api_key)
            self.client = weaviate.connect_to_weaviate_cloud(
                cluster_url=self.cluster_url,
                auth_credentials=api_key_auth,
            )
            print("Connected to Weaviate cluster")
        except Exception as connect_error:
            print(f"Error connecting to Weaviate: {connect_error}")
            raise
    
    def create_schema(self) -> None:
        """
        Create the skincare product collection in Weaviate unless it already exists.

        The collection uses no server-side vectorizer (vectors are supplied by
        the caller) and an HNSW index with cosine distance.

        Raises:
            Exception: creation errors are printed and re-raised. Errors from
                the existence check are only printed, and creation is still
                attempted.
        """
        if self.client is None:
            self.connect()

        # Nothing to do when the collection is already present
        try:
            already_present = self.client.collections.exists(self.index_name)
            if already_present:
                print(f"Collection '{self.index_name}' already exists")
                return
        except Exception as check_error:
            print(f"Error checking if collection exists: {check_error}")
            # Fall through and try to create the collection anyway

        try:
            cfg = weaviate.classes.config
            # (property name, Weaviate data type) for every stored field
            property_specs = [
                ("text", cfg.DataType.TEXT),
                ("product_name", cfg.DataType.TEXT),
                ("description", cfg.DataType.TEXT),
                ("type", cfg.DataType.TEXT),
                ("tags", cfg.DataType.TEXT_ARRAY),
                ("category", cfg.DataType.TEXT),
                ("price", cfg.DataType.NUMBER),
                ("key_ingredients", cfg.DataType.TEXT_ARRAY),
                ("on_sale", cfg.DataType.TEXT_ARRAY),
            ]
            self.client.collections.create(
                name=self.index_name,
                description="Skincare product information for RAG system",
                # 'none' vectorizer: embeddings are computed locally with BGE
                vectorizer_config=cfg.Configure.Vectorizer.none(),
                vector_index_config=cfg.Configure.VectorIndex.hnsw(
                    distance_metric=cfg.VectorDistances.COSINE
                ),
                properties=[
                    cfg.Property(name=prop_name, data_type=prop_type)
                    for prop_name, prop_type in property_specs
                ],
            )
            print(f"Created collection '{self.index_name}' in Weaviate")
        except Exception as create_error:
            print(f"Error creating collection: {create_error}")
            raise
    
    def reset_collection(self) -> None:
        """
        Drop the collection when it exists, then create it again (destroys stored data).

        Raises:
            Exception: any failure is printed and re-raised.
        """
        if self.client is None:
            self.connect()

        try:
            collections = self.client.collections
            # Remove the current collection, if there is one
            if collections.exists(self.index_name):
                collections.delete(self.index_name)
                print(f"Deleted existing collection '{self.index_name}'")

            # Build it again from the schema definition
            self.create_schema()
            print(f"Reset of collection '{self.index_name}' completed")
        except Exception as reset_error:
            print(f"Error resetting collection: {reset_error}")
            raise
    
    def _create_random_vector(self, seed=None):
        """
        Create a random vector for demonstration purposes
        
        Args:
            seed: Optional seed for reproducibility
            
        Returns:
            A random 1024-dimensional vector (matching BGE embedding dimensions)
        """
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)
        
        # Create a random vector with 1024 dimensions (matching BGE embedding size)
        vector = np.random.rand(1024)
        # Normalize to unit length for cosine similarity
        vector = vector / np.linalg.norm(vector)
        return vector.tolist()
    
    def load_documents(self, documents: List[Dict[str, Any]]) -> None:
        """
        Load documents into Weaviate

        The collection is reset first (best effort), then documents are
        embedded in chunks and imported through a dynamic batch. After the
        batch closes, objects rejected by Weaviate are counted and reported
        instead of being silently treated as imported.

        Args:
            documents: List of document dictionaries with text and metadata

        Raises:
            Exception: errors while checking/creating the collection,
                embedding or importing are printed and re-raised.
        """
        if self.client is None:
            self.connect()

        # Reset the collection to make sure we have a clean state with correct settings
        try:
            self.reset_collection()
        except Exception as e:
            print(f"Warning: Collection reset failed but continuing: {e}")

        try:
            # Make sure the collection exists
            if not self.client.collections.exists(self.index_name):
                self.create_schema()

            # Get the collection and add documents directly using batch import
            print("Adding documents directly to Weaviate...")
            collection = self.client.collections.get(self.index_name)

            total = len(documents)
            # Documents embedded per model call; also the progress-report interval
            chunk_size = 100

            with collection.batch.dynamic() as batch:
                for start in range(0, total, chunk_size):
                    print(f"Processing document {start}/{total}")
                    chunk = documents[start:start + chunk_size]

                    # One encode call per chunk instead of one per document
                    vectors = self.embeddings.embed_documents(
                        [doc["text"] for doc in chunk]
                    )

                    for doc, vector in zip(chunk, vectors):
                        # Combine text and metadata into properties
                        properties = {"text": doc["text"]}
                        for key, value in doc["metadata"].items():
                            # Leave out missing values (None / NaN) rather than
                            # sending them as property values
                            if value is None or (isinstance(value, float) and value != value):
                                continue
                            properties[key] = value

                        # Add the object with the embedding vector
                        batch.add_object(properties=properties, vector=vector)

            # Objects the server rejected are collected on the batch wrapper
            failed_objects = collection.batch.failed_objects
            if failed_objects:
                first_error = getattr(failed_objects[0], "message", failed_objects[0])
                print(
                    f"Added {total - len(failed_objects)} of {total} documents to Weaviate; "
                    f"{len(failed_objects)} failed (first error: {first_error})"
                )
            else:
                print(f"Successfully added {total} documents to Weaviate")

            # Initialize vector store for retrieval
            self.vector_store = WeaviateVectorStore(
                client=self.client,
                index_name=self.index_name,
                text_key="text",
                embedding=self.embeddings
            )

        except Exception as e:
            print(f"Error loading documents into Weaviate: {e}")
            raise
    
    def delete_class(self) -> None:
        """
        Remove the collection from Weaviate if it is present (destroys stored data).

        Raises:
            Exception: any failure is printed and re-raised.
        """
        if self.client is None:
            self.connect()

        try:
            # Weaviate v4.x exposes classes as "collections"
            if not self.client.collections.exists(self.index_name):
                print(f"Collection '{self.index_name}' does not exist")
            else:
                self.client.collections.delete(self.index_name)
                print(f"Deleted collection '{self.index_name}' from Weaviate")
        except Exception as delete_error:
            print(f"Error deleting collection: {delete_error}")
            raise
    
    def similarity_search(self, query: str, top_k: int = 5) -> List[Document]:
        """
        Perform similarity search in vector store

        The query is embedded with BGE and matched with a near-vector query.
        If that fails, a BM25 keyword search on the same collection is used
        as a fallback. This method never raises: when everything fails it
        returns an empty list.

        Args:
            query: Query text
            top_k: Number of results to return

        Returns:
            List of relevant Document objects
        """
        # Connect lazily, the same way hybrid_search() and filter_search() do
        if self.client is None:
            try:
                self.connect()
            except Exception:
                # connect() has already printed the error
                return []

        if self.vector_store is None:
            print("Vector store not initialized. Attempting to initialize...")
            try:
                # Initialize vector store for search if not already done
                self.vector_store = WeaviateVectorStore(
                    client=self.client,
                    index_name=self.index_name,
                    text_key="text",
                    embedding=self.embeddings
                )
                print("Vector store initialized successfully")
            except Exception as e:
                # The queries below talk to the collection directly and do not
                # use this wrapper, so a failure here does not stop the search
                print(f"Error initializing vector store: {e}")

        try:
            print(f"Searching for query: '{query}'")
            collection = self.client.collections.get(self.index_name)

            # Generate embedding for the query using BGE
            query_vector = self.embeddings.embed_query(query)

            # Perform vector search
            response = collection.query.near_vector(
                near_vector=query_vector,
                limit=top_k,
            )
            return self._response_to_documents(response)

        except Exception as e:
            print(f"Error during search: {e}")

            # Fall back to keyword (BM25) search over the stored text
            try:
                print("Falling back to keyword search...")
                collection = self.client.collections.get(self.index_name)
                response = collection.query.bm25(
                    query=query,
                    limit=top_k
                )
                return self._response_to_documents(response)
            except Exception as e2:
                print(f"Keyword search also failed: {e2}")
                return []

    @staticmethod
    def _response_to_documents(response) -> List[Document]:
        """Convert the objects of a Weaviate query response into Documents.

        The "text" property becomes the page content; every other property is
        copied into the metadata.
        """
        results = []
        for item in response.objects:
            props = item.properties
            results.append(
                Document(
                    page_content=props.get("text", ""),
                    metadata={k: v for k, v in props.items() if k != "text"},
                )
            )
        return results
    
    def hybrid_search(self, query: str, alpha: float = 0.5, top_k: int = 5) -> List[Document]:
        """
        Run Weaviate's hybrid (keyword + vector) query for ``query``.

        When the hybrid query fails, the call falls back to
        similarity_search(); if that fails too, an empty list is returned.

        Args:
            query: Query text
            alpha: Weight between keyword (0) and vector (1) search
            top_k: Number of results to return

        Returns:
            List of relevant Document objects
        """
        if self.client is None:
            self.connect()

        try:
            print(f"Performing hybrid search for: '{query}' with alpha={alpha}")
            products = self.client.collections.get(self.index_name)

            # The vector half of the hybrid query uses a BGE embedding
            embedded_query = self.embeddings.embed_query(query)

            hits = products.query.hybrid(
                query=query,
                vector=embedded_query,
                alpha=alpha,
                limit=top_k,
            )

            # "text" becomes the page content, all other properties the metadata
            return [
                Document(
                    page_content=hit.properties.get("text", ""),
                    metadata={
                        name: value
                        for name, value in hit.properties.items()
                        if name != "text"
                    },
                )
                for hit in hits.objects
            ]

        except Exception as hybrid_error:
            print(f"Error during hybrid search: {hybrid_error}")

            # Plain similarity search is the fallback
            try:
                return self.similarity_search(query, top_k=top_k)
            except Exception as fallback_error:
                print(f"Fallback search also failed: {fallback_error}")
                return []
    
    def filter_search(self,
                      query: str,
                      filters: Dict[str, Any],
                      top_k: int = 5) -> List[Document]:
        """
        Perform search with metadata filters

        Supported filter keys:
          * "price_max": keep products whose price is <= this value
          * "on_sale":   when truthy, keep products whose on_sale array
                         contains "On Sale"
        When both are given they are combined with AND. Unknown keys are
        ignored. On any error the call falls back to an unfiltered
        similarity_search(); if that fails too, an empty list is returned.

        Args:
            query: Query text
            filters: Dictionary of metadata filters
            top_k: Number of results to return

        Returns:
            List of relevant Document objects
        """
        if self.client is None:
            self.connect()

        try:
            print(f"Filtering search for query: '{query}' with filters: {filters}")
            collection = self.client.collections.get(self.index_name)

            requested = filters or {}
            Filter = weaviate.classes.query.Filter
            conditions = []

            # Price ceiling (the v4 client spells this operator `less_or_equal`)
            if requested.get("price_max") is not None:
                price_value = float(requested["price_max"])
                conditions.append(Filter.by_property("price").less_or_equal(price_value))

            # On-sale flag; independent of the price filter, so both can apply.
            # NOTE(review): assumes sale items carry the literal value "On Sale"
            # in the on_sale array — confirm against the product data.
            if requested.get("on_sale"):
                conditions.append(Filter.by_property("on_sale").contains_any(["On Sale"]))

            # AND all requested conditions together (None means "no filter")
            weaviate_filter = None
            for condition in conditions:
                weaviate_filter = condition if weaviate_filter is None else weaviate_filter & condition

            # Generate embedding for the query using BGE
            query_vector = self.embeddings.embed_query(query)

            # Vector search, restricted by the filter when there is one
            if weaviate_filter is not None:
                response = collection.query.near_vector(
                    near_vector=query_vector,
                    filters=weaviate_filter,
                    limit=top_k
                )
            else:
                response = collection.query.near_vector(
                    near_vector=query_vector,
                    limit=top_k
                )

            # Convert to Document format
            results = []
            for item in response.objects:
                props = item.properties
                doc = Document(
                    page_content=props.get("text", ""),
                    metadata={
                        k: v for k, v in props.items() if k != "text"
                    }
                )
                results.append(doc)

            return results

        except Exception as e:
            print(f"Error during filtered search: {e}")

            # Fall back to regular search
            try:
                return self.similarity_search(query, top_k=top_k)
            except Exception as e2:
                print(f"Fallback search also failed: {e2}")
                return []

  from .autonotebook import tqdm as notebook_tqdm


## Web search integration

In [10]:
from tavily import TavilyClient
from typing import List, Dict, Any, Optional
from langchain.schema import Document

class WebSearch:
    """
    Class for handling web search for general skincare queries using Tavily API
    """
    def __init__(self, api_key: str):
        """
        Initialize the web search client

        Args:
            api_key: Tavily API key
        """
        self.client = TavilyClient(api_key=api_key)

    def search(self, query: str, max_results: int = 5) -> List[Document]:
        """
        Search the web for information related to the query

        The query is prefixed with "skincare" before it is sent. Each hit
        becomes a Document whose metadata has "title", "url" and
        "source" = "web_search". When the response contains a generated
        answer it is placed first, with "source" = "web_search_answer".
        Missing or null fields in the response are treated as empty strings.

        Args:
            query: Search query
            max_results: Maximum number of results to return

        Returns:
            List of Document objects with search results

        Raises:
            Exception: errors raised by the Tavily client are not caught here.
        """
        # Add skincare context to the query
        skincare_query = f"skincare {query}"

        # Execute search
        search_results = self.client.search(
            query=skincare_query,
            search_depth="advanced",
            max_results=max_results,
            include_answer=True,
            include_raw_content=True,
            include_images=False
        )

        # Convert to Document format for consistency with vector store.
        # `or` fallbacks guard against keys that are present but null.
        documents = []
        for result in search_results.get("results") or []:
            documents.append(
                Document(
                    page_content=result.get("content") or "",
                    metadata={
                        "title": result.get("title") or "",
                        "url": result.get("url") or "",
                        "source": "web_search"
                    }
                )
            )

        # Also include the generated answer if available
        answer = search_results.get("answer")
        if answer:
            answer_doc = Document(
                page_content=answer,
                metadata={
                    "title": "Tavily Generated Answer",
                    "source": "web_search_answer"
                }
            )
            # Put the answer at the front of the list
            documents.insert(0, answer_doc)

        return documents

## Query classification

In [11]:
import re
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from typing import Dict, Any, Literal, Tuple

class QueryRouter:
    """
    Class for handling query routing between product search and general skincare advice
    """

    # First PRODUCT/GENERAL label found in the (upper-cased) model reply
    _LABEL_PATTERN = re.compile(r"\b(PRODUCT|GENERAL)\b")
    # First number in the model reply; allows thousands separators and decimals
    _NUMBER_PATTERN = re.compile(r"\d[\d,]*(?:\.\d+)?")
    # "on sale", "onsale", "discount", "discounted", "discounts", ...
    _ON_SALE_PATTERN = re.compile(r'\b(on\s*sale|discount\w*)\b', re.IGNORECASE)

    def __init__(self, openai_api_key: str):
        """
        Initialize the query router

        Args:
            openai_api_key: OpenAI API key
        """
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0,
            api_key=openai_api_key
        )

        # Define the prompt for query classification
        self.classification_prompt = PromptTemplate(
            template="""You are a skincare expert assistant that helps route user queries.
Your goal is to determine if a query is asking for:
1. Product recommendations (user wants specific skincare product suggestions)
2. General skincare advice (user wants information about skincare topics, routines, or ingredients)

Query types:
- PRODUCT: Queries about finding, comparing, or buying specific skincare products
- GENERAL: Queries about skincare topics, routines, treatments, or ingredients without specifically requesting product recommendations

Examples:
- "Recommend a moisturizer for sensitive skin" -> PRODUCT
- "What's a good sunscreen for oily skin?" -> PRODUCT
- "How to treat acne scars" -> GENERAL
- "Benefits of niacinamide in skincare" -> GENERAL
- "What products help with hyperpigmentation?" -> PRODUCT
- "How should I layer skincare products?" -> GENERAL
- "Cleanser under 500 for dry skin" -> PRODUCT
- "How often should I exfoliate my face?" -> GENERAL

User Query: {query}

Analyze this query and respond ONLY with either "PRODUCT" or "GENERAL".""",
            input_variables=["query"]
        )

        # Chain for query classification
        self.classification_chain = LLMChain(
            llm=self.llm,
            prompt=self.classification_prompt
        )

        # Define the prompt for extracting price filters
        self.price_filter_prompt = PromptTemplate(
            template="""Extract the maximum price mentioned in this skincare product query.
If a specific price limit is mentioned (like "under 500", "less than 1000", etc.), identify that value.
If a price range is mentioned (like "between 500 and 1000"), identify the maximum value.
If no price is mentioned, respond with "None".

Examples:
- "Moisturizer under 500 rupees" -> 500
- "Face wash less than 1000" -> 1000
- "Products between 500 and 2000" -> 2000
- "Affordable sunscreen" -> None (no specific price mentioned)
- "Serums under Rs. 1200" -> 1200
- "Budget-friendly face cream" -> None (no specific price mentioned)

User Query: {query}

Extract maximum price (respond ONLY with the number or "None"):""",
            input_variables=["query"]
        )

        # Chain for price extraction
        self.price_filter_chain = LLMChain(
            llm=self.llm,
            prompt=self.price_filter_prompt
        )

    def classify_query(self, query: str) -> Literal["PRODUCT", "GENERAL"]:
        """
        Classify a query as either product recommendation or general skincare advice

        The model reply is matched leniently: surrounding quotes, punctuation
        or extra words do not prevent a label from being recognised. A reply
        that contains neither label is treated as "GENERAL".

        Args:
            query: User query text

        Returns:
            Classification as either "PRODUCT" or "GENERAL"
        """
        reply = self.classification_chain.run(query).strip().upper()
        label_match = self._LABEL_PATTERN.search(reply)
        if label_match and label_match.group(1) == "PRODUCT":
            return "PRODUCT"
        return "GENERAL"

    def extract_filters(self, query: str) -> Dict[str, Any]:
        """
        Extract filters from a product query

        Recognised filters:
          * "price_max": first number in the model's price reply; accepts
            forms such as "500", "1,200", "Rs. 500" or "499.99". It is an int
            for whole numbers and a float otherwise, and is omitted when the
            reply has no number.
          * "on_sale": True when the query itself mentions a sale or discount.

        Args:
            query: User query text

        Returns:
            Dictionary of extracted filters
        """
        filters = {}

        # Extract price information using LLM
        price_response = self.price_filter_chain.run(query).strip()
        number_match = self._NUMBER_PATTERN.search(price_response)
        if number_match:
            price = float(number_match.group(0).replace(",", ""))
            filters["price_max"] = int(price) if price.is_integer() else price

        # Check for "on sale" mentions
        if self._ON_SALE_PATTERN.search(query):
            filters["on_sale"] = True

        return filters

    def parse_query(self, query: str) -> Tuple[Literal["PRODUCT", "GENERAL"], Dict[str, Any]]:
        """
        Parse a query to determine its type and extract any filters

        Filters are only extracted for "PRODUCT" queries; "GENERAL" queries
        always get an empty filter dictionary.

        Args:
            query: User query text

        Returns:
            Tuple of (query_type, filters)
        """
        query_type = self.classify_query(query)
        filters = {}

        if query_type == "PRODUCT":
            filters = self.extract_filters(query)

        return query_type, filters

## Main RAG system

In [19]:
import os
from typing import Dict, List, Any, Optional
from langchain.schema import Document
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from data_processor import DataProcessor
from vector_store import VectorStore
from query_router import QueryRouter
from web_search import WebSearch

class RAGSystem:
    """
    Main class for the Retrieval Augmented Generation system

    Each user query is classified by the query router. Product queries are
    answered from documents held in the vector store (optionally narrowed by
    extracted filters); general skincare queries are answered from web search
    results. Both paths read from and write to the same conversation memory.
    """
    def __init__(self, config: Dict[str, str]):
        """
        Initialize the RAG system with configuration

        Args:
            config: Dictionary containing API keys and configuration. Keys read
                here: "openai_api_key", "tavily_api_key",
                "weaviate_cluster_url", "weaviate_api_key",
                "product_data_path". A missing key defaults to "".
        """
        # Set up configuration
        self.openai_api_key = config.get("openai_api_key", "")
        self.tavily_api_key = config.get("tavily_api_key", "")
        self.weaviate_cluster_url = config.get("weaviate_cluster_url", "")
        self.weaviate_api_key = config.get("weaviate_api_key", "")
        self.product_data_path = config.get("product_data_path", "")

        # Initialize components
        self.data_processor = DataProcessor(self.product_data_path)
        self.vector_store = VectorStore(self.weaviate_cluster_url, self.weaviate_api_key)
        self.query_router = QueryRouter(self.openai_api_key)
        self.web_search = WebSearch(self.tavily_api_key)

        # Set up LLM with higher temperature for more creative responses
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.7,
            api_key=self.openai_api_key
        )

        # Conversation memory shared by the product and general paths.
        # return_messages=True stores message objects (not a flat string), and
        # output_key="answer" tells the memory which chain output to record.
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )

        # Set up prompts with enhanced context handling
        self.product_prompt = PromptTemplate(
            template="""You are a knowledgeable skincare assistant. Your goal is to provide accurate, helpful product recommendations based on the user's query and conversation history.

CONVERSATION HISTORY:
{chat_history}

CURRENT CONTEXT (Product Information):
{context}

USER QUERY: {question}

Instructions for generating response:
1. First, analyze the conversation history to understand any previously mentioned:
   - Skin type/concerns
   - Product preferences
   - Budget constraints
   - Previously recommended products
   
2. Then, provide a detailed response that:
   - References relevant previous recommendations if they fit the current query
   - Suggests new products that complement previous recommendations
   - Explains why each product is suitable based on the user's specific needs
   - Includes key features and ingredients of recommended products
   - Mentions price information for each product
   
3. If the query references previous recommendations but you don't find relevant context in the history:
   - Focus on providing new, personalized recommendations
   - Explain that you're making fresh recommendations based on the current query
   - Ask clarifying questions if needed

4. Always prioritize PERSONALIZATION over history if you must choose:
   - Give precedence to the current query's specific needs
   - Use history to enhance, not restrict, your recommendations
   - Feel free to suggest different products if they better match current needs

Answer in a friendly, conversational tone. Be specific and reference actual products from the context.
If you're not sure about something, be honest and suggest consulting with a dermatologist for personalized advice.""",
            input_variables=["context", "chat_history", "question"]
        )

        self.general_prompt = PromptTemplate(
            template="""You are a knowledgeable skincare assistant. Your goal is to provide helpful, educational information about skincare based on the user's query and conversation history.

CONVERSATION HISTORY:
{chat_history}

CURRENT CONTEXT (Search Results):
{context}

USER QUERY: {question}

Instructions for generating response:
1. First, analyze the conversation history to understand:
   - Previously discussed skincare concerns
   - Mentioned routines or practices
   - Specific products or ingredients discussed
   
2. Then, provide a detailed response that:
   - Builds upon previous advice if relevant
   - Provides new, evidence-based information
   - Explains skincare concepts clearly
   - Offers practical, actionable advice
   
3. If the query references previous discussion but you don't find relevant context:
   - Focus on providing fresh, comprehensive advice
   - Explain that you're offering new recommendations
   - Ask clarifying questions if needed

4. Always prioritize ACCURACY and PERSONALIZATION over history:
   - Focus on providing the most accurate, up-to-date information
   - Tailor advice to the current query's specific needs
   - Use history to enhance, not limit, your response

Answer in a friendly, conversational tone. Provide explanations that are easy to understand but scientifically accurate.
If there are conflicting opinions in the search results, acknowledge them and provide balanced information.
If you're not sure about something, be honest and suggest consulting with a dermatologist.""",
            input_variables=["context", "chat_history", "question"]
        )

        # The product retrieval chain is built after documents are loaded.
        # general_chain is kept as an attribute but is never assigned a chain
        # in this class: general queries call the LLM directly.
        self.product_chain = None
        self.general_chain = None

    def initialize(self) -> None:
        """
        Connect to the vector database

        The retrieval chain is deliberately not built here; that happens in
        process_data() once documents have been loaded.
        """
        # Connect to the vector database
        self.vector_store.connect()

        print("RAG system initialized successfully")

    def _init_retrieval_chains(self) -> None:
        """
        Build the conversational retrieval chain used for unfiltered product queries

        Does nothing (apart from printing a warning) while the underlying
        vector store has not been populated. Only the product chain is
        created; general queries do not use a retrieval chain.
        """
        # Make sure vector_store is initialized
        if self.vector_store.vector_store is None:
            print("Warning: Vector store not initialized with documents yet. Retrieval chains will be initialized after loading documents.")
            return

        # Set up the product retrieval chain with memory; the retriever
        # returns the 5 best matches and they are stuffed into product_prompt.
        self.product_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vector_store.vector_store.as_retriever(
                search_kwargs={"k": 5}
            ),
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": self.product_prompt},
            return_source_documents=True,
            chain_type="stuff"
        )

        print("Retrieval chains initialized successfully")

    def process_data(self) -> None:
        """
        Process the product data and load it into the vector store

        Steps: load and clean the raw data, turn it into documents, create the
        vector store schema, load the documents, then build the retrieval
        chain on top of the populated store.
        """
        # Load and clean data
        self.data_processor.load_data()
        self.data_processor.clean_data()

        # Create documents for vector database
        documents = self.data_processor.create_documents()

        # Create schema and load documents into vector store
        self.vector_store.create_schema()
        self.vector_store.load_documents(documents)

        # Now that documents are loaded, initialize the retrieval chains
        self._init_retrieval_chains()

        print("Data processing and loading completed")

    def process_query(self, query: str) -> str:
        """
        Process a user query and generate a response

        Args:
            query: User query text

        Returns:
            Generated response text
        """
        # Classify the query and extract filters
        query_type, filters = self.query_router.parse_query(query)

        if query_type == "PRODUCT":
            # Handle product recommendation query
            return self._process_product_query(query, filters)

        # Handle general skincare advice query
        return self._process_general_query(query)

    def _format_chat_history(self) -> str:
        """
        Render the stored conversation as plain text for prompt insertion

        The memory holds message objects, so formatting them directly into a
        prompt would insert their Python repr. This produces one
        "Role: content" line per message instead.

        Returns:
            The transcript, or "No previous conversation." when memory is empty
        """
        messages = self.memory.chat_memory.messages
        if not messages:
            return "No previous conversation."

        role_labels = {"human": "Human", "ai": "AI", "system": "System"}
        lines = []
        for message in messages:
            # Unknown or missing message types get a neutral label
            role = role_labels.get(getattr(message, "type", ""), "Message")
            lines.append(f"{role}: {message.content}")

        return "\n".join(lines)

    def _answer_with_context(self, prompt: PromptTemplate, context_text: str, query: str) -> str:
        """
        Ask the LLM directly with an explicit context and record the exchange

        Args:
            prompt: Template taking "context", "chat_history" and "question"
            context_text: Pre-formatted context to place in the prompt
            query: User query text

        Returns:
            The content of the LLM response
        """
        response = self.llm.invoke(
            prompt.format(
                context=context_text,
                chat_history=self._format_chat_history(),
                question=query
            )
        )

        # Memory is updated manually because no chain is involved on this path;
        # it is only updated after a successful LLM call.
        self.memory.chat_memory.add_user_message(query)
        self.memory.chat_memory.add_ai_message(response.content)

        return response.content

    def _process_product_query(self, query: str, filters: Dict[str, Any]) -> str:
        """
        Process a product recommendation query

        With filters, documents come from a filtered vector-store search and
        the LLM is called directly. Without filters, the conversational
        retrieval chain handles retrieval, generation and memory.

        Args:
            query: User query text
            filters: Dictionary of extracted filters

        Returns:
            Generated response text, or a fixed fallback message when the
            store/chain is unavailable or an error occurs
        """
        # Check if vector store is initialized
        if self.vector_store.vector_store is None:
            return "I'm not ready to answer product questions yet. Please process the product data first by clicking 'Process Product Data' in the sidebar."

        # Initialize the product chain if not done already
        if self.product_chain is None:
            self._init_retrieval_chains()

        # Make sure product chain is initialized
        if self.product_chain is None:
            return "I'm having trouble accessing the product database. Please try processing the data again or contact support."

        try:
            if filters:
                # Get relevant documents with filters and answer from them
                docs = self.vector_store.filter_search(query, filters)
                return self._answer_with_context(
                    self.product_prompt, self._format_documents(docs), query
                )

            # Use the standard retrieval chain (it updates memory itself)
            response = self.product_chain.invoke({"question": query})
            return response["answer"]

        except Exception as e:
            print(f"Error processing product query: {e}")
            return "I apologize, but I encountered an error while processing your query. Please try again or rephrase your question."

    def _process_general_query(self, query: str) -> str:
        """
        Process a general skincare advice query

        Runs a web search for the query, formats the results as context and
        asks the LLM directly. Failures are reported with the same fallback
        message used by the product path instead of propagating.

        Args:
            query: User query text

        Returns:
            Generated response text, or a fixed apology message on error
        """
        try:
            # Perform web search for the query
            search_results = self.web_search.search(query)

            # Generate response using the LLM with the formatted search results
            return self._answer_with_context(
                self.general_prompt, self._format_documents(search_results), query
            )

        except Exception as e:
            print(f"Error processing general query: {e}")
            return "I apologize, but I encountered an error while processing your query. Please try again or rephrase your question."

    def _format_documents(self, documents: List[Document]) -> str:
        """
        Format a list of documents into a single context string

        Formatting depends on metadata["source"]:
        "web_search_answer" -> generated answer, "web_search" -> titled web
        result, anything else -> product card built from metadata.

        Args:
            documents: List of Document objects

        Returns:
            Formatted context string; entries are separated by a blank line.
            Returns "No relevant information found." for an empty list.
        """
        if not documents:
            return "No relevant information found."

        formatted_docs = []

        # Numbering follows the position in the input list, whatever the kind
        for i, doc in enumerate(documents):
            source = doc.metadata.get("source")

            if source == "web_search_answer":
                # Format Tavily generated answer differently
                formatted_docs.append(f"GENERATED ANSWER: {doc.page_content}")
            elif source == "web_search":
                # Format web search result
                title = doc.metadata.get("title", "Unknown Title")
                url = doc.metadata.get("url", "Unknown URL")
                formatted_docs.append(f"SOURCE {i+1}: {title}\nURL: {url}\n{doc.page_content}")
            else:
                # Format product document
                product_name = doc.metadata.get("product_name", "Unknown Product")
                price = doc.metadata.get("price", "Unknown Price")
                category = doc.metadata.get("category", "Unknown Category")

                # key_ingredients may be missing, None, a single string or a
                # list; joining a bare string would split it into characters.
                ingredients = doc.metadata.get("key_ingredients") or []
                if isinstance(ingredients, str):
                    key_ingredients = ingredients
                else:
                    key_ingredients = ", ".join(str(item) for item in ingredients)

                formatted = f"PRODUCT {i+1}: {product_name}\n"
                formatted += f"PRICE: {price}\n"
                formatted += f"CATEGORY: {category}\n"
                formatted += f"KEY INGREDIENTS: {key_ingredients}\n"
                formatted += f"DESCRIPTION: {doc.metadata.get('description', '')}\n"

                formatted_docs.append(formatted)

        return "\n\n".join(formatted_docs)

    def reset_conversation(self) -> None:
        """
        Reset the conversation history
        """
        self.memory.clear()
        print("Conversation history cleared")
        

## Testing Cell

In [22]:
# Initialize the RAG system
# `config` is the dict of API keys/paths built from environment variables in
# the setup cell; initialize() connects to the vector database.
rag_system = RAGSystem(config)
rag_system.initialize()

# Process data
# Loads and cleans the product data, (re)creates the vector store schema,
# loads every document and then builds the retrieval chain. The recorded
# output below shows the existing collection being deleted and recreated, so
# this step re-indexes all products each time the cell runs.
rag_system.process_data()

# Test queries
# The list mixes product requests (with and without price / on-sale filters)
# and general skincare questions. Order matters: the queries run against one
# shared conversation memory, and some explicitly refer back to earlier turns.
# NOTE(review): the fifth query reads "Based our conversation history" and is
# probably missing "on"; left unchanged because the text is sent to the model.
test_queries = [
    "Suggest me a Skin emulsion",
    "Can you recommend a moisturizer under ₹500 that contains Niacinamide?",
    "I'm traveling to a humid city next week. Can you recommend a lightweight skincare routine suitable for that climate?",
    "I have sensitive skin. What ingredients should I avoid in skincare products?",
    "Based our conversation history can you suggest a routine for my skin type using those products?",
    "That routine sounds great! Can you also recommend an anti-aging serum under ₹2000 that contains Hyaluronic Acid and is on sale?",
    "What ingredients should I look for in anti-aging products?"
]

# Run each query in sequence, echoing the query before its (slow) response
for query in test_queries:
    print(f"\nQuery: {query}")
    response = rag_system.process_query(query)
    print(f"Response: {response}")

Connected to Weaviate cluster
RAG system initialized successfully
Loaded 2644 products
Data cleaning completed
Created 2644 document objects
Collection 'SkinCareProducts' already exists
Deleted existing collection 'SkinCareProducts'
Created collection 'SkinCareProducts' in Weaviate
Reset of collection 'SkinCareProducts' completed
Adding documents directly to Weaviate...
Processing document 0/2644
Processing document 100/2644
Processing document 200/2644
Processing document 300/2644
Processing document 400/2644
Processing document 500/2644
Processing document 600/2644
Processing document 700/2644
Processing document 800/2644
Processing document 900/2644
Processing document 1000/2644
Processing document 1100/2644
Processing document 1200/2644
Processing document 1300/2644
Processing document 1400/2644
Processing document 1500/2644
Processing document 1600/2644
Processing document 1700/2644
Processing document 1800/2644
Processing document 1900/2644
Processing document 2000/2644
Processin

## 👆🏻 Please check the example outputs above for different kinds of prompts

## Visualization Cell

In [14]:
def display_product_results(results):
    """
    Render each product result as a bordered HTML card in the notebook output

    Every value taken from the result metadata is HTML-escaped before it is
    placed in the markup, so characters such as "<" or "&" in product names or
    descriptions are shown literally instead of being interpreted as HTML.

    Args:
        results: Iterable of objects exposing a ``metadata`` dict. Keys read:
            "product_name", "price", "category", "description",
            "key_ingredients" (list, single string, or missing/None).
    """
    # Local import keeps this cell self-contained; note that `html.escape`
    # is unrelated to IPython's `HTML` display class used below.
    from html import escape

    for result in results:
        metadata = result.metadata

        # A bare string must not be joined (that would split it into
        # characters), and None must not reach str.join at all.
        ingredients = metadata.get('key_ingredients') or []
        if isinstance(ingredients, str):
            ingredients_text = ingredients
        else:
            ingredients_text = ', '.join(str(item) for item in ingredients)

        product_name = escape(str(metadata.get('product_name', 'Unknown Product')))
        price = escape(str(metadata.get('price', 'N/A')))
        category = escape(str(metadata.get('category', 'N/A')))
        description = escape(str(metadata.get('description', 'N/A')))
        ingredients_text = escape(ingredients_text)

        display(HTML(f"""
        <div style="border:1px solid #ddd; padding:10px; margin:5px; border-radius:5px;">
            <h3>{product_name}</h3>
            <p><b>Price:</b> ₹{price}</p>
            <p><b>Category:</b> {category}</p>
            <p><b>Description:</b> {description}</p>
            <p><b>Key Ingredients:</b> {ingredients_text}</p>
        </div>
        """))

# Test visualization
# Queries the vector store directly (no LLM involved) for the top 3 matches
# and renders them as HTML cards.
# NOTE(review): depends on `rag_system` created in the testing cell, so that
# cell must have been run first on a fresh kernel.
query = "Suggest me a Skin emulsion"
results = rag_system.vector_store.similarity_search(query, top_k=3)
display_product_results(results)

Searching for query: 'Suggest me a Skin emulsion'
