In [4]:
import os
import time
import logging
import re

import pandas as pd
import pinecone
from dotenv import load_dotenv

from IPython.display import Image, display
from langchain.docstore.document import Document
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_groq import ChatGroq
from langchain_pinecone import PineconeVectorStore

# ----------------------------------------------------------
# 1. Logging Configuration
# ----------------------------------------------------------
logging.basicConfig(
    filename='semantic_search.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


In [5]:
# 2. Load Environment & Initialize Embeddings + LLM
# ----------------------------------------------------------
load_dotenv()

# Retrieve your secrets from .env
GROQ_API_KEY = os.getenv("GROQ_API_KEY")    # ChatGroq
PINECONE_KEY = os.getenv("PINECONE_KEY")   # Pinecone
# For the new client approach, environment may be optional 
# if you use ServerlessSpec. Otherwise, you might need "PINECONE_ENV".
# e.g.: PINECONE_ENV = os.getenv("PINECONE_ENV")

if not GROQ_API_KEY:
    raise ValueError("GROQ_API_KEY not found in .env")

if not PINECONE_KEY:
    raise ValueError("PINECONE_KEY not found in .env")

# Initialize HuggingFaceEmbeddings
model_name = "models/fine-tuned-sbert-triplet_20241224_171119_20241226_123849"
embeddings = HuggingFaceEmbeddings(model_name=model_name)
print(f"HuggingFaceEmbeddings initialized with model: {model_name}")
logger.info(f"HuggingFaceEmbeddings initialized with model: {model_name}")

  embeddings = HuggingFaceEmbeddings(model_name=model_name)


HuggingFaceEmbeddings initialized with model: models/fine-tuned-sbert-triplet_20241224_171119_20241226_123849


In [6]:

# ----------------------------------------------------------
# Initialize the ChatGroq LLM
# ----------------------------------------------------------
llm = ChatGroq(
    groq_api_key=GROQ_API_KEY,
    model_name="Llama3-8b-8192",  # Example model name
    temperature=0.7,
    max_tokens=200
)
logger.info("ChatGroq LLM initialized successfully.")


In [7]:

# ----------------------------------------------------------
# Example Data: Products & Services
# ----------------------------------------------------------
df_products = pd.DataFrame({
    "id": [16508, 11349, 51499],
    "name": [
        "Locomotive Men Washed Blue Jeans",
        "Lee Men Blue Chicago Fit Jeans",
        "Denizen Women Blue Jeans"
    ],
    "description": [
        "Casual jeans for men, washed style, perfect for everyday wear.",
        "Men's blue Chicago Fit Jeans, comfortable and stylish for summer.",
        "Women's blue jeans from Denizen, casual and versatile."
    ],
    "gender": ["Men", "Men", "Women"],
    "baseColour": ["Blue", "Blue", "Blue"]
})

df_services = pd.DataFrame({
    "id": [7001, 7002],
    "serviceName": [
        "Home Painting Service",
        "Car Painting Service"
    ],
    "serviceDescription": [
        "Professional home interior and exterior painting solutions.",
        "Automotive painting service for a new look or repairs."
    ],
    "category": ["Home Improvement", "Automotive"]
})

logger.info(f"Loaded {len(df_products)} products and {len(df_services)} services.")

# ----------------------------------------------------------
# Create Documents for Products & Services
# ----------------------------------------------------------
def create_product_documents(df: pd.DataFrame):
    docs = []
    for _, row in df.iterrows():
        page_content = (
            f"Product ID: {row['id']}. "
            f"Name: {row['name']}. "
            f"{row['description']}"
        )
        metadata = {
            "docType": "product",
            "id": row['id'],
            "name": row['name'],
            "description": row['description'],
            "gender": row.get("gender", None),
            "baseColour": row.get("baseColour", None),
        }
        docs.append(Document(page_content=page_content, metadata=metadata))
    return docs

def create_service_documents(df: pd.DataFrame):
    docs = []
    for _, row in df.iterrows():
        page_content = (
            f"Service ID: {row['id']}. "
            f"Name: {row['serviceName']}. "
            f"{row['serviceDescription']}"
        )
        metadata = {
            "docType": "service",
            "id": row['id'],
            "serviceName": row['serviceName'],
            "serviceDescription": row['serviceDescription'],
            "category": row.get("category", None),
        }
        docs.append(Document(page_content=page_content, metadata=metadata))
    return docs

product_docs = create_product_documents(df_products)
service_docs = create_service_documents(df_services)

all_docs = product_docs + service_docs
logger.info(f"Created {len(all_docs)} total documents (products + services).")


In [8]:
from pinecone import Pinecone, ServerlessSpec, PineconeApiException
import time
import logging
import json

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)  # Ensure logging is configured


def initialize_pinecone(pinecone_key):
    """
    Initialize Pinecone client, create index if it doesn't exist, and return the Pinecone index.
    """
    index_name = "test-index"  # Hard-coded index name

    # Initialize Pinecone client
    try:
        pc = Pinecone(api_key=pinecone_key)
        logger.info("Pinecone client created successfully.")
    except Exception as e:
        logger.error(f"Failed to initialize Pinecone client: {e}")
        raise

    # Determine embedding dimension from a test embedding
    try:
        # Ensure 'embeddings' is defined before calling this function
        # Example:
        # from langchain.embeddings import HuggingFaceEmbeddings
        # embeddings = HuggingFaceEmbeddings(model_name="your-model-name")
        test_emb = embeddings.embed_query("test")
        dimension = len(test_emb)
        logger.info(f"Embedding dimension: {dimension}")
    except Exception as e:
        logger.error(f"Failed to determine embedding dimension: {e}")
        raise

    # List existing indexes
    try:
        existing_indexes = pc.list_indexes()  # Ensure parentheses are present
        print(f"existing_indexes: {existing_indexes}")  # Debugging
        logger.info(f"Existing indexes: {existing_indexes}")
    except Exception as e:
        logger.error(f"Error listing Pinecone indexes: {e}")
        existing_indexes = []

    # Check if index exists
    if index_name not in existing_indexes:
        try:
            # Create a new index
            pc.create_index(
                name=index_name,
                dimension=dimension,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
                deletion_protection="disabled"
            )
            logger.info(f"Created a new Pinecone index: {index_name}")

            # Wait until the index is ready
            logger.info(f"Waiting for index '{index_name}' to be ready...")
            while True:
                index_status = pc.describe_index(index_name).status
                if index_status['ready']:
                    logger.info(f"Index '{index_name}' is ready.")
                    break
                time.sleep(1)
        except PineconeApiException as e:
            try:
                error_info = json.loads(e.body)
                if error_info['error']['code'] == 'ALREADY_EXISTS':
                    logger.info(f"Index '{index_name}' already exists. Proceeding to use it.")
                else:
                    logger.error(f"Error creating Pinecone index: {e}")
                    raise
            except json.JSONDecodeError:
                logger.error(f"Error parsing PineconeApiException body: {e.body}")
                raise
        except Exception as e:
            logger.error(f"Unexpected error during index creation: {e}")
            raise
    else:
        logger.info(f"Using existing Pinecone index: {index_name}")

    # Get the index object
    try:
        index = pc.Index(index_name)
        logger.info(f"Retrieved Pinecone index: {index_name}")
    except Exception as e:
        logger.error(f"Failed to retrieve Pinecone index '{index_name}': {e}")
        raise

    return index


# Ensure 'embeddings' is defined before calling the function
# Example:
# from langchain.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="models/fine-tuned-sbert-triplet_20241224_171119_20241226_123849")

# Initialize Pinecone index
pinecone_index = initialize_pinecone(PINECONE_KEY)


existing_indexes: {'indexes': [{'deletion_protection': 'disabled',
              'dimension': 384,
              'host': 'test-index-yb7bvf4.svc.aped-4627-b74a.pinecone.io',
              'metric': 'cosine',
              'name': 'test-index',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-east-1'}},
              'status': {'ready': True, 'state': 'Ready'}}]}


In [9]:



# Initialize the LangChain Pinecone vector store
vector_store = PineconeVectorStore(
    index=pinecone_index,    # Pinecone index object
    embedding=embeddings,     # Pinecone API key
)
logger.info("Pinecone vector store initialized successfully.")
vector_store

<langchain_pinecone.vectorstores.PineconeVectorStore at 0x205c2787830>

In [10]:
def add_documents_to_pinecone(docs, store, index_name, batch_size=100):
    """
    Adds documents to Pinecone in batches. Handles large datasets efficiently.
    
    Args:
        docs (list): List of documents to add.
        store (PineconeVectorStore): Pinecone vector store instance.
        index_name (str): Name of the Pinecone index.
        batch_size (int): Number of documents to add per batch (default: 100).
    """
    try:
        total_docs = len(docs)
        logger.info(f"Preparing to add {total_docs} documents to Pinecone index '{index_name}'.")

        # Batch the documents to avoid overloading the API
        for i in range(0, total_docs, batch_size):
            batch = docs[i:i + batch_size]
            store.add_documents(batch)
            logger.info(f"Added batch {i // batch_size + 1} containing {len(batch)} documents.")
        
        logger.info(f"Successfully added {total_docs} documents to Pinecone index '{index_name}'.")
    except Exception as e:
        logger.error(f"Error adding documents to Pinecone: {e}")
        raise
# Add documents to Pinecone
add_documents_to_pinecone(all_docs, vector_store,pinecone_index )

##  



filter_conditions = {
    "docType": {"$eq": "service"},       # Corrected to lowercase
    "category": {"$eq": "Automotive"}    # Corrected spelling
}

# Define the search query
search_query = "Find me professional painting services."

# Convert the query into a vector using the embeddings
query_vector = embeddings.embed_query(search_query)

# Perform the similarity search with filter
try:
    results = vector_store.similarity_search(
        search_query,
        k=5,
        filter=filter_conditions
    )
    
    if not results:
        print("No documents found matching the filters.")
    else:
        for doc in results:
            print("----------")
            print(f"Content: {doc.page_content}")
            print("Metadata:")
            for key, value in doc.metadata.items():
                print(f"  {key}: {value}")
            print("----------\n")
except Exception as e:
    logger.error(f"An error occurred during the filtered similarity search: {e}")
    print(f"An error occurred during the search: {e}")

In [11]:


# ----------------------------------------------------------
# Step 6: Define map_reduce Prompts & Create RetrievalQA
# ----------------------------------------------------------
map_prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""
You have the following chunk of data (could be a product or service):
{context}

User question: {question}

- Summarize any relevant items here, referencing ID and name.
- If nothing is relevant, say so.
""",
)

reduce_prompt = PromptTemplate(
    input_variables=["summaries", "question"],
    template="""
We have partial answers from multiple chunks:
{summaries}

Combine them into a single, cohesive answer to: "{question}"

Requirements:
1) Start with a short summary referencing relevant products or services (by ID and name).
2) Provide bullet points referencing IDs and names.
3) If no relevant items are found, say "No relevant items found."
""",
)

def create_qa_chain(llm, vector_store):
    """Create a map_reduce RetrievalQA chain that references the Pinecone vector store."""
    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="map_reduce",
        retriever=vector_store.as_retriever(search_kwargs={"k": 10}),
        return_source_documents=True,
        chain_type_kwargs={
            "question_prompt": map_prompt,
            "combine_prompt": reduce_prompt
        }
    )
    logger.info("map_reduce RetrievalQA chain created successfully (Pinecone).")
    return chain

qa_chain = create_qa_chain(llm, vector_store)


In [12]:

# ----------------------------------------------------------
# Step 7: Define Semantic Search Tool
# ----------------------------------------------------------
def semantic_search_tool(query):
    """
    1) Calls qa_chain.invoke(query) to run the map_reduce retrieval QA.
    2) Appends product/service metadata (docType=product or service).
    """
    max_retries = 5
    for attempt in range(max_retries):
        try:
            response = qa_chain.invoke(query)

            if isinstance(response, dict):
                llm_result = response.get("result", "No result from LLM.")
                source_docs = response.get("source_documents", [])
            else:
                llm_result = response
                source_docs = []

            if not source_docs:
                return f"**LLM Result:** {llm_result}\n\nNo relevant documents found."

            formatted_results = f"**LLM Result:** {llm_result}\n\n"
            for doc in source_docs:
                meta = doc.metadata
                doc_type = meta.get("docType", "N/A")
                doc_id = meta.get("id", "N/A")

                if doc_type == "product":
                    name = meta.get("name", "N/A")
                    desc = meta.get("description", "")
                    gender = meta.get("gender", "N/A")
                    base_color = meta.get("baseColour", "N/A")

                    product_info = (
                        f"**Product ID:** {doc_id}\n"
                        f"**Product Name:** {name}\n"
                        f"- **Description:** {desc}\n"
                        f"- **Gender:** {gender}\n"
                        f"- **Colour:** {base_color}\n\n"
                    )
                    formatted_results += product_info

                elif doc_type == "service":
                    sname = meta.get("serviceName", "N/A")
                    sdesc = meta.get("serviceDescription", "")
                    category = meta.get("category", "N/A")

                    service_info = (
                        f"**Service ID:** {doc_id}\n"
                        f"**Service Name:** {sname}\n"
                        f"- **Category:** {category}\n"
                        f"- **Description:** {sdesc}\n\n"
                    )
                    formatted_results += service_info

            return formatted_results

        except Exception as e:
            logger.error(f"Attempt {attempt + 1} - Error: {e}")
            if "429" in str(e):
                sleep_time = 2 ** attempt
                logger.info(f"Rate limited. Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                return f"An error occurred: {e}"

    return "An error occurred after multiple attempts."

# ----------------------------------------------------------
# Example Usage
# ----------------------------------------------------------
if __name__ == "__main__":
    user_query = "Show me some jeans"
    print(f"Query: {user_query}\n")
    result = semantic_search_tool(user_query)
    print(result)


Query: Show me some jeans

**LLM Result:** Based on the provided data, I found the following relevant items that match the user's request to "Show me some jeans":

* Relevant products:
	+ ID: 16508, Name: Locomotive Men Washed Blue Jeans
	+ ID: 51499, Name: Denizen Women Blue Jeans
	+ ID: 11349, Name: Lee Men Blue Chicago Fit Jeans

These items are all types of jeans that match the user's request.

**Product ID:** 16508.0
**Product Name:** Locomotive Men Washed Blue Jeans
- **Description:** Casual jeans for men, washed style, perfect for everyday wear.
- **Gender:** Men
- **Colour:** Blue

**Product ID:** 16508.0
**Product Name:** Locomotive Men Washed Blue Jeans
- **Description:** Casual jeans for men, washed style, perfect for everyday wear.
- **Gender:** Men
- **Colour:** Blue

**Product ID:** 51499.0
**Product Name:** Denizen Women Blue Jeans
- **Description:** Women's blue jeans from Denizen, casual and versatile.
- **Gender:** Women
- **Colour:** Blue

**Product ID:** 51499.0
**P

In [13]:
import spacy

# Load the spaCy model
nlp = spacy.load("en_core_web_sm")

def detect_query_type(query):
    """
    Detects whether the query is about products or services and returns filter criteria.
    
    Args:
        query (str): The user's query.
    
    Returns:
        dict: Filter criteria for the Pinecone search.
    """
    doc = nlp(query.lower())

    # Default to searching both products and services
    filters = {}

    # Look for specific keywords or entities
    if any(token.text in ["service", "services"] for token in doc):
        filters["docType"] = "service"
    elif any(token.text in ["product", "jeans", "item", "products"] for token in doc):
        filters["docType"] = "product"

    # Example: Detect gender-based filters for products
    if "men" in query.lower():
        filters["gender"] = "Men"
    elif "women" in query.lower():
        filters["gender"] = "Women"

    return filters


In [14]:
def semantic_search_tool(query):
    """
    Semantic search tool that applies dynamic metadata filters based on the query.
    """
    # Detect query type and get filters
    filters = detect_query_type(query)

    # Create a retriever with the filters
    retriever = vector_store.as_retriever(search_kwargs={"k": 10, "filter": filters})

    # Create the QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="map_reduce",
        retriever=retriever,
        return_source_documents=True
    )

    # Perform the query
    try:
        response = qa_chain.invoke(query)

        # Process the response
        if isinstance(response, dict):
            llm_result = response.get("result", "No result from LLM.")
            source_docs = response.get("source_documents", [])
        else:
            llm_result = response
            source_docs = []

        # Format the results
        if not source_docs:
            return f"**LLM Result:** {llm_result}\n\nNo relevant documents found."

        formatted_results = f"**LLM Result:** {llm_result}\n\n"
        for doc in source_docs:
            meta = doc.metadata
            doc_type = meta.get("docType", "N/A")
            doc_id = meta.get("id", "N/A")

            if doc_type == "product":
                name = meta.get("name", "N/A")
                desc = meta.get("description", "")
                gender = meta.get("gender", "N/A")
                base_color = meta.get("baseColour", "N/A")

                product_info = (
                    f"**Product ID:** {doc_id}\n"
                    f"**Product Name:** {name}\n"
                    f"- **Description:** {desc}\n"
                    f"- **Gender:** {gender}\n"
                    f"- **Colour:** {base_color}\n\n"
                )
                formatted_results += product_info

            elif doc_type == "service":
                sname = meta.get("serviceName", "N/A")
                sdesc = meta.get("serviceDescription", "")
                category = meta.get("category", "N/A")

                service_info = (
                    f"**Service ID:** {doc_id}\n"
                    f"**Service Name:** {sname}\n"
                    f"- **Category:** {category}\n"
                    f"- **Description:** {sdesc}\n\n"
                )
                formatted_results += service_info

        return formatted_results

    except Exception as e:
        logger.error(f"Error in semantic search tool: {e}")
        return f"An error occurred: {e}"


In [17]:
if __name__ == "__main__":
    queries = [
        "Show me some jeans",
        "I need painting services",
        "Find me women's jeans",
        "Recommend a product",
        "I need automotive services"
    ]

    for query in queries:
        print(f"\nQuery: {query}")
        result = semantic_search_tool(query)
        print(result)



Query: Show me some jeans
**LLM Result:** I think there might be some confusion! I'm an AI, I don't have the ability to physically show you jeans. However, I can provide you with some information about the jeans mentioned in the text:

* Locomotive Men Washed Blue Jeans (Product ID: 16508) - Casual jeans for men, washed style, perfect for everyday wear.
* Denizen Women Blue Jeans (Product ID: 51499) - Women's blue jeans from Denizen, casual and versatile.
* Lee Men Blue Chicago Fit Jeans (Product ID: 11349) - Men's blue Chicago Fit Jeans, comfortable and stylish for summer.

If you're looking for more information or want to know how to purchase these jeans, please let me know!

**Product ID:** 16508.0
**Product Name:** Locomotive Men Washed Blue Jeans
- **Description:** Casual jeans for men, washed style, perfect for everyday wear.
- **Gender:** Men
- **Colour:** Blue

**Product ID:** 16508.0
**Product Name:** Locomotive Men Washed Blue Jeans
- **Description:** Casual jeans for men, w

In [18]:
# Retrieve the metadata schema from Pinecone
def get_metadata_schema():
    """Retrieve the metadata schema from Pinecone."""
    index_stats = pinecone_index.describe_index_stats()
    metadata_schema = index_stats.get("metadata", {})
    return metadata_schema

# Dynamically match query terms to metadata fields
def extract_filters_from_query(query):
    """
    Automatically extract filters from the query by matching query terms to metadata fields.
    
    Args:
        query (str): The user's query.
    
    Returns:
        dict: A dictionary of filters to apply.
    """
    filters = {}
    metadata_schema = get_metadata_schema()
    
    # Process the query using spaCy
    doc = nlp(query.lower())

    # Match query terms to metadata fields
    for token in doc:
        for field, field_info in metadata_schema.items():
            if token.text in field_info.get("values", []):
                filters[field] = token.text.capitalize()

    return filters

In [27]:
def semantic_search_tool(query):
    """
    Semantic search tool that applies dynamic metadata filters based on the query.
    """
    # Detect filters from the query
    filters = detect_query_type(query)
    logger.info(f"Extracted Filters: {filters}")
    # Print the filters for debugging
    print(f"Extracted Filters: {filters}")

    # Use the filters in Pinecone retrieval
    retriever = vector_store.as_retriever(search_kwargs={"k": 10, "filter": filters})

    # Create the QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="map_reduce",
        retriever=retriever,
        return_source_documents=True
    )

    # Perform the query
    try:
        response = qa_chain.invoke(query)

        # Process the response
        if isinstance(response, dict):
            llm_result = response.get("result", "No result from LLM.")
            source_docs = response.get("source_documents", [])
        else:
            llm_result = response
            source_docs = []

        # Format the results
        if not source_docs:
            return f"**LLM Result:** {llm_result}\n\nNo relevant documents found."

        formatted_results = f"**LLM Result:** {llm_result}\n\n"
        for doc in source_docs:
            metadata = doc.metadata or {}
            doc_type = metadata.get("docType", "N/A")
            doc_id = metadata.get("id", "N/A")

            if doc_type == "product":
                name = metadata.get("name", "N/A")
                description = metadata.get("description", "N/A")
                gender = metadata.get("gender", "N/A")
                base_color = metadata.get("baseColour", "N/A")

                product_info = (
                    f"**Product ID:** {doc_id}\n"
                    f"**Product Name:** {name}\n"
                    f"- **Description:** {description}\n"
                    f"- **Gender:** {gender}\n"
                    f"- **Colour:** {base_color}\n\n"
                )
                formatted_results += product_info

            elif doc_type == "service":
                service_name = metadata.get("serviceName", "N/A")
                service_description = metadata.get("serviceDescription", "N/A")
                category = metadata.get("category", "N/A")

                service_info = (
                    f"**Service ID:** {doc_id}\n"
                    f"**Service Name:** {service_name}\n"
                    f"- **Category:** {category}\n"
                    f"- **Description:** {service_description}\n\n"
                )
                formatted_results += service_info

        return formatted_results

    except Exception as e:
        logger.error(f"Error in semantic search tool: {e}")
        return f"An error occurred: {e}"


In [28]:
if __name__ == "__main__":
    queries = [
        "Show me men's blue jeans",
        "I need painting services",
        "Find women's jeans",
        "Recommend a product",
        "I need automotive services"
    ]

    for query in queries:
        print(f"\nQuery: {query}")
        result = semantic_search_tool(query)
        print(result)



Query: Show me men's blue jeans
Extracted Filters: {'docType': 'product', 'gender': 'Men'}
**LLM Result:** Based on the provided texts, I found two options that match your request:

1. Lee Men Blue Chicago Fit Jeans (Product ID: 11349) - described as "comfortable and stylish for summer".
2. Locomotive Men Washed Blue Jeans (Product ID: 16508) - described as "casual jeans for men, washed style, perfect for everyday wear".

Please note that these are the only two options mentioned in the provided texts. If you're looking for more options or specific features, you may need to explore further.

**Product ID:** 11349.0
**Product Name:** Lee Men Blue Chicago Fit Jeans
- **Description:** Men's blue Chicago Fit Jeans, comfortable and stylish for summer.
- **Gender:** Men
- **Colour:** Blue

**Product ID:** 11349.0
**Product Name:** Lee Men Blue Chicago Fit Jeans
- **Description:** Men's blue Chicago Fit Jeans, comfortable and stylish for summer.
- **Gender:** Men
- **Colour:** Blue

**Product