# Part 1: Building AI-Powered Semantic Product Search with pgvector and Amazon Bedrock
### Configuration, Vector Embeddings and Data Ingestion
---

## Contents

1. [Background](#Background)
2. [Architecture](#Technical-Architecture)
3. [Setup](#Setup)
4. [Load Product Data](#Load-Product-Catalog-Data)
5. [Generate Embeddings](#Generate-Embeddings)
6. [Store in PostgreSQL](#Store-in-Database)

## Background

This lab demonstrates how Blaize Bazaar implements intelligent product discovery using semantic search. Our solution enables customers to find products using natural language queries, understanding context beyond simple keyword matching.

Key components of Blaize Bazaar's search system:

- **Smart Search**: Amazon Titan Text Embeddings V2 converts product descriptions and customer queries into semantic vectors
- **Efficient Storage**: Aurora PostgreSQL with pgvector stores the product catalog alongside its embeddings
- **Vector Similarity**: pgvector matches customer queries with relevant products by the cosine similarity between their embeddings, which are generated through Amazon Bedrock
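
To make the cosine-similarity comparison concrete, here is a minimal sketch in plain Python. The three-dimensional vectors are made-up toy values, not real Titan embeddings (which have 1024 dimensions in this lab), and `cosine_similarity` is a hypothetical helper that is not part of the lab code.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

# Toy vectors (assumed values, for illustration only)
query = [1.0, 0.0, 1.0]
similar_product = [2.0, 0.0, 2.0]    # same direction as the query
unrelated_product = [0.0, 3.0, 0.0]  # orthogonal to the query

sim_similar = cosine_similarity(query, similar_product)
sim_unrelated = cosine_similarity(query, unrelated_product)
print(sim_similar, sim_unrelated)
```

The vector pointing in the same direction scores close to 1 regardless of its length, while the orthogonal vector scores 0. pgvector's `<=>` operator returns cosine *distance* (1 minus cosine similarity), so smaller values mean closer matches.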

## Blaize Bazaar's Product Data

- Source: [Amazon Products Dataset 2023](https://www.kaggle.com/datasets/asaniczka/amazon-products-dataset-2023-1-4m-products/data)
- Catalog Size: **21,704** products across multiple categories
- Curated Selection: Top and emerging products per category
- Quality Focus: Products with verified customer ratings

## Product Catalog Design

Blaize Bazaar's `bedrock_integration.product_catalog` table structure:

| **Column**          | **Data Type**    | **Constraints** | **Description**                                |
| ------              | ------           | ------          | ------                                         |
| productId           | VARCHAR(255)     | NOT NULL, PK    | Unique identifier for each product             |
| product_description | TEXT             |                 | Detailed description of the product            |
| imgurl              | TEXT             |                 | URL to the product image                       |
| producturl          | TEXT             |                 | URL to the product page                        |
| stars               | NUMERIC          |                 | Product rating (out of 5)                      |
| reviews             | INT              |                 | Number of reviews for the product              |
| price               | NUMERIC          |                 | Price of the product                           |
| category_id         | INT              |                 | Identifier for the product category            |
| isbestseller        | BOOLEAN          |                 | Flag indicating if the product is a bestseller |
| boughtinlastmonth   | INT              |                 | Number of units sold in the last month         |
| category_name       | VARCHAR(255)     |                 | Name of the product category                   |
| quantity            | INT              |                 | Available quantity of the product              |
| embedding           | VECTOR(1024)     |                 | Vector embedding for semantic search           |

### Search Optimization
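The table has four indexes:

- Primary Key: B-tree index on `productId`, for fast product lookup
- Full-Text Search: GIN index on the `to_tsvector('english', ...)` of `product_description`, for keyword search
- Category Filter: partial B-tree index on `category_name` (rows where it is not null), for filtered searches
- Vector Search: HNSW index on the `embedding` column, for approximate nearest-neighbor similarity search
    - Parameters: `m=16, ef_construction=64`
    - Operator class: `vector_cosine_ops` (cosine distance)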
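
A query of roughly the following shape can be served by the HNSW index. This is a sketch under assumptions: `to_vector_literal`, `SEARCH_SQL`, and `search_products` are hypothetical names that do not appear in the lab, and `conn` is assumed to be an open psycopg connection. `<=>` is pgvector's cosine-distance operator, and `hnsw.ef_search` is its query-time setting for the size of the HNSW candidate list.

```python
def to_vector_literal(embedding):
    """Format a sequence of numbers as pgvector text input, e.g. '[0.1,0.2]'."""
    return "[" + ",".join(str(float(x)) for x in embedding) + "]"

# Ordering by the same expression the index was built on (cosine distance)
# lets the planner use the HNSW index for the ORDER BY ... LIMIT.
SEARCH_SQL = """
    SELECT "productId",
           product_description,
           1 - (embedding <=> %(query)s::vector) AS similarity
    FROM bedrock_integration.product_catalog
    ORDER BY embedding <=> %(query)s::vector
    LIMIT %(top_k)s
"""

def search_products(conn, query_embedding, top_k=5, ef_search=40):
    """Return the top_k rows closest to query_embedding (hypothetical helper)."""
    # SET does not accept bound parameters, so the value is coerced to int first.
    conn.execute(f"SET hnsw.ef_search = {int(ef_search)}")
    params = {"query": to_vector_literal(query_embedding), "top_k": top_k}
    return conn.execute(SEARCH_SQL, params).fetchall()

literal = to_vector_literal([0.25, -1, 3])
print(literal)
```

Raising `ef_search` generally improves recall at the cost of query latency, so it is a tuning knob rather than a fixed value.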

## Embeddings Model

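Blaize Bazaar uses [Amazon Titan Text Embeddings V2](https://aws.amazon.com/bedrock/titan/) through Amazon Bedrock (model ID `amazon.titan-embed-text-v2:0`). Its default 1024-dimensional output matches the `VECTOR(1024)` column in the product catalog table.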
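
The lab sends only `inputText` and relies on the model defaults. As a hedged sketch, the request body can also state the output size and normalization explicitly through the optional `dimensions` and `normalize` fields. `build_titan_v2_body` is a hypothetical helper, not part of the lab.

```python
import json

# Output sizes accepted by Titan Text Embeddings V2
ALLOWED_DIMENSIONS = (256, 512, 1024)

def build_titan_v2_body(text, dimensions=1024, normalize=True):
    """Build the JSON request body for one embedding request."""
    if not text or not text.strip():
        raise ValueError("inputText must not be empty")
    if dimensions not in ALLOWED_DIMENSIONS:
        raise ValueError(f"dimensions must be one of {ALLOWED_DIMENSIONS}")
    return json.dumps({
        "inputText": text,
        "dimensions": dimensions,
        "normalize": normalize,
    })

body = build_titan_v2_body("wireless noise-cancelling headphones")
print(body)
```

The resulting string can be passed as `body=` to `bedrock_runtime.invoke_model(...)`. If a smaller `dimensions` value is chosen, the `embedding` column type would need to change to match.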

## Technical Architecture

![Building AI-Powered Semantic Product Search with pgvector and Amazon Bedrock](../static/Product_Catalog.png)

**Customer Search Experience**:

1. Convert product descriptions to embeddings (Amazon Bedrock)
2. Store and index product information efficiently (Aurora PostgreSQL + pgvector)
3. Convert the customer's search query to an embedding (Amazon Bedrock)
4. Rank products by cosine similarity to the query embedding and return the closest matches

## Setup

First, let's install the required Python packages. You can safely disregard the warning *"Note: you may need to restart the kernel to use updated packages."*

In [1]:
# Install all the required prerequisite libraries in a single command to better handle dependencies
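# Version specifiers containing > or < are quoted so the shell does not treat them as redirections
%pip install "setuptools==65.5.0" "httpx>=0.25.0" "psycopg[binary]" pgvector pandarallel boto3 tqdm "numpy<2.0"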

Note: you may need to restart the kernel to use updated packages.


## Load Required Libraries

In [2]:
import pandas as pd
import numpy as np
import boto3
import json
import psycopg
from pgvector.psycopg import register_vector
from pandarallel import pandarallel
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Initialize Bedrock client
bedrock_runtime = boto3.client('bedrock-runtime')

print("Required libraries setup complete ✅ ")

Required libraries setup complete ✅ 


## Aurora PostgreSQL Database Setup

Set up Aurora PostgreSQL with the pgvector extension and create the product catalog table.

In [8]:
# Get database credentials from Secrets Manager
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='apgpg-pgvector-secret')
database_secrets = json.loads(response['SecretString'])

# Set up database connection parameters
dbhost = database_secrets['host']
dbport = database_secrets['port']
dbuser = database_secrets['username']
dbpass = database_secrets['password']

def setup_database():
    """Set up database schema and tables"""
    conn = psycopg.connect(
        host=dbhost,
        port=dbport,
        user=dbuser,
        password=dbpass,
        autocommit=True
    )

    # Enable vector extension
    conn.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    register_vector(conn)

    # Create schema
    conn.execute("CREATE SCHEMA IF NOT EXISTS bedrock_integration;")

    # Drop existing table if needed
    conn.execute("DROP TABLE IF EXISTS bedrock_integration.product_catalog;")

    # Create products table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS bedrock_integration.product_catalog (
        "productId" VARCHAR(255) PRIMARY KEY,
        product_description TEXT,
        imgurl TEXT,
        producturl TEXT,
        stars NUMERIC,
        reviews INT,
        price NUMERIC,
        category_id INT,
        isbestseller BOOLEAN,
        boughtinlastmonth INT,
        category_name VARCHAR(255),
        quantity INT,
        embedding vector(1024)
    );
    """)

    print(f"Connection info: host={dbhost}, port={dbport}, user={dbuser}")
    print("Database setup complete ✅")
    conn.close()

setup_database()

Connection info: host=apgpg-pgvector.cluster-cryu0q602vhy.us-west-2.rds.amazonaws.com, port=5432, user=postgres
Database setup complete ✅
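
To confirm that the extension and table were created as intended, a check along these lines could be run afterwards. It is a sketch: `describe_catalog` is a hypothetical helper, and `conn` is assumed to be an open psycopg connection like the one created in `setup_database()`.

```python
def describe_catalog(conn):
    """Return (pgvector_version, [(column_name, type_name), ...])."""
    # pg_extension lists installed extensions; pgvector registers as 'vector'.
    version_row = conn.execute(
        "SELECT extversion FROM pg_extension WHERE extname = 'vector'"
    ).fetchone()

    # udt_name reports the underlying type, e.g. 'vector' for the embedding column.
    columns = conn.execute(
        """
        SELECT column_name, udt_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        ("bedrock_integration", "product_catalog"),
    ).fetchall()

    version = version_row[0] if version_row else None
    return version, columns

# Example call (needs a live database connection):
#   version, columns = describe_catalog(conn)
#   print(version, columns)
```

A `None` version would indicate that the `vector` extension is not installed in the connected database.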


## Load Product Catalog Data

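Load and preprocess the product catalog data: rows without a description are dropped, and missing values in the other columns are replaced with defaults.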

In [5]:
# Load product data
print("Loading product data...")
df = pd.read_csv('../datasets/product_catalog.csv')

# Clean up missing values
df = df.dropna(subset=['product_description'])
df = df.fillna({
    'stars': 0,
    'reviews': 0,
    'price': 0,
    'category_id': 0,
    'isbestseller': False,
    'boughtinlastmonth': 0,
    'category_name': 'Unknown',
    'quantity': 0
})

print(f"Loaded {len(df)} products")
df.head(5)

Loading product data...
Loaded 21704 products


| Unnamed: 0 | productId | product_description | imgurl | producturl | stars | reviews | price | category_id | isbestseller | boughtinlastmonth | category_name | quantity |
| ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ |
| 0 | B0CD1983NZ | IMAXTOP Selfie Light, RGB Video Light with 78 ... | https://m.media-amazon.com/images/I/61RBZcO-vy... | https://www.amazon.com/dp/B0CD1983NZ | 5.0 | 0 | 32.99 | 76 | False | 0 | Accessories & Supplies | 57 |
| 1 | B0CCY8S9YB | Phone Camera Lens Kit Upgraded Version Telepho... | https://m.media-amazon.com/images/I/71yG7w3Cqa... | https://www.amazon.com/dp/B0CCY8S9YB | 5.0 | 0 | 39.99 | 76 | False | 0 | Accessories & Supplies | 97 |
| 2 | B0CJBKTMVJ | iPhone Charger Fast Charging, USB C to Lightni... | https://m.media-amazon.com/images/I/61Dvn0ycei... | https://www.amazon.com/dp/B0CJBKTMVJ | 5.0 | 0 | 6.99 | 76 | False | 0 | Accessories & Supplies | 74 |
| 3 | B0CCD7V85M | Stylus Pen for iPad 2018-2023, 15mins Fast Cha... | https://m.media-amazon.com/images/I/617SUawrV5... | https://www.amazon.com/dp/B0CCD7V85M | 5.0 | 0 | 16.99 | 76 | False | 0 | Accessories & Supplies | 84 |
| 4 | B0CG197TYR | TechMatte USB C to Lightning Adapter, [2 Pack]... | https://m.media-amazon.com/images/I/61EMPUb-OC... | https://www.amazon.com/dp/B0CG197TYR | 5.0 | 0 | 8.99 | 76 | False | 0 | Accessories & Supplies | 20 |


## Generate Embeddings

Generate an embedding for each product description using the Amazon Titan Text Embeddings V2 model on Amazon Bedrock.

In [7]:
# Run time: ~ 3 mins
def generate_embedding(text):
    """Generate an embedding for a single text using Amazon Titan Text Embeddings V2.

    Returns the embedding as a list of floats, or None if the request fails.
    """
    try:
        payload = json.dumps({'inputText': text})
        response = bedrock_runtime.invoke_model(
            body=payload,
            modelId='amazon.titan-embed-text-v2:0',
            accept="application/json",
            contentType="application/json"
        )
        response_body = json.loads(response.get("body").read())
        return response_body.get("embedding")
    except Exception as e:
        print(f"Error generating embedding: {str(e)}")
        return None

# Initialize parallel processing
print("\nGenerating embeddings for product descriptions...")
pandarallel.initialize(progress_bar=True, nb_workers=10)

# Generate embeddings
df['embedding'] = df['product_description'].parallel_apply(generate_embedding)

print("\nCompleted embedding generation")


Generating embeddings for product descriptions...
INFO: Pandarallel will run on 10 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


Completed embedding generation
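
Because `generate_embedding` returns `None` when a request fails, it can be worth checking the results before storing them. The following is a minimal sketch. `summarize_embeddings` is a hypothetical helper, and it assumes failed rows hold `None` and successful rows hold a list of floats.

```python
def summarize_embeddings(embeddings, expected_dim=1024):
    """Count usable, missing, and wrong-sized embeddings."""
    summary = {"ok": 0, "missing": 0, "wrong_dim": 0}
    for emb in embeddings:
        if emb is None:
            summary["missing"] += 1      # request failed for this row
        elif len(emb) != expected_dim:
            summary["wrong_dim"] += 1    # would not fit the vector(1024) column
        else:
            summary["ok"] += 1
    return summary

# Toy input: one valid vector, one failed request, one vector of the wrong size
sample = [[0.0] * 1024, None, [0.0] * 512]
report = summarize_embeddings(sample)
print(report)
```

In the notebook this could be called as `summarize_embeddings(df['embedding'].tolist())`. Rows counted as `missing` would be stored with a NULL embedding and so would not be matched meaningfully by vector search.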


## Store in Database

Store the products and their embeddings in Aurora PostgreSQL:

In [10]:
# Run time: ~ 2 mins
def store_products():
    """Store products in database with batch processing and statistics"""
    import time
    start_time = time.time()
    batch_size = 1000

    conn = psycopg.connect(
        host=dbhost,
        port=dbport,
        user=dbuser,
        password=dbpass,
        autocommit=True
    )

    print(f"Storing products in database... Total rows to process: {len(df)}")
    conn.execute("""TRUNCATE TABLE bedrock_integration.product_catalog;""")
    try:
        with conn.cursor() as cur:
            batches = []
            total_processed = 0

            # Process data in batches
            for i, (_, row) in enumerate(df.iterrows(), 1):
                batches.append((
                    row['productId'],
                    row['product_description'],
                    row['imgurl'],
                    row['producturl'],
                    row['stars'],
                    row['reviews'],
                    row['price'],
                    row['category_id'],
                    row['isbestseller'],
                    row['boughtinlastmonth'],
                    row['category_name'],
                    row['quantity'],
                    row['embedding']
                ))

                # When batch size is reached or at the end, process the batch
                if len(batches) == batch_size or i == len(df):
                    batch_start = time.time()

                    cur.executemany("""
                    INSERT INTO bedrock_integration.product_catalog (
                        "productId", product_description, imgurl, producturl,
                        stars, reviews, price, category_id, isbestseller,
                        boughtinlastmonth, category_name, quantity, embedding
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT ("productId") DO UPDATE 
                    SET 
                        product_description = EXCLUDED.product_description,
                        imgurl = EXCLUDED.imgurl,
                        producturl = EXCLUDED.producturl,
                        stars = EXCLUDED.stars,
                        reviews = EXCLUDED.reviews,
                        price = EXCLUDED.price,
                        category_id = EXCLUDED.category_id,
                        isbestseller = EXCLUDED.isbestseller,
                        boughtinlastmonth = EXCLUDED.boughtinlastmonth,
                        category_name = EXCLUDED.category_name,
                        quantity = EXCLUDED.quantity,
                        embedding = EXCLUDED.embedding;
                    """, batches)

                    total_processed += len(batches)
                    batch_time = time.time() - batch_start
                    elapsed_total = time.time() - start_time

                    # Calculate progress and estimated time remaining
                    progress = (total_processed / len(df)) * 100
                    avg_time_per_batch = elapsed_total / (total_processed / batch_size)
                    remaining_batches = (len(df) - total_processed) / batch_size
                    eta = remaining_batches * avg_time_per_batch

                    print(f"\rProgress: {progress:.1f}% | Processed: {total_processed}/{len(df)} rows | "
                          f"Batch time: {batch_time:.2f}s | ETA: {eta:.0f}s", end="")

                    batches = []

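            # Finish the in-place progress line so later messages start on a new line
            print()

            # 1. Vector similarity index (HNSW, cosine distance)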
            conn.execute("""
            CREATE INDEX IF NOT EXISTS product_catalog_embedding_idx 
            ON bedrock_integration.product_catalog 
            USING hnsw (embedding vector_cosine_ops)
            WITH  (m = 16, ef_construction = 64);
            """)

            # 2. Full-text search index (GIN) - Critical for performance
            print("Creating full-text search index (GIN)...")
            conn.execute("""CREATE INDEX IF NOT EXISTS product_catalog_fts_idx 
                              ON bedrock_integration.product_catalog
                              USING GIN (to_tsvector('english', coalesce(product_description, '')));
                        """)
            
            # 3. Category index for filtered searches
            print("Creating category index...")
            conn.execute("""CREATE INDEX IF NOT EXISTS product_catalog_category_idx 
                               ON bedrock_integration.product_catalog(category_name) 
                               WHERE category_name IS NOT NULL;""")

            print("\n\nRunning VACUUM ANALYZE...")
            cur.execute("VACUUM ANALYZE bedrock_integration.product_catalog;")

            # Get final statistics
            cur.execute("SELECT COUNT(*) FROM bedrock_integration.product_catalog")
            final_count = cur.fetchone()[0]

            end_time = time.time()
            total_time = end_time - start_time

            print("\n📊 Data Loading Statistics:")
            print(f"✓ Total rows loaded: {final_count:,}")
            print(f"✓ Total loading time: {total_time:.2f} seconds")
            print(f"✓ Average time per row: {(total_time/len(df))*1000:.2f} ms")
            print(f"✓ Average time per batch: {(total_time/(len(df)/batch_size)):.2f} seconds")
            print("\n✅ Products stored successfully in database")

    except Exception as e:
        print(f"\n❌ Error storing products: {str(e)}")
        raise
    finally:
        conn.close()

# Load data with embeddings into the table
store_products()
print("\nPart 1 Complete: Setup and data loading finished! ✅")

Storing products in database... Total rows to process: 21704
Progress: 100.0% | Processed: 21704/21704 rows | Batch time: 2.08s | ETA: 0s
Creating full-text search index (GIN)...
Creating category index...


Running VACUUM ANALYZE...

📊 Data Loading Statistics:
✓ Total rows loaded: 21,704
✓ Total loading time: 71.20 seconds
✓ Average time per row: 3.28 ms
✓ Average time per batch: 3.28 seconds

✅ Products stored successfully in database

Part 1 Complete: Setup and data loading finished! ✅
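
The batching logic inside `store_products()` can also be expressed as a small generator, which makes the batch arithmetic easier to see. This is a sketch rather than the lab's implementation. `batched` is a hypothetical helper, and `range(21704)` stands in for the catalog rows.

```python
from itertools import islice

def batched(rows, batch_size):
    """Yield successive lists of at most batch_size items from rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

# 21,704 rows in batches of 1,000, matching the catalog size and batch_size above
batch_sizes = [len(batch) for batch in batched(range(21704), 1000)]
print(len(batch_sizes), batch_sizes[-1])  # prints: 22 704
```

With these numbers there are 21 full batches and a final batch of 704 rows. Each yielded batch could be passed to `cur.executemany(...)` with the same `INSERT` statement.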
