# Part 1: Building AI-Powered Semantic Product Search with pgvector and Amazon Bedrock
### Configuration, Vector Embeddings and Data Ingestion
---

## Contents

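1. [Background](#Background)
2. [Architecture](#Architecture)
3. [Setup](#Setup)
4. [Load Required Libraries](#Load-Required-Libraries)
5. [Aurora PostgreSQL Database Setup](#Aurora-PostgreSQL-Database-Setup)
6. [Load Product Catalog Data](#Load-Product-Catalog-Data)
7. [Generate Embeddings](#Generate-Embeddings)
8. [Store in Database](#Store-in-Database)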

## Background

This lab shows how to build a semantic product search system on top of vector embeddings. It has three key components:

- **Vector Embeddings**: Amazon Titan Embeddings converts product descriptions into numerical vectors that capture semantic meaning.
- **Vector Storage**: The pgvector extension in Aurora PostgreSQL stores these vectors and indexes them for efficient search.
- **Semantic Search**: Similar products are found by comparing the distance between their vectors.

## Architecture

1. Product descriptions are converted to embeddings using Amazon Bedrock.
2. Embeddings are stored in Aurora PostgreSQL using the pgvector extension.
3. Search queries are converted to embeddings and compared using vector similarity.
4. The most similar products are returned, ranked by vector distance.

![Building AI-Powered Semantic Product Search with pgvector and Amazon Bedrock](../static/Product_Catalog.png)
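
The four steps above amount to a nearest-neighbour ranking. The following is a minimal in-memory sketch of that idea, not the lab's implementation: the three-dimensional vectors and product names are made up for illustration (the Titan embeddings used later have 1024 dimensions), and `cosine_distance` and `top_k` are hypothetical helpers. pgvector's `<=>` operator computes the same cosine distance inside the database.

```python
import math


def cosine_distance(a, b):
    """Cosine distance = 1 - cosine similarity (0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query_vec, catalog, k=2):
    """Return the k (name, distance) pairs closest to query_vec."""
    scored = [(name, cosine_distance(query_vec, vec)) for name, vec in catalog.items()]
    return sorted(scored, key=lambda pair: pair[1])[:k]


# Toy "embeddings" (illustrative values only, not real model output)
catalog = {
    "running shoes": [0.9, 0.1, 0.0],
    "trail sneakers": [0.8, 0.2, 0.1],
    "coffee maker": [0.0, 0.1, 0.9],
}
query = [1.0, 0.0, 0.0]  # stands in for the embedded search query

results = top_k(query, catalog, k=2)
for name, dist in results:
    print(f"{name}: {dist:.3f}")
```

The two shoe products rank ahead of the coffee maker because their vectors point in nearly the same direction as the query vector.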

## Setup

First, let's install the required Python packages:

In [None]:
# Install all the required prerequisite libraries
%pip install setuptools==65.5.0
%pip install "psycopg[binary]" pgvector pandarallel boto3 tqdm numpy
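
After installation, it can help to confirm which versions were actually picked up by the kernel. The sketch below uses only the standard library; `installed_versions` is a hypothetical helper, and the package list simply mirrors the `%pip install` line above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


required = ["psycopg", "pgvector", "pandarallel", "boto3", "tqdm", "numpy"]
for name, ver in installed_versions(required).items():
    print(f"{name}: {ver or 'NOT INSTALLED'}")
```

If a package is reported as missing right after installation, restarting the notebook kernel is usually the first thing to try.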

## Load Required Libraries

In [None]:
import pandas as pd
import numpy as np
import boto3
import json
import psycopg
from pgvector.psycopg import register_vector
from pandarallel import pandarallel
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Initialize Bedrock client
bedrock_runtime = boto3.client('bedrock-runtime')

print("Required libraries setup complete ✅ ")

## Aurora PostgreSQL Database Setup

Enable the pgvector extension, then create the product catalog table and an HNSW index on its embedding column:

In [None]:
# Get database credentials from Secrets Manager
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='apg-pgvector-secret-RIV')
database_secrets = json.loads(response['SecretString'])

# Set up database connection parameters
dbhost = database_secrets['host']
dbport = database_secrets['port']
dbuser = database_secrets['username']
dbpass = database_secrets['password']

def setup_database():
    """Set up database schema and tables"""
    conn = psycopg.connect(
        host=dbhost,
        port=dbport,
        user=dbuser,
        password=dbpass,
        autocommit=True
    )

    # Enable vector extension
    conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    register_vector(conn)

    # Create schema
    conn.execute("CREATE SCHEMA IF NOT EXISTS bedrock_integration;")
    
    # Drop any existing table so the lab starts clean (this deletes previously loaded rows)
    conn.execute("DROP TABLE IF EXISTS bedrock_integration.product_catalog;")

    # Create products table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bedrock_integration.product_catalog (
        "productId" VARCHAR(255) PRIMARY KEY,
        product_description TEXT,
        imgUrl TEXT,
        productURL TEXT,
        stars NUMERIC,
        reviews INT,
        price NUMERIC,
        category_id INT,
        isBestSeller BOOLEAN,
        boughtInLastMonth INT,
        category_name VARCHAR(255),
        quantity INT,
        embedding vector(1024)
    );
    """)

    # Create HNSW index
    conn.execute("""
    CREATE INDEX IF NOT EXISTS product_catalog_embedding_idx 
    ON bedrock_integration.product_catalog 
    USING hnsw (embedding vector_cosine_ops);
    """)
        
    print(f"Connection info: host={dbhost}, port={dbport}, user={dbuser}")
    print("Database setup complete ✅")
    conn.close()

setup_database()
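
The `CREATE INDEX` statement above relies on pgvector's default HNSW build settings. pgvector also accepts `m` and `ef_construction` as index options, and the sketch below builds such a statement. The `hnsw_index_ddl` helper and the index name `product_catalog_embedding_tuned_idx` are hypothetical, and the values shown are pgvector's documented defaults (`m = 16`, `ef_construction = 64`) rather than tuned recommendations.

```python
def hnsw_index_ddl(table, column, index_name, m=16, ef_construction=64):
    """Build a CREATE INDEX statement for a pgvector HNSW cosine index.

    Identifiers are interpolated directly, so they must come from trusted code,
    never from user input.
    """
    for label, value in (("m", m), ("ef_construction", ef_construction)):
        if not isinstance(value, int) or value <= 0:
            raise ValueError(f"{label} must be a positive integer")
    return (
        f"CREATE INDEX IF NOT EXISTS {index_name} "
        f"ON {table} USING hnsw ({column} vector_cosine_ops) "
        f"WITH (m = {m}, ef_construction = {ef_construction});"
    )


ddl = hnsw_index_ddl(
    "bedrock_integration.product_catalog",
    "embedding",
    "product_catalog_embedding_tuned_idx",  # hypothetical index name
)
print(ddl)
```

The resulting string could be passed to `conn.execute(...)` in place of the statement used in `setup_database()`. Larger values generally improve recall at the cost of a slower index build.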

## Load Product Catalog Data

Load the product catalog CSV, drop rows that have no description, and fill the remaining missing values with defaults:

In [None]:
# Load product data
print("Loading product data...")
df = pd.read_csv('../datasets/product_catalog.csv')

# Clean up missing values
df = df.dropna(subset=['product_description'])
df = df.fillna({
    'stars': 0,
    'reviews': 0,
    'price': 0,
    'category_id': 0,
    'isBestSeller': False,
    'boughtInLastMonth': 0,
    'category_name': 'Unknown',
    'quantity': 0
})

print(f"Loaded {len(df)} products")
df.head(5)

## Generate Embeddings

Generate a 1024-dimensional embedding for each product description using the Amazon Titan Text Embeddings V2 model on Amazon Bedrock:

In [None]:
def generate_embedding(text):
    """Generate an embedding for a single text with Amazon Titan Text Embeddings V2; returns None on failure"""
    try:
        payload = json.dumps({'inputText': text})
        response = bedrock_runtime.invoke_model(
            body=payload,
            modelId='amazon.titan-embed-text-v2:0',
            accept="application/json",
            contentType="application/json"
        )
        response_body = json.loads(response.get("body").read())
        return response_body.get("embedding")
    except Exception as e:
        print(f"Error generating embedding: {str(e)}")
        return None

# Initialize parallel processing
print("\nGenerating embeddings for product descriptions...")
pandarallel.initialize(progress_bar=True, nb_workers=10)

# Generate embeddings
df['embedding'] = df['product_description'].parallel_apply(generate_embedding)

print("\nCompleted embedding generation")
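
`generate_embedding` returns `None` on any failure, so a transient error (throttling, for example) silently leaves a row without an embedding. One possible mitigation is to retry with exponential backoff. The sketch below is an assumption about how that could look, not part of the original lab: `call_with_retries` and its parameters are hypothetical, and `flaky_embedding` merely simulates a call that fails twice before succeeding.

```python
import time


def call_with_retries(fn, *args, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call fn(*args), retrying while it returns None or raises.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
    and returns None if every attempt fails.
    """
    for attempt in range(max_attempts):
        try:
            result = fn(*args)
            if result is not None:
                return result
        except Exception as exc:
            print(f"Attempt {attempt + 1} failed: {exc}")
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return None


# Simulated embedding call (hypothetical): fails twice, then succeeds
calls = {"count": 0}


def flaky_embedding(text):
    calls["count"] += 1
    if calls["count"] < 3:
        return None
    return [0.1, 0.2, 0.3]


delays = []  # record the requested delays instead of actually sleeping
vector = call_with_retries(flaky_embedding, "red shoes", sleep=delays.append)
print(vector, delays)
```

In the notebook, the existing `generate_embedding` could be passed as `fn`. Counting the `None` values left in `df['embedding']` afterwards shows how many rows still failed.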

## Store in Database

Store the products and their embeddings in Aurora PostgreSQL:

In [None]:
def store_products():
    """Store products in database with batch processing and statistics"""
    import time
    start_time = time.time()
    batch_size = 1000

    conn = psycopg.connect(
        host=dbhost,
        port=dbport,
        user=dbuser,
        password=dbpass,
        autocommit=True
    )
    
    print(f"Storing products in database... Total rows to process: {len(df)}")
    try:
        with conn.cursor() as cur:
            batches = []
            total_processed = 0
            
            # Process data in batches
            for i, (_, row) in enumerate(df.iterrows(), 1):
                batches.append((
                    row['productId'],
                    row['product_description'],
                    row['imgUrl'],
                    row['productURL'],
                    row['stars'],
                    row['reviews'],
                    row['price'],
                    row['category_id'],
                    row['isBestSeller'],
                    row['boughtInLastMonth'],
                    row['category_name'],
                    row['quantity'],
                    row['embedding']
                ))
                
                # When batch size is reached or at the end, process the batch
                if len(batches) == batch_size or i == len(df):
                    batch_start = time.time()
                    
                    cur.executemany("""
                    INSERT INTO bedrock_integration.product_catalog (
                        "productId", product_description, imgUrl, productURL,
                        stars, reviews, price, category_id, isBestSeller,
                        boughtInLastMonth, category_name, quantity, embedding
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT ("productId") DO UPDATE 
                    SET 
                        product_description = EXCLUDED.product_description,
                        imgUrl = EXCLUDED.imgUrl,
                        productURL = EXCLUDED.productURL,
                        stars = EXCLUDED.stars,
                        reviews = EXCLUDED.reviews,
                        price = EXCLUDED.price,
                        category_id = EXCLUDED.category_id,
                        isBestSeller = EXCLUDED.isBestSeller,
                        boughtInLastMonth = EXCLUDED.boughtInLastMonth,
                        category_name = EXCLUDED.category_name,
                        quantity = EXCLUDED.quantity,
                        embedding = EXCLUDED.embedding;
                    """, batches)
                    
                    total_processed += len(batches)
                    batch_time = time.time() - batch_start
                    elapsed_total = time.time() - start_time
                    
                    # Calculate progress and estimated time remaining
                    progress = (total_processed / len(df)) * 100
                    avg_time_per_batch = elapsed_total / (total_processed / batch_size)
                    remaining_batches = (len(df) - total_processed) / batch_size
                    eta = remaining_batches * avg_time_per_batch
                    
                    print(f"\rProgress: {progress:.1f}% | Processed: {total_processed}/{len(df)} rows | "
                          f"Batch time: {batch_time:.2f}s | ETA: {eta:.0f}s", end="")
                    
                    batches = []
            
            print("\n\nRunning VACUUM ANALYZE...")
            cur.execute("VACUUM ANALYZE bedrock_integration.product_catalog;")
            
            # Get final statistics
            cur.execute("SELECT COUNT(*) FROM bedrock_integration.product_catalog")
            final_count = cur.fetchone()[0]
            
            end_time = time.time()
            total_time = end_time - start_time
            
            print("\n📊 Data Loading Statistics:")
            print(f"✓ Total rows loaded: {final_count:,}")
            print(f"✓ Total loading time: {total_time:.2f} seconds")
            if len(df) > 0:  # avoid dividing by zero when the DataFrame is empty
                print(f"✓ Average time per row: {(total_time/len(df))*1000:.2f} ms")
                print(f"✓ Average time per batch: {(total_time/(len(df)/batch_size)):.2f} seconds")
            print("\n✅ Products stored successfully in database")
            
    except Exception as e:
        print(f"\n❌ Error storing products: {str(e)}")
        raise
    finally:
        conn.close()

# Load data with embeddings into the table
store_products()
print("\nPart 1 Complete: Setup and data loading finished! ✅")
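
The batch loop in `store_products` accumulates rows and flushes them every `batch_size` rows or when the end of the DataFrame is reached. The same pattern can be isolated in a small generator, which makes the flush condition easier to reason about and test. This is an illustrative sketch; `chunked` is a hypothetical helper, not part of the lab code.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from iterable."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return  # source exhausted, no empty batch is emitted
        yield batch


rows = list(range(2500))  # stands in for 2,500 product rows
sizes = [len(batch) for batch in chunked(rows, 1000)]
print(sizes)  # → [1000, 1000, 500]
```

Each yielded batch could then be handed to `cur.executemany(...)`, with the final, shorter batch handled by the same code path as the full ones.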