In [1]:
import os
import json
import time
import numpy as np
import faiss
from openai import OpenAI
from sentence_transformers import CrossEncoder

In [37]:


OPENAI_API_KEY = 'knvh5u5rfuyg6877gi'


OPENAI_EMBEDDING_MODEL = "text-embedding-3-small"
OPENAI_CHAT_MODEL = "gpt-3.5-turbo"
OPENAI_CHAT_MODEL_FOR_ANALYSIS = ''

FAISS_INDEX_DIMENSION = None
RERANKER_MODEL_NAME = 'cross-encoder/ms-marco-MiniLM-L-6-v2'

PRODUCT_CATALOG_JSON_PATH = "C:\\Users\\shiva250535\\Downloads\\sample_data.json"

faiss_index = None
product_metadata_storage = [] 

client = OpenAI(api_key=OPENAI_API_KEY)


reranker_model = CrossEncoder(RERANKER_MODEL_NAME)
print(f"Re-ranker model '{RERANKER_MODEL_NAME}' loaded successfully.")





Re-ranker model 'cross-encoder/ms-marco-MiniLM-L-6-v2' loaded successfully.


In [None]:
def get_openai_embedding(text: str, model: str = OPENAI_EMBEDDING_MODEL) -> list[float] | None:
    global FAISS_INDEX_DIMENSION
    if not client:
        print("OpenAI client not initialized. Cannot get embedding.")
        return None
    try:
        text = text.replace("\n", " ")
        response = client.embeddings.create(input=[text], model=model)
        embedding = response.data[0].embedding
        if FAISS_INDEX_DIMENSION is None:
            FAISS_INDEX_DIMENSION = len(embedding)
            print(f"FAISS index dimension set to: {FAISS_INDEX_DIMENSION}")
        return embedding
    except Exception as e:
        print(f"Error getting OpenAI embedding: {e}")
        return None

def load_product_data(json_path: str) -> list[dict]:
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(f"Successfully loaded {len(data)} products from '{json_path}'.")
        return data
    except FileNotFoundError:
        print(f"Error: Product catalog file not found at '{json_path}'.")
        return []
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from '{json_path}'. Please check its format.")
        return []

def build_faiss_index_and_metadata(products: list[dict]):
    global faiss_index, product_metadata_storage, FAISS_INDEX_DIMENSION
    
    if not products:
        print("No products to build index from.")
        return

    print("Building FAISS index and metadata store...")
    embeddings_list = []
    product_metadata_storage.clear()
    if not FAISS_INDEX_DIMENSION:
        sample_product_text_for_embedding = f"{products[0].get('name', '')} {products[0].get('description', '')}"
        sample_embedding = get_openai_embedding(sample_product_text_for_embedding)
        if not sample_embedding:
            print("Error: Could not determine embedding dimension. Index building aborted.")
            return

    for i, product in enumerate(products):
        name = product.get("name", "")
        description = product.get("description", "")
        specs = product.get("specifications", {})
        specs_str = ""
        if isinstance(specs, dict):
            specs_str = ". ".join([f"{k}: {v}" for k, v in specs.items()])
        elif isinstance(specs, str):
            specs_str = specs
        
        text_to_embed = f"Product Name: {name}. Description: {description}. Specifications: {specs_str}"
        
        embedding = get_openai_embedding(text_to_embed)
        if embedding:
            embeddings_list.append(embedding)
            product_metadata_storage.append(product) 
        else:
            print(f"Warning: Could not generate embedding for product ID {product.get('product_id', 'N/A')}. Skipping.")

        if (i + 1) % 10 == 0:
            print(f"Processed {i+1}/{len(products)} products for indexing.")

    if not embeddings_list:
        print("No embeddings generated. FAISS index cannot be built.")
        return

    embeddings_np = np.array(embeddings_list).astype('float32')
    
    if FAISS_INDEX_DIMENSION is None:
        print("Error: FAISS_INDEX_DIMENSION is not set. Cannot build index.")
        return
        
    faiss_index = faiss.IndexFlatL2(FAISS_INDEX_DIMENSION)
    faiss_index.add(embeddings_np)
    print(f"FAISS index built successfully with {faiss_index.ntotal} vectors.")
    print(f"Product metadata storage populated with {len(product_metadata_storage)} items.")


def analyze_query_with_openai(user_query: str, model: str = OPENAI_CHAT_MODEL_FOR_ANALYSIS) -> dict:
    if not client:
        print("OpenAI client not initialized. Cannot analyze query.")
        return {"category": None, "attributes": [], "raw_query": user_query}
        
    system_message = """
    You are an AI assistant that helps analyze user queries for product recommendations.
    Your goal is to extract the main product category the user is looking for and any key attributes or features.
    The product categories are typically nouns like "Laptops", "Smartphones", "Dishwashers", "Coffee Makers", "Headphones".
    If the user mentions multiple items, try to pick the primary one or the one that seems most like a product category.
    Respond ONLY in JSON format with the following keys:
    - "category": A string representing the identified product category (e.g., "Laptops", "Smartphones"). If no specific category is clear, return null or a very generic term like "product" if absolutely necessary, but try to be specific.
    - "attributes": A list of strings representing key features, adjectives, or requirements mentioned by the user (e.g., ["lightweight", "good battery life", "under $1000", "excellent camera"]).

    Example User Query: "I need a lightweight laptop for travel with a good battery life, maybe under $1000"
    Example JSON Response:
    {
        "category": "Laptops",
        "attributes": ["lightweight", "travel", "good battery life", "under $1000"]
    }

    User Query: "Show me some quiet and energy efficient dishwashers for a small apartment"
    Example JSON Response:
    {
        "category": "Dishwashers",
        "attributes": ["quiet", "energy efficient", "small apartment"]
    }
    
    User Query: "durable smartphone with excellent camera"
    Example JSON Response:
    {
        "category": "Smartphones",
        "attributes": ["durable", "excellent camera"]
    }
    
    User Query: "tools for my garden"
    Example JSON Response:
    {
        "category": "Gardening Tools",
        "attributes": []
    }
    """
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_query}
            ],
            temperature=0,
            response_format={"type": "json_object"}
        )
        content = response.choices[0].message.content
        extracted_info = json.loads(content)
        extracted_info["raw_query"] = user_query
        return extracted_info
    except Exception as e:
        print(f"Error during OpenAI query analysis: {e}")
        attributes = [word.lower() for word in user_query.split() if len(word) > 3]
        return {"category": None, "attributes": attributes, "raw_query": user_query}


def filter_products_by_category(extracted_category: str | None, all_metadata: list[dict]) -> list[tuple[int, dict]]:
    if not extracted_category:
        print("No category extracted, returning all products for vector search (or consider this an error).")
        return [(i, data) for i, data in enumerate(all_metadata)] 

    filtered_products_with_indices = []
    extracted_category_lower = extracted_category.lower()
    
    for original_idx, product_data in enumerate(all_metadata):
       
        product_categories = product_data.get("categories", []) 
        if isinstance(product_categories, list):
            for cat in product_categories:
                if extracted_category_lower in cat.lower(): 
                    filtered_products_with_indices.append((original_idx, product_data))
                    break 
    print(f"Metadata filtering: Extracted category '{extracted_category}'. Found {len(filtered_products_with_indices)} matching products.")
    return filtered_products_with_indices


def search_faiss_on_filtered_subset(query_embedding_np: np.ndarray,
                                     original_faiss_index: faiss.Index,
                                     filtered_product_indices: list[int],
                                     top_k: int) -> tuple[list[float], list[int]]:

    if not filtered_product_indices:
        return [], []

    if not original_faiss_index or original_faiss_index.ntotal == 0:
        print("Original FAISS index is not available or empty.")
        return [],[]

    num_filtered = len(filtered_product_indices)
    
    valid_filtered_indices = [idx for idx in filtered_product_indices if 0 <= idx < original_faiss_index.ntotal]
    if not valid_filtered_indices:
        print("No valid indices after filtering. Cannot search.")
        return [], []

    try:
       
        filtered_vectors_list = []
        for idx in valid_filtered_indices:
            try:
                vec = original_faiss_index.reconstruct(idx)
                filtered_vectors_list.append(vec)
            except Exception as e_reconstruct:
                print(f"Warning: Could not reconstruct vector for index {idx}. Error: {e_reconstruct}. Skipping this vector.")
        
        if not filtered_vectors_list:
            print("No vectors could be reconstructed for the filtered set.")
            return [], []
            
        filtered_vectors_np = np.array(filtered_vectors_list).astype('float32')

    except Exception as e:
        print(f"Error reconstructing vectors for FAISS sub-search: {e}")
        return [], []

    if filtered_vectors_np.shape[0] == 0:
        print("No vectors in the filtered subset for FAISS search.")
        return [], []

    temp_faiss_index = faiss.IndexFlatL2(FAISS_INDEX_DIMENSION)
    temp_faiss_index.add(filtered_vectors_np)

    actual_top_k = min(top_k, temp_faiss_index.ntotal)
    if actual_top_k == 0:
        return [], []
        
    distances, temp_indices = temp_faiss_index.search(query_embedding_np, actual_top_k)

    original_indices_of_results = [valid_filtered_indices[i] for i in temp_indices[0]]
    
    return distances[0].tolist(), original_indices_of_results


def rerank_products(query: str, products_to_rerank: list[dict], top_n: int = 5) -> list[dict]:

    if not reranker_model:
        print("Re-ranker model not loaded. Skipping re-ranking.")
        return products_to_rerank[:top_n]

    if not products_to_rerank:
        return []

    pairs = []
    for product in products_to_rerank:
        prod_name = product.get("name", "")
        prod_desc = product.get("description", "")
        prod_specs = product.get("specifications", {})
        specs_str = ""
        if isinstance(prod_specs, dict):
            specs_str = ", ".join([f"{k}: {v}" for k,v in list(prod_specs.items())[:3]]) # Top 3 specs
        elif isinstance(prod_specs, str):
            specs_str = prod_specs[:100]
        
        product_text = f"{prod_name}. {prod_desc}. Key Specs: {specs_str}"
        pairs.append([query, product_text])

    if not pairs:
        return []

    print(f"Re-ranking {len(pairs)} product pairs...")
    scores = reranker_model.predict(pairs)

    for i, product in enumerate(products_to_rerank):
        product['rerank_score'] = scores[i]

    reranked_products = sorted(products_to_rerank, key=lambda x: x.get('rerank_score', -float('inf')), reverse=True)
    
    return reranked_products[:top_n]


def generate_recommendation_with_openai(user_query: str, ranked_products: list[dict], model: str = OPENAI_CHAT_MODEL) -> str:
    """Generates a natural language recommendation using OpenAI."""
    if not client:
        return "OpenAI client not initialized. Cannot generate recommendation."
        
    if not ranked_products:
        return "I couldn't find any specific products matching your refined query after filtering and searching. Could you try rephrasing or being more general?"

    context = "Based on your query and our product catalog, here are some recommendations:\n\n"
    for i, prod in enumerate(ranked_products):
        context += f"Product {i+1}:\n"
        context += f"  ID: {prod.get('product_id', 'N/A')}\n"
        context += f"  Name: {prod.get('name', 'N/A')}\n"
        context += f"  Price: ${prod.get('price', 'N/A')}\n"
        categories = prod.get('categories', [])
        if isinstance(categories, list) and categories:
            context += f"  Category: {', '.join(categories)}\n"
        
        specs = prod.get("specifications")
        specs_summary = "Not available"
        if isinstance(specs, dict) and specs:
            specs_summary = ", ".join([f"{k}: {v}" for k, v in list(specs.items())[:4]]) # First 4 specs
        elif isinstance(specs, str) and specs:
            specs_summary = specs[:150] + "..." if len(specs) > 150 else specs
        context += f"  Key Specifications: {specs_summary}\n"
        if 'rerank_score' in prod:
            context += f"  Relevance Score (Re-ranker): {prod['rerank_score']:.4f}\n"
        elif 'faiss_distance' in prod:
            context += f"  Relevance Score (FAISS Distance - lower is better): {prod['faiss_distance']:.4f}\n"
        context += "\n"
        
    system_message = """
    You are a helpful AI product recommendation assistant.
    Based on the user's query and the provided product information (which has been retrieved and ranked), generate a friendly and helpful recommendation.
    Explain briefly why certain products are a good fit. If multiple products are good, you can mention them.
    If the retrieved products seem like a mixed bag, try to pick the best ones or explain the trade-offs.
    Be concise but informative. Do not refer to yourself as an AI model. Just provide the recommendation.
    Focus on the products provided in the context.
    """
    prompt_messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": f"My query is: \"{user_query}\"\n\nHere's the product information I found:\n{context}"}
    ]

    try:
        response = client.chat.completions.create(
            model=model,
            messages=prompt_messages,
            temperature=0.7,
            max_tokens=700
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error during OpenAI recommendation generation: {e}")
        return "I encountered an issue while trying to generate the recommendation."



In [38]:
def main():
    global faiss_index

    products = load_product_data(PRODUCT_CATALOG_JSON_PATH)
    if not products:
        print("Exiting due to no product data.")
        return


    build_faiss_index_and_metadata(products)
    if faiss_index is None or faiss_index.ntotal == 0:
        print("FAISS index not built or empty. Exiting.")
        return



In [39]:
if __name__ == "__main__":

    if OPENAI_API_KEY and client:
        main()
    else:
        print("OpenAI API key not configured or client initialization failed")


Successfully loaded 500 products from 'C:\Users\shiva250535\Downloads\sample_data.json'.
Building FAISS index and metadata store...
Error getting OpenAI embedding: Error code: 401 - {'error': {'message': 'Incorrect API key provided: knvh5u5r******77gi. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}
Error: Could not determine embedding dimension. Index building aborted.
FAISS index not built or empty. Exiting.
