In [1]:
# Required installations:
# !pip install pandas numpy sqlalchemy pymongo chromadb sentence-transformers python-dotenv tqdm langchain openai langchain-community

In [2]:
## Importing required packages

import json
import pandas as pd
import numpy as np
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import chromadb
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
import os
from dotenv import load_dotenv
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

from google.colab import userdata

  from tqdm.autonotebook import tqdm, trange


In [3]:
## Setting up logging

# Send INFO-and-above records to two destinations at once: a log file on disk
# and a stream handler using its default stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Logger named after this module; the pipeline code logs through it.
logger = logging.getLogger(__name__)

# 1. Creating a dataset since a dataset was not provided

In [4]:
# 1. Sample Dataset Creation
def create_sample_dataset(num_records=1000, seed=None):
    """Create a sample dataset with mixed structured and unstructured data.

    Each record carries structured fields (id, title, category, price,
    condition, created_at), a free-text description, and a nested ``metadata``
    dict (seller_rating, views, tags). The dataset is written to
    ``product_data.json`` (nested) and ``product_data.csv`` (flattened with
    ``pd.json_normalize``) in the current working directory.

    Args:
        num_records: Number of records to generate. Zero or a negative value
            yields an empty dataset.
        seed: Optional integer seed. When given, the random fields are
            reproducible across calls; when ``None`` (default) NumPy's global
            random state is used, as before.

    Returns:
        The list of generated record dicts (plain Python types only).
    """
    # Sample product categories, conditions and tags
    categories = ['Electronics', 'Books', 'Clothing', 'Home & Garden', 'Sports']
    conditions = ['New', 'Like New', 'Good', 'Fair', 'Poor']
    tag_pool = ['premium', 'sale', 'featured']

    # A private RandomState gives reproducible data without touching the
    # global generator; the np.random module exposes the same methods, so it
    # serves as the unseeded fallback.
    rng = np.random.RandomState(seed) if seed is not None else np.random

    data = []
    for i in range(num_records):
        record = {
            'id': i,
            'title': f"Product {i}",
            # str()/float()/int() strip NumPy scalar types so the records are
            # plain Python values for JSON and the database layer.
            'category': str(rng.choice(categories)),
            'price': round(float(rng.uniform(10, 1000)), 2),
            'condition': str(rng.choice(conditions)),
            'created_at': datetime.now().isoformat(),
            'description': f"This is a detailed description of product {i}. It includes various features and specifications.",
            'metadata': {
                'seller_rating': round(float(rng.uniform(1, 5)), 1),
                'views': int(rng.randint(0, 1000)),
                # One or two tags (randint's upper bound is exclusive);
                # replace=False prevents duplicates such as ['sale', 'sale'].
                'tags': rng.choice(tag_pool, size=rng.randint(1, 3), replace=False).tolist()
            }
        }
        data.append(record)

    # Save as JSON
    with open('product_data.json', 'w') as f:
        json.dump(data, f)

    # Save as CSV (flattened structure)
    df = pd.json_normalize(data)
    df.to_csv('product_data.csv', index=False)

    return data

# 2. Database Schema and Setup

In [5]:
# Declarative base class that the ORM model below inherits from; its metadata
# is what setup_database() uses to create the tables.
Base = declarative_base()

class Product(Base):
    """ORM model for one row of the ``products`` table.

    The structured fields mirror the keys of a preprocessed record;
    ``seller_rating``, ``views`` and ``tags`` come from the record's nested
    ``metadata`` dict.
    """
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    title = Column(String(255))
    category = Column(String(100))
    price = Column(Float)
    condition = Column(String(50))
    created_at = Column(DateTime)
    description = Column(Text)
    seller_rating = Column(Float)
    views = Column(Integer)
    tags = Column(String(255))  # Tag list serialized with json.dumps
    embedding = Column(Text)    # Embedding vector serialized with json.dumps; unset when ingested without embeddings

def setup_database(db_url='sqlite:///products.db'):
    """Create an engine for the product database and ensure its tables exist.

    Args:
        db_url: SQLAlchemy database URL. Defaults to the local SQLite file
            ``products.db``, which is what every existing caller uses.

    Returns:
        The SQLAlchemy engine bound to ``db_url``.
    """
    engine = create_engine(db_url)
    # Creates every table registered on Base; tables that already exist are
    # left as they are.
    Base.metadata.create_all(engine)
    return engine

  Base = declarative_base()


In [6]:
def ingest_data(data: list, engine):
    """Ingest raw product records into the database (without embeddings).

    Args:
        data: Raw records in the shape produced by ``create_sample_dataset``.
        engine: SQLAlchemy engine for the target database.

    Raises:
        IntegrityError, FlushError: Re-raised after the transaction is rolled
            back (for example when a primary key already exists).
        Exception: Anything raised by preprocessing propagates unchanged.
    """
    # Preprocess and open the session *before* the try block: previously a
    # failure in preprocessing reached `finally` with `session` unbound, and
    # the resulting NameError hid the real error.
    processed_data = preprocess_data(data)

    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        # Insert data
        for record in tqdm(processed_data):
            session.add(Product(**record))

        session.commit()

    except (IntegrityError, FlushError) as e:
        print("Error during data ingestion:", e)
        session.rollback()
        raise

    finally:
        # Always release the connection, whether or not the insert succeeded.
        session.close()

# 3. Data Preprocessing


In [7]:
def preprocess_data(data: list):
    """Flatten raw product records into rows shaped like the ``products`` table.

    For every input record the scalar fields are copied over, the ISO-format
    ``created_at`` string becomes a ``datetime``, the nested ``metadata``
    values are lifted to top-level keys, and the tag list is serialized to a
    JSON string.

    Args:
        data: Iterable of raw record dicts.

    Returns:
        A new list with one flattened dict per input record.
    """
    copied_fields = ('id', 'title', 'category', 'price', 'condition')

    def _flatten(raw):
        # Build the row in the same key order as the table columns.
        row = {name: raw[name] for name in copied_fields}
        row['created_at'] = datetime.fromisoformat(raw['created_at'])
        row['description'] = raw['description']
        meta = raw['metadata']
        row['seller_rating'] = meta['seller_rating']
        row['views'] = meta['views']
        row['tags'] = json.dumps(meta['tags'])
        return row

    return [_flatten(raw) for raw in data]

# 4. Vectorization


In [8]:
class VectorStore:
    """Singleton wrapper around the embedding model and the Chroma collection.

    Every ``VectorStore()`` call returns the same object, so the
    SentenceTransformer model is loaded once and all callers share one
    ``product_embeddings`` collection.
    """

    _instance = None  # Class variable to store the single instance

    def __new__(cls):
        """Create a new instance only if one doesn't exist."""
        if not isinstance(cls._instance, cls):
            instance = super(VectorStore, cls).__new__(cls)
            instance.model = SentenceTransformer('all-MiniLM-L6-v2')
            instance.chroma_client = chromadb.Client()
            # get_or_create_collection avoids testing a name against
            # list_collections(), whose element type differs between Chroma
            # releases, and never fails because the collection already exists.
            instance.collection = instance.chroma_client.get_or_create_collection("product_embeddings")
            # Publish the singleton only once it is fully initialized, so a
            # failed model load cannot leave a half-built instance cached.
            cls._instance = instance
        return cls._instance

    def generate_embedding(self, text: str) -> List[float]:
        """Return the embedding of a single text as a list of floats."""
        return self.model.encode(text).tolist()

    def batch_generate_embeddings(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Return one embedding per text, encoded ``batch_size`` texts at a time.

        An empty ``texts`` list yields an empty list without calling the model.
        """
        if not texts:
            return []
        return self.model.encode(texts, batch_size=batch_size).tolist()

    def store_embeddings(self, ids: List[str], texts: List[str], embeddings: List[List[float]]):
        """Add documents and their embeddings to the collection.

        Args:
            ids: Unique string ids, one per document.
            texts: Document texts, parallel to ``ids``.
            embeddings: Embedding vectors, parallel to ``ids``.

        Does nothing when ``ids`` is empty.
        """
        if not ids:
            # Nothing to store; skip the call rather than hand the collection
            # an empty batch.
            return
        self.collection.add(
            ids=ids,
            documents=texts,
            embeddings=embeddings
        )

# 5. Data Pipeline

In [9]:
class DataPipeline:
    """Ingests products into the SQL database and vector store, and queries them.

    The instance owns one database session (``self.session``) and shares the
    singleton ``VectorStore``.
    """

    def __init__(self):
        """Open the database, obtain the vector store and start a session."""
        self.engine = setup_database()
        self.vector_store = VectorStore()
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    @staticmethod
    def _order_by_rank(products, ranked_ids):
        """Return ``products`` reordered to follow ``ranked_ids``; ids with no product are skipped."""
        by_id = {product.id: product for product in products}
        return [by_id[pid] for pid in ranked_ids if pid in by_id]

    def ingest_data(self, data: List[Dict], batch_size: int = 100):
        """Ingest data into the database and vector store.

        Args:
            data: Raw product records (see ``preprocess_data`` for the shape).
            batch_size: Number of texts encoded per embedding batch.

        Raises:
            Exception: Any failure is logged, the database transaction is
                rolled back, and the exception is re-raised.
        """
        try:
            logger.info("Starting data ingestion")

            # Preprocess data
            processed_data = preprocess_data(data)
            if not processed_data:
                # Nothing to embed or store; avoid sending empty batches on.
                logger.info("No records to ingest")
                return

            # Generate embeddings
            texts = [f"{record['title']} {record['description']}" for record in processed_data]
            embeddings = self.vector_store.batch_generate_embeddings(texts, batch_size)

            # Stage rows in the database
            self.session.add_all([
                Product(**record, embedding=json.dumps(embedding))
                for record, embedding in zip(processed_data, embeddings)
            ])
            # Flush now so constraint violations (e.g. duplicate ids) surface
            # before the vector store is modified; otherwise a failed commit
            # would leave vectors with no matching database rows.
            self.session.flush()

            # Store in vector store
            self.vector_store.store_embeddings(
                ids=[str(record['id']) for record in processed_data],
                texts=texts,
                embeddings=embeddings
            )

            self.session.commit()
            logger.info("Data ingestion completed successfully")

        except Exception as e:
            logger.error(f"Error during data ingestion: {str(e)}")
            self.session.rollback()
            raise

    def query_similar(self, query_text: str, n_results: int = 5):
        """Query similar products based on text similarity.

        Args:
            query_text: Free-text query to embed and search with.
            n_results: Maximum number of products to return.

        Returns:
            ``Product`` rows in the order the vector store ranked them
            (first result first).

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Generate query embedding
            query_embedding = self.vector_store.generate_embedding(query_text)

            # Query vector store
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding],
                n_results=n_results
            )

            ranked_ids = [int(raw_id) for raw_id in results['ids'][0]]
            if not ranked_ids:
                return []

            # Fetch full records from database. An IN query returns rows in
            # database order, so restore the vector store's ranking afterwards.
            products = self.session.query(Product).filter(Product.id.in_(ranked_ids)).all()

            return self._order_by_rank(products, ranked_ids)

        except Exception as e:
            logger.error(f"Error during similarity query: {str(e)}")
            raise

# 6. RAG Implementation

In [10]:
# OpenAI API key, read from the Colab secret named 'OPENAI_API_KEY' so the key
# never appears in the notebook; RAGSystem passes it to the OpenAI client.
API_KEY = userdata.get('OPENAI_API_KEY')

In [11]:
class RAGSystem:
    """Retrieval-augmented Q&A over the product catalogue.

    Retrieves the products most similar to a question through ``DataPipeline``
    and passes them, with the question, to an OpenAI LLM chain.
    """

    def __init__(self):
        """Build the LLM client, the retrieval pipeline and the prompt chain."""
        load_dotenv()
        self.llm = OpenAI(api_key=API_KEY)
        self.pipeline = DataPipeline()

        # The prompt has two slots: the retrieved product context and the
        # user's question.
        self.prompt_template = PromptTemplate(
            template="""
            Based on the following product information:
            {context}

            Answer the following question:
            {query}
            """,
            input_variables=["query", "context"],
        )

        self.chain = LLMChain(prompt=self.prompt_template, llm=self.llm)

    def generate_response(self, query: str):
        """Answer ``query`` using the retrieved products as context.

        Args:
            query: The user's question.

        Returns:
            Whatever the LLM chain returns for the filled-in prompt.
        """
        # Retrieval step: look up the products most similar to the question.
        retrieved = self.pipeline.query_similar(query)

        # One text block per product, separated by blank lines.
        blocks = []
        for item in retrieved:
            blocks.append(
                f"Product: {item.title}\nCategory: {item.category}\nPrice: ${item.price}\nDescription: {item.description}"
            )
        context = "\n\n".join(blocks)

        # Generation step.
        return self.chain.run(query=query, context=context)

In [12]:
# usage
if __name__ == "__main__":
    # 1. Create sample dataset
    data = create_sample_dataset(1000)
    engine = setup_database()

    # Clear existing rows so re-ingesting the same ids cannot violate the
    # primary key. engine.begin() commits when the block succeeds and rolls
    # back on error, and does not depend on Connection.commit(), which only
    # exists on 2.0-style connections.
    with engine.begin() as conn:
        conn.execute(text("DELETE FROM products"))

    # The module-level ingest_data(data, engine) is deliberately not called
    # here: DataPipeline.ingest_data below inserts the same records (plus
    # embeddings), so calling both would insert every id twice.
    # NOTE(review): only the SQL table is cleared above; the vector store
    # collection is not. TODO confirm behavior when this cell is re-run in
    # the same kernel.

    # 2. Initialize and run pipeline
    pipeline = DataPipeline()
    pipeline.ingest_data(data)

    # 3. Example query
    results = pipeline.query_similar("electronic devices in new condition")
    for product in results:
        print(f"Title: {product.title}")
        print(f"Category: {product.category}")
        print(f"Price: ${product.price}")
        print("---")

    # 4. RAG example
    rag_system = RAGSystem()
    response = rag_system.generate_response("What are the best electronic products under $500?")
    print("RAG Response:", response)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Title: Product 1
Category: Electronics
Price: $270.07
---
Title: Product 51
Category: Electronics
Price: $917.34
---
Title: Product 202
Category: Home & Garden
Price: $12.9
---
Title: Product 203
Category: Books
Price: $888.93
---
Title: Product 518
Category: Clothing
Price: $968.28
---


  self.llm = OpenAI(api_key=API_KEY)
  self.chain = LLMChain(llm=self.llm, prompt=self.prompt_template)
  response = self.chain.run(query=query, context=context)


RAG Response: 
Based on the given information, there are no electronic products under $500. The closest product to $500 is Product 400 which is priced at $778.32 and falls under the Electronics category. Therefore, there are no electronic products under $500 in this list. 


## Project Specification Document: Product Search and Q&A System

**1. Introduction**

This document outlines the specifications for a project that builds a product search and Q&A system using a combination of database management, vector embedding, and a large language model (LLM). The system will ingest product data, store it in a database and a vector store, allow users to search for similar products based on textual queries, and leverage a RAG (Retrieval Augmented Generation) approach to answer user questions about the products.

**2. Project Goals**

- **Efficient Product Search:** Enable users to quickly find relevant products based on text-based queries using semantic search.
- **Contextualized Q&A:** Allow users to ask questions about the products and receive insightful answers based on the available product information.
- **Scalable System:** Design a system that can handle a growing number of products and user queries effectively.

**3. Project Steps & Rationale**

**Step 1: Sample Dataset Creation ( `create_sample_dataset` )**

* **Rationale:**  We start by creating a sample dataset since a dataset was not provided. This dataset has structured (e.g., price, category) and unstructured data (e.g., description) to simulate real-world product data. The dataset is saved in both JSON and CSV formats for flexibility.

**Step 2: Database Schema and Setup ( `setup_database`, `Product` )**

* **Rationale:** We define a database schema (SQLite in this case) to store our structured product data. The `Product` class represents the table structure for storing product details, including attributes like title, category, price, and embeddings. This ensures structured data is stored persistently and can be queried efficiently.

**Step 3: Data Preprocessing ( `preprocess_data` )**

* **Rationale:**  Before data is ingested, we need to preprocess it. This step ensures data consistency and prepares it for loading into the database.  Specifically, it reshapes each record to match the database schema: the nested `metadata` fields are flattened into top-level columns, the `created_at` string is parsed into a datetime, and the tag list is serialized to a JSON string.

**Step 4: Vectorization ( `VectorStore` )**

* **Rationale:** We leverage a vector store (ChromaDB) to efficiently store and search through product embeddings. These embeddings are generated from the titles and descriptions of the products, allowing for semantic understanding of text. The `VectorStore` class manages embedding generation using a pre-trained model (Sentence Transformer) and handles the interaction with ChromaDB.

**Step 5: Data Pipeline ( `DataPipeline` )**

* **Rationale:** We create a `DataPipeline` to streamline data ingestion. This class handles the ingestion of product data into the database and vector store. It ensures that the database and vector store are populated simultaneously with consistent information.
  - It generates embeddings and stores them in the vector store.
  - It preprocesses the data.
  - It queries similar documents based on the query and the embeddings in the vector store.
  - It stores the data in the database.

**Step 6: RAG Implementation ( `RAGSystem` )**

* **Rationale:** The `RAGSystem` implements a retrieval augmented generation system. It uses the `DataPipeline` to retrieve relevant product information based on user queries. The retrieved information is formatted and presented to an LLM (OpenAI in this case). The LLM is prompted to answer the user's question based on the context retrieved from the database and vector store.
  - It prompts the LLM based on the retrieved context.


**4. Future Enhancements**

- **Advanced Embedding Models:** Explore more powerful embedding models for improved semantic understanding.
- **Data Enrichment:** Integrate additional data sources (e.g., reviews, images) to provide more comprehensive product information.
- **User Feedback:** Implement mechanisms for collecting user feedback on search results and Q&A responses.
- **Deployment:** Deploy the system on a cloud platform to make it accessible to a wider audience.

**5. Conclusion**

This project specification outlines the key aspects of developing a product search and Q&A system. By combining database management, vector embeddings, and LLMs, we can create a system that effectively assists users in finding relevant product information and answers their questions in an insightful way.



