# Lab 1: Building AI-Powered Semantic Product Search with pgvector and Amazon Bedrock
### Advanced Configuration, Optimized Data Ingestion & Interactive Discovery
---

## Contents

1. [Background](#Background)
2. [Technical Architecture](#Technical-Architecture)
3. [Setup](#Setup)
4. [Optimized Batch Insertion](#Optimized-Batch-Insertion)
5. [Performance Benchmarking](#Performance-Benchmarking)
6. [Interactive Semantic Search](#Interactive-Semantic-Search-Widget)
7. [Advanced Discovery](#Advanced-Discovery:-Category-Analysis)

## Background

This lab demonstrates Blaize Bazaar's production-grade semantic search implementation using:

- **Vector Search**: Amazon Titan Text Embeddings V2 (1024-dim) + pgvector HNSW indexing
- **Database**: Aurora PostgreSQL with optimized connection pooling
- **Scale**: 21,704 products with sub-50ms p95 query latency
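For a sense of scale: pgvector documents each `vector` value as taking `4 * dimensions + 8` bytes. The back-of-the-envelope sketch below applies that to this catalog. It deliberately ignores row and page overhead, TOAST, and the HNSW index, which needs its own storage on top.

```python
# Rough storage estimate for the embedding column, assuming pgvector's
# documented layout: 4 bytes per dimension plus an 8-byte header per vector.
# Tuple/page overhead, TOAST and the HNSW index are NOT included.

DIMENSIONS = 1024   # Titan Text Embeddings V2 output size used in this lab
PRODUCTS = 21_704   # catalog size quoted above


def vector_bytes(dimensions: int) -> int:
    """Bytes used by one pgvector `vector` value (float32 values + header)."""
    return 4 * dimensions + 8


def column_bytes(dimensions: int, rows: int) -> int:
    """Raw bytes for `rows` embeddings, ignoring per-row storage overhead."""
    return vector_bytes(dimensions) * rows


per_vector = vector_bytes(DIMENSIONS)
total = column_bytes(DIMENSIONS, PRODUCTS)
print(f"{per_vector} bytes/vector, {total / 1024**2:.1f} MiB of raw vector data")
```

At roughly 85 MiB of raw vectors, the whole embedding column fits comfortably in memory on a typical Aurora instance, which is part of why sub-50ms searches are plausible at this scale.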

### Technical Architecture

![Semantic Search Architecture](../static/Product_Catalog.png)

**Data Flow**:
1. Parallel embedding generation (Amazon Bedrock)
2. Batch insertion with conflict resolution (Aurora PostgreSQL)
3. HNSW index creation (pgvector with m=16, ef_construction=64)
4. Real-time similarity search ranked by cosine distance (`<=>`), with distance < 0.2 (similarity > 0.8) as the reference relevance threshold
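pgvector's `<=>` operator returns cosine distance, and the queries in this lab report similarity as `1 - distance`, so the 0.2 distance threshold corresponds to a similarity of 0.8. Because Titan is called with `normalize: True`, every embedding has unit length and cosine similarity reduces to a dot product. A minimal pure-Python sketch of the relationship; the toy 3-dimensional vectors are illustrative stand-ins for 1024-dim embeddings:

```python
import math
from typing import Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cos(a, b): the quantity pgvector's <=> operator returns."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    # Undefined for zero vectors (this raises ZeroDivisionError).
    return 1.0 - dot / (norm_a * norm_b)


def similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Score reported by this lab's SQL: 1 - (embedding <=> query)."""
    return 1.0 - cosine_distance(a, b)


DISTANCE_THRESHOLD = 0.2  # equivalent to similarity > 0.8

# Toy vectors standing in for Titan embeddings (illustrative only).
query = [1.0, 0.0, 0.0]
candidates = {"close": [0.9, 0.1, 0.0], "unrelated": [0.0, 1.0, 0.0]}

for name, vec in candidates.items():
    d = cosine_distance(query, vec)
    print(f"{name}: distance={d:.3f} similarity={similarity(query, vec):.3f} "
          f"match={d < DISTANCE_THRESHOLD}")
```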

## Setup

Install dependencies with pinned versions for reproducibility:

In [None]:
"""
DAT406 Workshop - Environment Setup
====================================
Packages this notebook relies on:
  • Data Science: pandas, numpy, matplotlib, seaborn
  • AWS Services: boto3
  • Database: psycopg, psycopg_pool, pgvector
  • Utilities: tqdm, pandarallel, ipywidgets, httpx, sqlparse
"""

%pip install -q -r requirements.txt
print("✅ Setup complete! Ready to start the workshop.")

## Import Libraries & Initialize Services

In [None]:
import pandas as pd
import numpy as np
import boto3
from botocore.config import Config
import json
import psycopg
from psycopg_pool import ConnectionPool
from pgvector.psycopg import register_vector
from pandarallel import pandarallel
from tqdm.notebook import tqdm
import time
from datetime import datetime
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# Silences warnings, including pandas' notice that read_sql_query is only
# tested with SQLAlchemy or sqlite3 connections (we pass psycopg connections)
warnings.filterwarnings('ignore')

# Configure visualization
sns.set_style('darkgrid')
plt.rcParams['figure.figsize'] = (12, 6)

# Initialize Bedrock client with adaptive retries (up to 3 attempts in total)
bedrock_runtime = boto3.client(
    'bedrock-runtime',
    config=Config(retries={'max_attempts': 3, 'mode': 'adaptive'})
)

print("✅ Libraries initialized")
print(f"⏰ Session started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

## Database Configuration

Initialize connection pool for optimized throughput:

In [None]:
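from psycopg.conninfo import make_conninfo

# Retrieve credentials from Secrets Manager
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId='apgpg-pgvector-secret')
db_secrets = json.loads(response['SecretString'])

# Build the connection string; make_conninfo quotes each value safely
# (e.g. passwords containing spaces or quotes)
DB_CONNINFO = make_conninfo(
    host=db_secrets['host'],
    port=db_secrets['port'],
    user=db_secrets['username'],
    password=db_secrets['password'],
    dbname='postgres',
)

# Initialize connection pool: 5 warm connections, up to 10 under load
pool = ConnectionPool(
    conninfo=DB_CONNINFO,
    min_size=5,
    max_size=10,
    timeout=30,       # seconds a caller waits for a free connection
    max_waiting=20,   # callers allowed to queue before requests are rejected
    open=True,        # open explicitly; newer psycopg_pool warns on implicit opening
)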

print(f"✅ Connection pool initialized: {db_secrets['host']}")
print(f"   Pool size: 5-10 connections | Timeout: 30s")

## Schema Setup

In [None]:
def setup_database():
    """Create schema with performance-optimized indexes"""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            # Enable pgvector and register its type adapters on this connection
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            register_vector(conn)
            
            # Create schema
            cur.execute("CREATE SCHEMA IF NOT EXISTS bedrock_integration;")
            
            # Drop any existing table for a clean slate (destructive: deletes previously loaded rows)
            cur.execute("DROP TABLE IF EXISTS bedrock_integration.product_catalog CASCADE;")
            
            # Create table with optimized data types
            cur.execute("""
                CREATE TABLE bedrock_integration.product_catalog (
                    "productId" VARCHAR(255) PRIMARY KEY,
                    product_description TEXT NOT NULL,
                    imgurl TEXT,
                    producturl TEXT,
                    stars NUMERIC(3,2),
                    reviews INTEGER,
                    price NUMERIC(10,2),
                    category_id INTEGER,
                    isbestseller BOOLEAN DEFAULT FALSE,
                    boughtinlastmonth INTEGER,
                    category_name VARCHAR(255),
                    quantity INTEGER DEFAULT 0,
                    embedding vector(1024),
                    created_at TIMESTAMP DEFAULT NOW()
                );
            """)
            
            conn.commit()
            
    print("✅ Database schema created")

setup_database()

## Load & Validate Product Data

In [None]:
# Load dataset
df = pd.read_csv('../data/amazon-products-sample.csv')

# Data quality checks
initial_count = len(df)
df = df.dropna(subset=['product_description'])
df = df.drop_duplicates(subset=['productId'])

# Fill missing values with sensible defaults
df = df.fillna({
    'stars': 0.0,
    'reviews': 0,
    'price': 0.0,
    'category_id': 0,
    'isbestseller': False,
    'boughtinlastmonth': 0,
    'category_name': 'Uncategorized',
    'quantity': 0,
    'imgurl': '',
    'producturl': ''
})

# Truncate long descriptions for embedding efficiency
df['product_description'] = df['product_description'].str[:2000]

print(f"📊 Data Quality Report:")
print(f"   Initial rows: {initial_count:,}")
print(f"   Valid products: {len(df):,}")
print(f"   Categories: {df['category_name'].nunique()}")
print(f"   Avg price: ${df['price'].mean():.2f}")
print(f"   Avg rating: {df['stars'].mean():.2f}/5.0")

df.head(3)

## Parallel Embedding Generation

Generate 1024-dimensional embeddings using Amazon Titan V2 with parallel processing:

In [None]:
def generate_embedding(text):
    """Generate a Titan Text Embeddings V2 vector; return None on failure"""
    try:
        response = bedrock_runtime.invoke_model(
            body=json.dumps({
                'inputText': str(text)[:2000],  # Same 2,000-character cap applied during data loading
                'dimensions': 1024,
                'normalize': True
            }),
            modelId='amazon.titan-embed-text-v2:0',
            accept="application/json",
            contentType="application/json"
        )
        return json.loads(response['body'].read())['embedding']
    except Exception as e:
        print(f"⚠️  Embedding error: {str(e)[:50]}")
        # Return None (stored as NULL) rather than a zero vector: cosine distance
        # is undefined for a zero vector, and the search queries in this lab
        # already skip rows via WHERE embedding IS NOT NULL.
        return None

# Initialize parallel processing (10 worker processes; Bedrock calls are I/O-bound,
# so tune nb_workers to stay within your account's Bedrock throughput quota)
pandarallel.initialize(progress_bar=True, nb_workers=10, verbose=0)

# Generate embeddings in parallel
print("🔄 Generating embeddings... (ETA: ~3 minutes)")
start = time.time()

df['embedding'] = df['product_description'].parallel_apply(generate_embedding)

elapsed = time.time() - start
print(f"\n✅ Embeddings generated in {elapsed:.1f}s ({len(df)/elapsed:.1f} products/sec)")
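`pandarallel` spreads `parallel_apply` across worker processes. Because each call spends most of its time waiting on the Bedrock API, threads are a lighter-weight alternative, and the boto3 documentation describes low-level clients as thread-safe, so a single `bedrock_runtime` client can be shared. A sketch under those assumptions; `embed_all` and `fake_embed` are hypothetical helpers, not part of the lab:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, Sequence


def embed_all(
    texts: Sequence[str],
    embed_fn: Callable[[str], Optional[List[float]]],
    max_workers: int = 10,
) -> list:
    """Embed texts concurrently with threads, preserving input order.

    `embed_fn` is any callable returning a vector, or None on failure
    (for example the notebook's generate_embedding). Executor.map yields
    results in the same order as the inputs.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(embed_fn, texts))


# Hypothetical stand-in so the sketch runs without AWS credentials:
# empty text "fails" (None), otherwise a 4-dim vector of the text length.
def fake_embed(text: str) -> Optional[List[float]]:
    return None if not text else [float(len(text))] * 4


vectors = embed_all(["headphones", "", "laptop"], fake_embed, max_workers=3)
failed = sum(v is None for v in vectors)
print(f"{len(vectors)} results, {failed} failed")
```

In the notebook this would read `df['embedding'] = embed_all(df['product_description'].tolist(), generate_embedding)`. Either way, concurrency above your Bedrock quota only converts into throttling retries, so the worker count matters more than the parallelism mechanism.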

## Optimized Batch Insertion

Insert data using `executemany` with UPSERT for idempotency:

In [None]:
def batch_insert_products(df, batch_size=1000):
    """Optimized batch insertion with progress tracking"""
    start_time = time.time()
    
    with pool.connection() as conn:
        with conn.cursor() as cur:
            # Build one parameter tuple per product
            rows = [
                (
                    row['productId'],
                    row['product_description'],
                    row['imgurl'],
                    row['producturl'],
                    float(row['stars']),
                    int(row['reviews']),
                    float(row['price']),
                    int(row['category_id']),
                    bool(row['isbestseller']),
                    int(row['boughtinlastmonth']),
                    row['category_name'],
                    int(row['quantity']),
                    row['embedding'],
                )
                for _, row in df.iterrows()
            ]
            
            # UPSERT: re-running the load refreshes every column of existing rows
            insert_sql = """
                INSERT INTO bedrock_integration.product_catalog 
                ("productId", product_description, imgurl, producturl, stars, reviews, 
                 price, category_id, isbestseller, boughtinlastmonth, category_name, 
                 quantity, embedding)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT ("productId") DO UPDATE SET
                    product_description = EXCLUDED.product_description,
                    imgurl = EXCLUDED.imgurl,
                    producturl = EXCLUDED.producturl,
                    stars = EXCLUDED.stars,
                    reviews = EXCLUDED.reviews,
                    price = EXCLUDED.price,
                    category_id = EXCLUDED.category_id,
                    isbestseller = EXCLUDED.isbestseller,
                    boughtinlastmonth = EXCLUDED.boughtinlastmonth,
                    category_name = EXCLUDED.category_name,
                    quantity = EXCLUDED.quantity,
                    embedding = EXCLUDED.embedding
            """
            
            # Insert batch_size rows at a time, committing each batch for progress feedback
            with tqdm(total=len(rows), desc="Inserting rows") as pbar:
                for i in range(0, len(rows), batch_size):
                    chunk = rows[i:i + batch_size]
                    cur.executemany(insert_sql, chunk)
                    conn.commit()
                    pbar.update(len(chunk))
            
            # Verify insertion
            cur.execute("SELECT COUNT(*) FROM bedrock_integration.product_catalog")
            count = cur.fetchone()[0]
            
    elapsed = time.time() - start_time
    print(f"\n✅ Inserted {count:,} products in {elapsed:.1f}s ({count/elapsed:.0f} rows/sec)")
    return count

batch_insert_products(df)
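The chunked `executemany` loop and the `ON CONFLICT ... DO UPDATE` clause can be tried locally without Aurora. The sketch below uses an in-memory SQLite database purely as a stand-in: SQLite (3.24+) accepts the same UPSERT shape, though it uses `?` placeholders where psycopg uses `%s`. The `chunks` helper and the two-column table are hypothetical simplifications:

```python
import sqlite3


def chunks(rows, size):
    """Yield consecutive slices of at most `size` rows, like rows[i:i + batch_size]."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]


# In-memory SQLite as a local stand-in for Aurora PostgreSQL.
demo_db = sqlite3.connect(":memory:")
demo_db.execute(
    'CREATE TABLE product_catalog ("productId" TEXT PRIMARY KEY, product_description TEXT)'
)

UPSERT_SQL = """
    INSERT INTO product_catalog ("productId", product_description) VALUES (?, ?)
    ON CONFLICT ("productId") DO UPDATE SET
        product_description = excluded.product_description
"""

rows = [(f"P{i}", f"description {i}") for i in range(25)]
for chunk in chunks(rows, 10):  # three chunks: 10, 10 and 5 rows
    demo_db.executemany(UPSERT_SQL, chunk)
    demo_db.commit()

# Loading an existing key again updates the row instead of raising a
# primary-key violation, so the load can be re-run safely.
demo_db.executemany(UPSERT_SQL, [("P0", "updated description")])
demo_db.commit()

row_count = demo_db.execute("SELECT COUNT(*) FROM product_catalog").fetchone()[0]
p0_desc = demo_db.execute(
    'SELECT product_description FROM product_catalog WHERE "productId" = ?', ("P0",)
).fetchone()[0]
print(row_count, p0_desc)
```

With `batch_size=1000`, the 21,704-product catalog splits into 22 batches: 21 full ones and a final batch of 704 rows.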

## Create Performance Indexes

Build the HNSW vector index (cosine distance) plus full-text, category, and price indexes for filtered and keyword queries:

In [None]:
def create_indexes():
    """Create optimized indexes with timing"""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            indexes = [
                ("HNSW Vector Index", """
                    CREATE INDEX IF NOT EXISTS idx_product_embedding_hnsw 
                    ON bedrock_integration.product_catalog 
                    USING hnsw (embedding vector_cosine_ops)
                    WITH (m = 16, ef_construction = 64)
                """),
                ("Full-Text Search (GIN)", """
                    CREATE INDEX IF NOT EXISTS idx_product_fts 
                    ON bedrock_integration.product_catalog
                    USING GIN (to_tsvector('english', product_description))
                """),
                ("Category B-Tree", """
                    CREATE INDEX IF NOT EXISTS idx_product_category 
                    ON bedrock_integration.product_catalog(category_name)
                """),
                ("Price Range", """
                    CREATE INDEX IF NOT EXISTS idx_product_price 
                    ON bedrock_integration.product_catalog(price) 
                    WHERE price > 0
                """)
            ]
            
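            for name, sql in indexes:
                start = time.time()
                cur.execute(sql)
                elapsed = time.time() - start
                print(f"✅ {name}: {elapsed:.2f}s")
            
            conn.commit()  # Persist the indexes before maintenance
            
            # VACUUM cannot run inside a transaction block, so switch this
            # connection to autocommit for the statement, then restore the
            # default before the connection returns to the pool.
            conn.autocommit = True
            try:
                cur.execute("VACUUM ANALYZE bedrock_integration.product_catalog")
            finally:
                conn.autocommit = False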
            
    print("\n✅ All indexes created and analyzed")

create_indexes()
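HNSW is an approximate index: `m` and `ef_construction` (16 and 64 here, which are pgvector's defaults) trade build time and memory for recall. A common way to check that trade-off is recall@k: compare the IDs an index scan returns with the IDs from an exact search. One way to get exact results, assumed here, is to run the same query inside a transaction with `SET LOCAL enable_indexscan = off` so the planner falls back to a sequential scan. `recall_at_k` and the sample IDs are hypothetical:

```python
from typing import Sequence


def recall_at_k(exact_ids: Sequence[str], approx_ids: Sequence[str], k: int) -> float:
    """Share of the exact top-k IDs that also appear in the approximate top-k.

    exact_ids:  IDs from an exact (sequential-scan) search, best match first.
    approx_ids: IDs from the HNSW index scan, best match first.
    """
    truth = set(exact_ids[:k])
    if k <= 0 or not truth:
        raise ValueError("need k > 0 and at least one exact result")
    found = set(approx_ids[:k])
    return len(truth & found) / len(truth)


# Hypothetical product IDs: the index scan missed one of the true top 3.
exact = ["B01", "B02", "B03", "B04"]
approx = ["B01", "B03", "B09", "B02"]
print(f"recall@3 = {recall_at_k(exact, approx, 3):.2f}")
```

Averaging recall@k over a set of representative queries gives a more useful signal than any single query.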

## Performance Benchmarking

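Measure database-side search latency (each query is embedded before its timer starts, so the Bedrock call is not included). Percentiles computed from only a few runs are indicative, not a reliable tail-latency estimate: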

In [None]:
def benchmark_search(runs_per_query=5):
    """Benchmark database-side vector search latency (repeated, warm-cache runs)"""
    test_queries = [
        "wireless noise cancelling headphones",
        "professional gaming laptop",
        "smart home security camera",
        "fitness tracker waterproof",
        "portable bluetooth speaker"
    ]
    
    latencies = []
    
    with pool.connection() as conn:
        with conn.cursor() as cur:
            for query in tqdm(test_queries, desc="Benchmarking"):
                # Generate the query embedding once (excluded from timing)
                query_embedding = generate_embedding(query)
                
                # Repeat each search so the percentiles rest on more than five samples
                for _ in range(runs_per_query):
                    start = time.perf_counter()
                    cur.execute("""
                        SELECT "productId", product_description, 
                               1 - (embedding <=> %s::vector) as similarity
                        FROM bedrock_integration.product_catalog
                        WHERE embedding IS NOT NULL
                        ORDER BY embedding <=> %s::vector
                        LIMIT 10
                    """, (query_embedding, query_embedding))
                    results = cur.fetchall()
                    latencies.append((time.perf_counter() - start) * 1000)  # ms
    
    # Display statistics
    print(f"\n📊 Search Performance (n={len(latencies)} runs over {len(test_queries)} queries):")
    print(f"   Mean latency: {np.mean(latencies):.1f}ms")
    print(f"   P50 latency: {np.percentile(latencies, 50):.1f}ms")
    print(f"   P95 latency: {np.percentile(latencies, 95):.1f}ms")
    print(f"   P99 latency: {np.percentile(latencies, 99):.1f}ms")
    
    # Visualize latency per run
    plt.figure(figsize=(10, 4))
    plt.bar(range(len(latencies)), latencies, color='#ba68c8')
    plt.axhline(np.mean(latencies), color='#6a1b9a', linestyle='--', label=f'Mean: {np.mean(latencies):.1f}ms')
    plt.xlabel('Run')
    plt.ylabel('Latency (ms)')
    plt.title('Vector Search Latency Distribution')
    plt.legend()
    plt.tight_layout()
    plt.show()

benchmark_search()
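`np.percentile` interpolates linearly between sorted samples, so with only a handful of runs the p95 and p99 figures are little more than blends of the slowest measurements. The stdlib sketch below reproduces that interpolation with `statistics.quantiles(..., method="inclusive")` (Python 3.8+) to make the effect visible; `summarize_latencies` and the sample values are hypothetical:

```python
import statistics
from typing import Sequence


def summarize_latencies(samples_ms: Sequence[float]) -> dict:
    """p50/p95/p99 via linear interpolation between order statistics.

    method="inclusive" uses the same interpolation as numpy.percentile's
    default, so results agree with the benchmark cell. Needs >= 2 samples.
    """
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98], "max": max(samples_ms)}


# Five hypothetical latencies: p95 and p99 land between the two slowest runs,
# so they say almost nothing about true tail latency.
five = [10.0, 20.0, 30.0, 40.0, 50.0]
print(summarize_latencies(five))
```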

## Interactive Semantic Search Widget

Explore the product catalog with real-time semantic search:

In [None]:
# Create interactive UI components
search_input = widgets.Text(
    value='wireless headphones',
    placeholder='Enter search query...',
    description='Search:',
    style={'description_width': '80px'},
    layout=widgets.Layout(width='60%')
)

limit_slider = widgets.IntSlider(
    value=5,
    min=1,
    max=20,
    step=1,
    description='Results:',
    style={'description_width': '80px'},
    layout=widgets.Layout(width='40%')
)

search_button = widgets.Button(
    description='🔍 Search',
    button_style='primary',
    layout=widgets.Layout(width='150px')
)

output = widgets.Output()

def perform_search(b):
    """Execute semantic search and display results"""
    with output:
        clear_output()
        query = search_input.value
        limit = limit_slider.value
        
        if not query.strip():
            print("⚠️  Please enter a search query")
            return
        
        print(f"🔍 Searching for: '{query}'\n")
        
        # Generate query embedding
        query_embedding = generate_embedding(query)
        if query_embedding is None:
            print("⚠️  Could not embed the query; please try again")
            return
        
        # Execute search
        with pool.connection() as conn:
            with conn.cursor() as cur:
                start = time.time()
                cur.execute("""
                    SELECT 
                        "productId",
                        product_description,
                        price,
                        stars,
                        reviews,
                        category_name,
                        1 - (embedding <=> %s::vector) as similarity
                    FROM bedrock_integration.product_catalog
                    WHERE embedding IS NOT NULL
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s
                """, (query_embedding, query_embedding, limit))
                
                results = cur.fetchall()
                latency = (time.time() - start) * 1000
        
        # Display results
        print(f"⚡ Found {len(results)} results in {latency:.1f}ms\n")
        
        for i, row in enumerate(results, 1):
            pid, desc, price, stars, reviews, cat, sim = row
            print(f"{'='*80}")
            print(f"#{i} | Similarity: {sim:.3f} ({sim*100:.1f}%)")
            print(f"Product: {desc[:80]}...")
            print(f"Price: ${price:.2f} | Rating: {stars:.1f}⭐ ({reviews:,} reviews)")
            print(f"Category: {cat} | ID: {pid}")
        
        print(f"\n{'='*80}")

search_button.on_click(perform_search)

# Display UI
display(widgets.VBox([
    widgets.HTML("<h3>🔍 Interactive Semantic Search</h3>"),
    widgets.HBox([search_input, limit_slider]),
    search_button,
    output
]))
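The widget always returns `limit` rows, even when the best matches are weak. A sketch of applying the 0.8 similarity reference threshold on the client side before display; `filter_results`, `format_row`, and the sample rows are hypothetical, shaped like the widget query's `(productId, description, price, stars, reviews, category, similarity)` tuples. `NUMERIC` columns arrive from psycopg as `decimal.Decimal`, which format specifiers such as `:.2f` handle directly:

```python
from decimal import Decimal

MIN_SIMILARITY = 0.8  # reference threshold (cosine distance < 0.2)


def filter_results(rows, min_similarity=MIN_SIMILARITY):
    """Keep rows whose similarity (last column) meets the threshold, in rank order."""
    return [row for row in rows if row[-1] >= min_similarity]


def format_row(rank, row):
    """One-line summary of a result tuple from the search query."""
    pid, desc, price, stars, reviews, cat, sim = row
    return (f"#{rank} | {sim:.3f} | {desc[:40]} | ${price:.2f} | "
            f"{stars:.1f}⭐ ({reviews:,}) | {cat} | {pid}")


# Hypothetical rows shaped like the widget query's output.
rows = [
    ("B001", "Wireless over-ear headphones", Decimal("79.99"), Decimal("4.50"), 1200, "Headphones", 0.86),
    ("B002", "USB-C charging cable", Decimal("9.99"), Decimal("4.20"), 300, "Cables", 0.41),
]
for rank, row in enumerate(filter_results(rows), 1):
    print(format_row(rank, row))
```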

## Advanced Discovery: Category Analysis

Summarize the ten largest product categories by product count and average price:

In [None]:
def analyze_categories():
    """Summarize product count, price, rating, and reviews for the top 10 categories"""
    with pool.connection() as conn:
        # Get category statistics
        query = """
            SELECT 
                category_name,
                COUNT(*) as product_count,
                AVG(price) as avg_price,
                AVG(stars) as avg_rating,
                SUM(reviews) as total_reviews
            FROM bedrock_integration.product_catalog
            WHERE category_name != 'Uncategorized'
            GROUP BY category_name
            ORDER BY product_count DESC
            LIMIT 10
        """
        df_cat = pd.read_sql_query(query, conn)
    
    # Visualize top categories
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Product count by category
    axes[0].barh(df_cat['category_name'], df_cat['product_count'], color='#ba68c8')
    axes[0].set_xlabel('Product Count')
    axes[0].set_title('Top 10 Categories by Product Count')
    axes[0].invert_yaxis()
    
    # Average price by category
    axes[1].barh(df_cat['category_name'], df_cat['avg_price'], color='#6a1b9a')
    axes[1].set_xlabel('Average Price ($)')
    axes[1].set_title('Average Price by Category')
    axes[1].invert_yaxis()
    
    plt.tight_layout()
    plt.show()
    
    return df_cat

df_categories = analyze_categories()
display(df_categories)
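During loading, missing prices were filled with `0.0`, so a plain `AVG(price)` per category is pulled down by placeholders. A pure-Python sketch comparing both averages; the helper and data are hypothetical, and it assumes every `0.0` price is a placeholder, which may not hold for genuinely free items. In SQL the same idea could be expressed with PostgreSQL's aggregate `FILTER` clause, e.g. `AVG(price) FILTER (WHERE price > 0)`:

```python
from collections import defaultdict
from statistics import mean


def category_price_stats(rows):
    """Per-category average price with and without 0.0 placeholder prices.

    rows: iterable of (category_name, price) pairs. Assumes every price of
    0.0 is a fill-in for a missing value rather than a free product.
    """
    prices = defaultdict(list)
    for category, price in rows:
        prices[category].append(float(price))

    stats = {}
    for category, values in prices.items():
        known = [p for p in values if p > 0]
        stats[category] = {
            "avg_all": mean(values),                      # what AVG(price) reports
            "avg_known": mean(known) if known else None,  # placeholders excluded
            "missing": len(values) - len(known),
        }
    return stats


# Hypothetical sample rows.
sample = [("Headphones", 100.0), ("Headphones", 0.0), ("Cables", 10.0)]
for category, s in category_price_stats(sample).items():
    print(category, s)
```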

## Similarity Threshold Analysis

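Inspect how similarity scores are distributed for a sample query to help choose a relevance threshold (measuring true precision and recall would require labeled relevance judgments):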

In [None]:
def analyze_similarity_distribution():
    """Analyze similarity score distribution for threshold tuning"""
    test_query = "wireless bluetooth headphones"
    query_embedding = generate_embedding(test_query)
    
    with pool.connection() as conn:
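        # Exact top 1,000 by similarity. Ordering by the computed score (not
        # ORDER BY embedding <=> ...) keeps the HNSW index out of the plan: an
        # HNSW scan returns at most hnsw.ef_search candidates (40 by default),
        # so a LIMIT 1000 index scan would come back short.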
        query = """
            SELECT 
                1 - (embedding <=> %s::vector) as similarity,
                category_name
            FROM bedrock_integration.product_catalog
            WHERE embedding IS NOT NULL
            ORDER BY similarity DESC
            LIMIT 1000
        """
        df_sim = pd.read_sql_query(query, conn, params=(query_embedding,))
    
    # Plot similarity distribution
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.hist(df_sim['similarity'], bins=50, color='#ba68c8', alpha=0.7, edgecolor='black')
    plt.axvline(0.8, color='#6a1b9a', linestyle='--', label='Threshold: 0.8')
    plt.xlabel('Similarity Score')
    plt.ylabel('Frequency')
    plt.title(f'Similarity Distribution for: "{test_query}"')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    top_categories = df_sim.head(50)['category_name'].value_counts().head(10)
    top_categories.plot(kind='barh', color='#8e24aa')
    plt.xlabel('Count in Top 50 Results')
    plt.title('Category Distribution in Top Results')
    
    plt.tight_layout()
    plt.show()
    
    print(f"\n📊 Similarity Statistics:")
    print(f"   Mean: {df_sim['similarity'].mean():.3f}")
    print(f"   Median: {df_sim['similarity'].median():.3f}")
    print(f"   90th percentile: {df_sim['similarity'].quantile(0.9):.3f}")

analyze_similarity_distribution()
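Without labeled relevance judgments, a practical substitute is a threshold sweep: count how many results survive each candidate cutoff, then manually review the products just above and below the one you are considering. A sketch with a hypothetical helper and made-up scores; in the notebook the input would be `df_sim['similarity'].tolist()`:

```python
from typing import Iterable, List, Sequence, Tuple


def threshold_sweep(scores: Sequence[float], thresholds: Iterable[float]) -> List[Tuple[float, int]]:
    """For each candidate threshold t, count the scores with score >= t."""
    return [(t, sum(1 for s in scores if s >= t)) for t in thresholds]


# Hypothetical similarity scores for one query.
scores = [0.91, 0.88, 0.83, 0.79, 0.74, 0.62, 0.55]
for t, n in threshold_sweep(scores, [0.9, 0.8, 0.7, 0.6]):
    print(f"similarity >= {t:.1f}: {n} results")
```

A large jump in the count between two adjacent thresholds often marks where results shift from close matches to loosely related products, which makes it a reasonable place to start manual review.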

## Summary & Next Steps

### ✅ Completed:
- Loaded and validated 21,704 products
- Generated 1024-dim embeddings (Titan V2)
- Created HNSW index (m=16, ef_construction=64)
- Achieved <50ms p95 query latency
- Built interactive search interface

### 📈 Performance Metrics:
- Embedding generation: ~3 minutes (parallel)
- Batch insertion: ~2 minutes (1000 rows/batch)
- Index creation: <30 seconds
- Search latency: <50ms p95

### 🔜 Next Steps:
1. Implement hybrid search (vector + keyword)
2. Add re-ranking with cross-encoders
3. Deploy real-time recommendation system
4. Set up A/B testing framework

In [None]:
# Cleanup: Close connection pool
pool.close()
print("✅ Session cleanup complete")