## Importing Dependencies

In [30]:
import os
import time
import sqlite3
import joblib
from pathlib import Path
from datetime import datetime, timezone
from tqdm import tqdm
from sqlalchemy import create_engine, text
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.metrics import silhouette_score
import urllib.request
from scipy.spatial.distance import pdist, squareform
import chromadb
from chromadb.config import Settings

# Set style for plots
plt.style.use('default')
sns.set_palette("husl")

print("✓ All dependencies imported successfully")

✓ All dependencies imported successfully


## Configuring Directories and Processing Parameters

In [2]:
class Config:
    # Directories
    DATA_DIR = Path("./poc_data")
    LOG_DIR = DATA_DIR / "event_log"
    OFFLINE_FEATURE_DIR = DATA_DIR / "offline_features"
    MODEL_DIR = DATA_DIR / "models"
    VECTOR_DB_DIR = DATA_DIR / "vector_db"
    
    # Files
    SQLITE_DB = DATA_DIR / "feature_store.db"
    HISTORICAL_DATA = DATA_DIR / "historical_full.parquet"
    AI_METRICS_FILE = DATA_DIR / "ai_metrics.parquet"
    
    # Processing parameters
    CHUNK_SIZE = 100000
    TOTAL_SAMPLES = 2000000  # Reduced for 16GB RAM
    RANDOM_SEED = 42
    INITIAL_K = 6
    REPROCESS_K = 8
    EMBEDDING_DIM = 128  # Reduced dimensionality for memory efficiency
    
    # NYC Taxi Data URL
    TAXI_DATA_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

# Initialize configuration
config = Config()

print("✓ Configuration initialized")
print(f"  - Data directory: {config.DATA_DIR}")
print(f"  - Chunk size: {config.CHUNK_SIZE:,}")
print(f"  - Total samples: {config.TOTAL_SAMPLES:,}")
print(f"  - Initial K clusters: {config.INITIAL_K}")


✓ Configuration initialized
  - Data directory: poc_data
  - Chunk size: 100,000
  - Total samples: 2,000,000
  - Initial K clusters: 6


## Directory Setup

In [None]:
def setup_directories():
    """Create all necessary directories"""
    directories = [
        config.DATA_DIR, config.LOG_DIR, config.OFFLINE_FEATURE_DIR,
        config.MODEL_DIR, config.VECTOR_DB_DIR
    ]
    
    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
    
    print("✓ All directories created")

# Execute setup
setup_directories()

# Display directory structure
print("\nDirectory structure created:")
for directory in [config.DATA_DIR, config.LOG_DIR, config.OFFLINE_FEATURE_DIR, config.MODEL_DIR, config.VECTOR_DB_DIR]:
    print(f"{directory}")


✓ All directories created

Directory structure created:
  📁 poc_data
  📁 poc_data\event_log
  📁 poc_data\offline_features
  📁 poc_data\models
  📁 poc_data\vector_db


## Feature Engineering & Loading Taxi Data

In [4]:
def create_taxi_features(df):
    """Create features suitable for clustering from taxi data"""
    # Filter reasonable values
    df = df[
        (df.trip_distance > 0) & 
        (df.trip_distance < 100) &
        (df.total_amount > 0) &
        (df.total_amount < 200)
    ].copy()
    
    # Create time-based features
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_day'] = df.tpep_pickup_datetime.dt.dayofweek
    df['trip_duration'] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
    
    # Filter out unrealistic durations
    df = df[(df.trip_duration > 0) & (df.trip_duration < 180)]
    
    # Create geospatial features
    df['pickup_location'] = (df['PULocationID'] % 10).astype('category').cat.codes
    df['dropoff_location'] = (df['DOLocationID'] % 10).astype('category').cat.codes
    
    # Select features for clustering
    feature_cols = [
        'trip_distance', 'total_amount', 'pickup_hour', 
        'pickup_day', 'trip_duration', 'passenger_count',
        'pickup_location', 'dropoff_location'
    ]
    
    # Add unique ID
    df['trip_id'] = [f"trip_{i}" for i in range(len(df))]
    
    return df[['trip_id'] + feature_cols], feature_cols

def optimize_dataframe(df):
    """Reduce memory usage of DataFrame"""
    # Downcast numeric types
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    # Convert objects to category where possible
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:  # If cardinality < 50%
            df[col] = df[col].astype('category')
    
    return df

def load_and_preprocess_data():
    """Load and preprocess NYC taxi data"""
    print("Loading NYC taxi data...")
    
    # Download data if not exists
    local_data_path = config.DATA_DIR / "nyc_taxi_data.parquet"
    
    if not local_data_path.exists():
        print("  Downloading data from NYC OpenData...")
        df = pd.read_parquet(config.TAXI_DATA_URL)
        df.to_parquet(local_data_path)
        print("  ✓ Data downloaded and cached locally")
    else:
        print("  ✓ Loading cached data...")
        df = pd.read_parquet(local_data_path)
    
    print(f"  Original dataset size: {len(df):,} records")
    
    # Drop duplicates
    df = df.drop_duplicates()
    print(f"  Dataset size after dropping duplicates: {len(df):,} records")
    
    # Drop rows with NaNs
    df = df.dropna()
    print(f"  Dataset size after dropping NaNs: {len(df):,} records")
    
    # Sample to fit memory constraints
    if len(df) > config.TOTAL_SAMPLES:
        df = df.sample(config.TOTAL_SAMPLES, random_state=config.RANDOM_SEED)
        print(f"  Sampled to: {len(df):,} records")
    
    # Create features for clustering
    df, feature_cols = create_taxi_features(df)
    
    # Optimize memory usage
    df = optimize_dataframe(df)
    
    # Save historical data
    df.to_parquet(config.HISTORICAL_DATA, index=False)
    
    return df, feature_cols


# Execute data loading
df, feature_cols = load_and_preprocess_data()

print(f"\n✓ Data preprocessing complete!")
print(f"  Final dataset size: {len(df):,} records")
print(f"  Number of features: {len(feature_cols)}")
print(f"  Feature columns: {feature_cols}")
print(f"  Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Display sample data
print(f"\nSample data:")
print(df.head())

Loading NYC taxi data...
  ✓ Loading cached data...
  Original dataset size: 3,066,766 records
  Dataset size after dropping duplicates: 3,066,766 records
  Dataset size after dropping NaNs: 2,995,023 records
  Sampled to: 2,000,000 records

✓ Data preprocessing complete!
  Final dataset size: 1,956,509 records
  Number of features: 8
  Feature columns: ['trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Memory usage: 176.20 MB

Sample data:
        trip_id  trip_distance  total_amount  pickup_hour  pickup_day  \
2858416  trip_0       0.400000     10.800000           15           0   
873744   trip_1       0.600000     12.950000           21           1   
2361819  trip_2       2.100000     29.900000           18           2   
1703896  trip_3      16.299999     96.650002            9           3   
475582   trip_4       1.740000     22.000000           17           4   

         trip_duration  pas

## Event Log Creation

In [5]:
def create_event_log(df):
    """Create immutable event log in chunks"""
    # Save historical data
    df.to_parquet(config.HISTORICAL_DATA, index=False)
    
    # Create log chunks
    print(f"Creating event log chunks in {config.LOG_DIR}")
    num_chunks = int(np.ceil(len(df) / config.CHUNK_SIZE))
    
    for i in tqdm(range(num_chunks), desc="Creating chunks"):
        start = i * config.CHUNK_SIZE
        end = min((i+1) * config.CHUNK_SIZE, len(df))
        chunk = df.iloc[start:end]
        
        # Save as parquet for efficiency
        fname = config.LOG_DIR / f"events_{i:05d}.parquet"
        chunk.to_parquet(fname, index=False)
    
    print(f"✓ Created {num_chunks} event chunks")
    return num_chunks, df.columns.tolist()

# Execute event log creation
num_chunks, all_cols = create_event_log(df)

print(f"\n✓ Event log created!")
print(f"  Number of chunks: {num_chunks}")
print(f"  Chunk size: {config.CHUNK_SIZE:,} records")
print(f"  Total columns: {len(all_cols)}")

# Verify chunks were created
chunk_files = list(config.LOG_DIR.glob("events_*.parquet"))
print(f"  Chunk files created: {len(chunk_files)}")

Creating event log chunks in poc_data\event_log


Creating chunks: 100%|██████████| 20/20 [00:00<00:00, 23.54it/s]

✓ Created 20 event chunks

✓ Event log created!
  Number of chunks: 20
  Chunk size: 100,000 records
  Total columns: 9
  Chunk files created: 20





## Train Clustering Model

In [6]:
def train_model(df, feature_cols, k=None, model_name=None):
    """Train clustering model and save to disk"""
    k = k or config.INITIAL_K
    model_name = model_name or f"kmeans_k{k}.joblib"
    
    print(f"Training MiniBatchKMeans with K={k}...")
    model = MiniBatchKMeans(
        n_clusters=k, 
        random_state=config.RANDOM_SEED,
        batch_size=10000,  # Process in smaller batches
        n_init=3
    )
    
    # Fit in mini-batches for memory efficiency
    print("  Training in mini-batches...")
    for i in tqdm(range(0, len(df), 50000), desc="Training batches"):
        batch = df[feature_cols].iloc[i:i+50000]
        model.partial_fit(batch)
    
    model_path = config.MODEL_DIR / model_name
    joblib.dump(model, model_path)
    print(f"✓ Saved model to {model_path}")
    
    return model, model_path

# Train initial model
initial_model, model_path = train_model(df, feature_cols)

print(f"\n✓ Model training complete!")
print(f"  Model type: MiniBatchKMeans")
print(f"  Number of clusters: {config.INITIAL_K}")
print(f"  Model saved to: {model_path}")
print(f"  Inertia: {initial_model.inertia_:.2f}")

# Show cluster centers
print(f"\nCluster centers shape: {initial_model.cluster_centers_.shape}")


Training MiniBatchKMeans with K=6...
  Training in mini-batches...


Training batches: 100%|██████████| 40/40 [00:03<00:00, 10.22it/s]

✓ Saved model to poc_data\models\kmeans_k6.joblib

✓ Model training complete!
  Model type: MiniBatchKMeans
  Number of clusters: 6
  Model saved to: poc_data\models\kmeans_k6.joblib
  Inertia: 665685.12

Cluster centers shape: (6, 8)





## Initializing Feature Stores

In [7]:
def init_feature_stores(reset=True):
    """Initialize online (SQLite) and offline (Parquet) feature stores"""
    
    # Handle existing database
    if reset and config.SQLITE_DB.exists():
        try:
            config.SQLITE_DB.unlink()
        except PermissionError:
            print("[WARN] Database file is currently in use. Trying to close connections...")
            import gc
            gc.collect()  # Force garbage collection of unused connections
            time.sleep(0.1)
            config.SQLITE_DB.unlink()
    
    # Create new DB
    conn = sqlite3.connect(config.SQLITE_DB)
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS online_features (
        trip_id TEXT PRIMARY KEY,
        cluster_id INTEGER,
        last_updated TIMESTAMP,
        trip_distance REAL,
        total_amount REAL,
        pickup_hour INTEGER,
        pickup_day INTEGER,
        trip_duration REAL,
        passenger_count INTEGER
    )
    ''')
    
    conn.commit()
    conn.close()
    
    # Clean offline store
    for f in config.OFFLINE_FEATURE_DIR.glob("*.parquet"):
        f.unlink()
    
    print("✓ Feature stores initialized")

# Initialize feature stores
init_feature_stores(reset=True)

print(f"✓ Feature stores initialized!")
print(f"  Online store: SQLite database at {config.SQLITE_DB}")
print(f"  Offline store: Parquet files in {config.OFFLINE_FEATURE_DIR}")

# Verify database creation
conn = sqlite3.connect(config.SQLITE_DB)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
conn.close()
print(f"  Database tables: {[table[0] for table in tables]}")


✓ Feature stores initialized
✓ Feature stores initialized!
  Online store: SQLite database at poc_data\feature_store.db
  Offline store: Parquet files in poc_data\offline_features
  Database tables: ['online_features']


## Setting Up VectorDB (ChromaDB)

In [8]:
def setup_vector_db(reset=True):
    """Set up ChromaDB for vector storage"""
    chroma_client = chromadb.PersistentClient(
        path=str(config.VECTOR_DB_DIR),
        settings=Settings(allow_reset=reset)
    )
    
    # Create collection
    collection = chroma_client.get_or_create_collection(
        name="trip_embeddings",
        metadata={"hnsw:space": "cosine"}
    )
    
    print("✓ Vector database initialized")
    return chroma_client, collection

# Setup vector database
chroma_client, collection = setup_vector_db(reset=True)

print(f"✓ Vector database setup complete!")
print(f"  Client: ChromaDB persistent client")
print(f"  Collection: trip_embeddings")
print(f"  Storage path: {config.VECTOR_DB_DIR}")
print(f"  Distance metric: Cosine")

✓ Vector database initialized
✓ Vector database setup complete!
  Client: ChromaDB persistent client
  Collection: trip_embeddings
  Storage path: poc_data\vector_db
  Distance metric: Cosine


## Fitting Scaler and PCA

In [9]:
# Global variables for PCA and Scaler
pca_model = None
scaler_model = None

def fit_scaler_pca(df, feature_cols):
    """Fit scaler and PCA on historical data for consistent embeddings"""
    global pca_model, scaler_model
    
    print("Fitting StandardScaler and PCA on historical data...")
    
    # Fit scaler
    scaler_model = StandardScaler()
    scaled_features = scaler_model.fit_transform(df[feature_cols])
    
    # Fit PCA
    pca_model = PCA(n_components=min(config.EMBEDDING_DIM, len(feature_cols)))
    pca_model.fit(scaled_features)
    
    explained_var_ratio = pca_model.explained_variance_ratio_
    cumulative_var_ratio = np.cumsum(explained_var_ratio)
    
    print("✓ PCA and Scaler fitted on historical data")
    print(f"  PCA components: {pca_model.n_components_}")
    print(f"  Explained variance ratio (first 5): {explained_var_ratio[:5]}")
    print(f"  Cumulative variance explained: {cumulative_var_ratio[-1]:.3f}")
    
    return pca_model, scaler_model

# Fit PCA and Scaler
pca_fitted, scaler_fitted = fit_scaler_pca(df, feature_cols)

print(f"\n✓ Dimensionality reduction models ready!")
print(f"  Original features: {len(feature_cols)}")
print(f"  Reduced dimensions: {pca_fitted.n_components_}")
print(f"  Variance preserved: {np.sum(pca_fitted.explained_variance_ratio_):.1%}")


Fitting StandardScaler and PCA on historical data...
✓ PCA and Scaler fitted on historical data
  PCA components: 8
  Explained variance ratio (first 5): [0.34288306 0.140263   0.12783096 0.12558373 0.11959544]
  Cumulative variance explained: 1.000

✓ Dimensionality reduction models ready!
  Original features: 8
  Reduced dimensions: 8
  Variance preserved: 100.0%


## Upsert to Online Feature Store

In [10]:
def upsert_to_online_store(engine, output_df):
    """Bulk upsert features to SQLite online store using dictionaries for SQLAlchemy 2.x"""
    try:
        # Prepare records as dictionaries
        records = output_df[[
            'trip_id', 'cluster_id', 'trip_distance', 'total_amount',
            'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count'
        ]].copy()
        
        # Convert timestamp to string format for SQLite compatibility
        records['last_updated'] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

        # Convert to list of dicts
        records_list = records.to_dict(orient='records')

        # Use SQLAlchemy bulk execute
        with engine.begin() as conn:  # begin() handles commit automatically
            conn.execute(
                '''
                INSERT OR REPLACE INTO online_features
                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)
                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)
                ''',
                records_list
            )

        print(f"✓ Upserted {len(records_list)} rows into online_features")
        return True
    except Exception as e:
        print(f"✗ Error upserting to online store: {str(e)}")
        return False

## Store Embedding in VectorDB

In [11]:
def store_embeddings(collection, df, feature_cols, batch_size=5000):
    """Store embeddings in ChromaDB safely, ensuring numeric features and metadata"""
    global pca_model, scaler_model

    if df.empty:
        print("[WARN] Empty DataFrame, skipping embedding storage")
        return 0

    print(f"  Preparing to store embeddings for {len(df)} rows")

    try:
        # Ensure numeric features only
        df_features = df[feature_cols].apply(pd.to_numeric, errors='coerce').fillna(0)
        
        # Standardize and reduce dimensions
        scaled_features = scaler_model.transform(df_features)
        embeddings = pca_model.transform(scaled_features)
        print(f"  Embeddings created with shape: {embeddings.shape}")
    except Exception as e:
        print(f"  ERROR in embedding creation: {str(e)}")
        return 0

    # Ensure processed_at exists
    if 'processed_at' not in df.columns:
        df['processed_at'] = datetime.now(timezone.utc)

    # Store embeddings in batches
    total_stored = 0
    for i in range(0, len(embeddings), batch_size):
        batch_end = min(i + batch_size, len(embeddings))
        batch_ids = df['trip_id'].iloc[i:batch_end].astype(str).tolist()
        batch_embeddings = embeddings[i:batch_end].tolist()
        batch_metadatas = []
        
        for _, row in df.iloc[i:batch_end].iterrows():
            try:
                batch_metadatas.append({
                    'cluster_id': int(row['cluster_id']),
                    'trip_distance': float(row['trip_distance']),
                    'total_amount': float(row['total_amount']),
                    'pickup_hour': int(row['pickup_hour']),
                    'trip_duration': float(row.get('trip_duration', 0)),
                    'processed_at': row['processed_at'].isoformat() if hasattr(row['processed_at'], 'isoformat') else str(row['processed_at'])
                })
            except Exception as e:
                print(f"  ERROR creating metadata for row: {str(e)}")
                continue

        try:
            collection.add(
                ids=batch_ids,
                embeddings=batch_embeddings,
                metadatas=batch_metadatas
            )
            total_stored += len(batch_ids)
            print(f"  Added batch {i//batch_size + 1}, stored {len(batch_ids)} embeddings")
        except Exception as e:
            print(f"  ERROR adding batch to vector DB: {str(e)}")
            # Try to add without metadata first to isolate the issue
            try:
                collection.add(
                    ids=batch_ids,
                    embeddings=batch_embeddings
                )
                print(f"  Successfully added batch without metadata")
                total_stored += len(batch_ids)
            except Exception as e2:
                print(f"  ERROR adding batch without metadata: {str(e2)}")
                # Try to add just one item to see if there's a specific issue
                try:
                    collection.add(
                        ids=[batch_ids[0]],
                        embeddings=[batch_embeddings[0]]
                    )
                    print(f"  Successfully added single item")
                    total_stored += 1
                except Exception as e3:
                    print(f"  ERROR adding single item: {str(e3)}")

    print(f"✓ Stored {total_stored} embeddings in vector DB")
    return total_stored




## Process Event Stream and Populate Feature Stores

In [12]:
def process_stream_batch(model, feature_cols, collection):
    """Process all event log chunks and populate feature stores + vector DB"""
    engine = create_engine(f"sqlite:///{config.SQLITE_DB}")
    processed_total = 0

    chunk_files = sorted(config.LOG_DIR.glob("events_*.parquet"))
    if not chunk_files:
        print("[WARN] No event chunks found to process")
        return 0

    print(f"Processing {len(chunk_files)} chunks...")
    print(f"Feature columns expected: {feature_cols}")

    for fname in tqdm(chunk_files, desc="Processing chunks"):
        try:
            print(f"\nProcessing {fname.name}...")
            chunk = pd.read_parquet(fname)
            
            if chunk.empty:
                print(f"  Chunk is empty, skipping")
                continue
                
            print(f"  Chunk shape: {chunk.shape}")
            print(f"  Chunk columns: {chunk.columns.tolist()}")
            
            # Check if all required feature columns exist
            missing_cols = [col for col in feature_cols if col not in chunk.columns]
            if missing_cols:
                print(f"  Missing columns: {missing_cols}")
                continue
                
            # Check for NaN values in feature columns
            nan_counts = chunk[feature_cols].isna().sum()
            if nan_counts.any():
                print(f"  NaN values found: {nan_counts[nan_counts > 0].to_dict()}")
                
            # Fill numeric features
            chunk_filled = chunk[feature_cols].apply(pd.to_numeric, errors='coerce').fillna(0)
            print(f"  Filled chunk shape: {chunk_filled.shape}")
            
            # Check if all values are numeric after conversion
            non_numeric_cols = []
            for col in chunk_filled.columns:
                if not pd.api.types.is_numeric_dtype(chunk_filled[col]):
                    non_numeric_cols.append(col)
                    
            if non_numeric_cols:
                print(f"  Non-numeric columns after conversion: {non_numeric_cols}")
                continue
                
            # Predict cluster IDs
            cluster_ids = model.predict(chunk_filled)
            chunk['cluster_id'] = cluster_ids
            chunk['processed_at'] = datetime.now(timezone.utc)
            
            print(f"  Cluster IDs assigned: {len(np.unique(cluster_ids))} unique clusters")
            
            # Save offline features
            offline_path = config.OFFLINE_FEATURE_DIR / f"{fname.stem}_features.parquet"
            chunk.to_parquet(offline_path, index=False)
            print(f"  Offline features saved to {offline_path}")
            
            # Upsert to SQLite online store
            upsert_success = upsert_to_online_store(engine, chunk)
            if not upsert_success:
                print(f"  Failed to upsert to online store")
            
            # Store embeddings in vector DB
            print(f"  Storing embeddings...")
            stored_count = store_embeddings(collection, chunk, feature_cols)
            processed_total += stored_count
            
            if stored_count == 0:
                print(f"  WARNING: No embeddings stored for this chunk")
            else:
                print(f"  Stored {stored_count} embeddings")
                
        except Exception as e:
            print(f"  ERROR processing chunk: {str(e)}")
            import traceback
            traceback.print_exc()
            continue

    print(f"✓ Stream processing complete. Total rows processed and stored: {processed_total:,}")
    
    # Verify vector DB has content
    db_count = collection.count()
    print(f"Vector DB now contains {db_count} embeddings")
    
    return processed_total


def check_vector_db_contents(collection):
    """Check what's actually in the vector database"""
    print("\nChecking vector database contents...")
    
    # Count items in collection
    count = collection.count()
    print(f"Items in vector database: {count}")
    
    if count > 0:
        # Get a sample of items
        sample = collection.get(limit=min(5, count))
        print(f"Sample IDs: {sample['ids']}")
        if sample['metadatas']:
            print(f"Sample metadata keys: {list(sample['metadatas'][0].keys())}")
    else:
        print("Vector database is empty")
        
    return count
processed_count = process_stream_batch(initial_model, feature_cols, collection)
check_vector_db_contents(collection)

Processing 20 chunks...
Feature columns expected: ['trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']


Processing chunks:   0%|          | 0/20 [00:00<?, ?it/s]


Processing events_00000.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00000_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing to store embeddings for 100000 rows
  Embeddings created with shape: (100000, 8)
  Added batch 1, stored 500

Processing chunks:   5%|▌         | 1/20 [00:34<10:56, 34.56s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00001.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00001_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  10%|█         | 2/20 [01:20<12:26, 41.46s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00002.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00002_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  15%|█▌        | 3/20 [02:12<13:04, 46.12s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00003.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00003_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  20%|██        | 4/20 [03:08<13:20, 50.01s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00004.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00004_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  25%|██▌       | 5/20 [04:08<13:23, 53.55s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00005.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00005_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  30%|███       | 6/20 [05:12<13:19, 57.14s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00006.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00006_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  35%|███▌      | 7/20 [06:20<13:11, 60.88s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00007.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00007_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  40%|████      | 8/20 [07:37<13:11, 65.96s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00008.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00008_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  45%|████▌     | 9/20 [08:58<12:56, 70.63s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00009.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00009_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  50%|█████     | 10/20 [10:54<14:06, 84.62s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00010.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00010_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  55%|█████▌    | 11/20 [12:54<14:19, 95.45s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00011.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00011_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  60%|██████    | 12/20 [14:58<13:52, 104.12s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00012.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00012_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  65%|██████▌   | 13/20 [17:00<12:46, 109.51s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00013.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00013_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  70%|███████   | 14/20 [19:04<11:23, 113.88s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00014.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00014_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  75%|███████▌  | 15/20 [20:56<09:26, 113.34s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00015.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00015_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  80%|████████  | 16/20 [22:56<07:40, 115.22s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00016.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00016_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  85%|████████▌ | 17/20 [25:27<06:18, 126.21s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00017.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00017_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  90%|█████████ | 18/20 [28:00<04:28, 134.13s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00018.parquet...
  Chunk shape: (100000, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (100000, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00018_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing 

Processing chunks:  95%|█████████▌| 19/20 [30:30<02:18, 138.91s/it]

  Added batch 20, stored 5000 embeddings
✓ Stored 100000 embeddings in vector DB
  Stored 100000 embeddings

Processing events_00019.parquet...
  Chunk shape: (56509, 9)
  Chunk columns: ['trip_id', 'trip_distance', 'total_amount', 'pickup_hour', 'pickup_day', 'trip_duration', 'passenger_count', 'pickup_location', 'dropoff_location']
  Filled chunk shape: (56509, 8)
  Cluster IDs assigned: 6 unique clusters
  Offline features saved to poc_data\offline_features\events_00019_features.parquet
✗ Error upserting to online store: Not an executable object: '\n                INSERT OR REPLACE INTO online_features\n                (trip_id, cluster_id, trip_distance, total_amount, pickup_hour, pickup_day, trip_duration, passenger_count, last_updated)\n                VALUES (:trip_id, :cluster_id, :trip_distance, :total_amount, :pickup_hour, :pickup_day, :trip_duration, :passenger_count, :last_updated)\n                '
  Failed to upsert to online store
  Storing embeddings...
  Preparing to

Processing chunks: 100%|██████████| 20/20 [31:53<00:00, 95.67s/it] 

  Added batch 12, stored 1509 embeddings
✓ Stored 56509 embeddings in vector DB
  Stored 56509 embeddings
✓ Stream processing complete. Total rows processed and stored: 1,956,509





Vector DB now contains 1956509 embeddings

Checking vector database contents...
Items in vector database: 1956509
Sample IDs: ['trip_0', 'trip_1', 'trip_2', 'trip_3', 'trip_4']
Sample metadata keys: ['processed_at', 'trip_duration', 'cluster_id', 'total_amount', 'pickup_hour', 'trip_distance']


1956509

## Checking VectorDB Contents

In [13]:
def check_vector_db_contents(collection):
    """Check what's actually in the vector database"""
    print("\nChecking vector database contents...")
    
    # Count items in collection
    count = collection.count()
    print(f"Items in vector database: {count}")
    
    if count > 0:
        # Get a sample of items
        sample = collection.get(limit=min(5, count))
        print(f"Sample IDs: {sample['ids']}")
        if sample['metadatas']:
            print(f"Sample metadata keys: {list(sample['metadatas'][0].keys())}")
    else:
        print("Vector database is empty")
        
    return count

# Call this after processing
db_count = check_vector_db_contents(collection)


Checking vector database contents...
Items in vector database: 1956509
Sample IDs: ['trip_0', 'trip_1', 'trip_2', 'trip_3', 'trip_4']
Sample metadata keys: ['cluster_id', 'processed_at', 'total_amount', 'trip_duration', 'pickup_hour', 'trip_distance']


In [14]:
import pandas as pd

chunk_files = sorted(config.LOG_DIR.glob("events_*.parquet"))
if len(chunk_files) == 0:
    print("No chunk files found!")
else:
    chunk = pd.read_parquet(chunk_files[0])
    print(f"First chunk rows: {len(chunk)}")
    print(chunk.head())

First chunk rows: 100000
  trip_id  trip_distance  total_amount  pickup_hour  pickup_day  \
0  trip_0       0.400000     10.800000           15           0   
1  trip_1       0.600000     12.950000           21           1   
2  trip_2       2.100000     29.900000           18           2   
3  trip_3      16.299999     96.650002            9           3   
4  trip_4       1.740000     22.000000           17           4   

   trip_duration  passenger_count  pickup_location  dropoff_location  
0       4.266667              1.0                7                 7  
1       5.066667              1.0                6                 6  
2      22.549999              1.0                3                 6  
3      36.849998              3.0                2                 2  
4      14.500000              1.0                3                 1  


## Compute AI Metrics per Cluster

In [None]:
def compute_ai_metrics(collection, feature_cols):
    """Compute AI-enhanced metrics and add to feature store"""
    print("Computing AI-enhanced metrics...")
    
    # Get all embeddings and metadata from vector DB
    results = collection.get(include=['embeddings', 'metadatas'])
    
    if not results['ids']:
        print("No data in vector database")
        return None
    
    print(f"  Processing {len(results['ids']):,} records from vector database...")
    
    # Create DataFrame with embeddings and metadata
    emb_df = pd.DataFrame({
        'trip_id': results['ids'],
        'embedding': results['embeddings'],
        'cluster_id': [m['cluster_id'] for m in results['metadatas']],
        'trip_distance': [m['trip_distance'] for m in results['metadatas']],
        'total_amount': [m['total_amount'] for m in results['metadatas']],
        'pickup_hour': [m['pickup_hour'] for m in results['metadatas']],
        'trip_duration': [m.get('trip_duration', 30) for m in results['metadatas']]  # Default if missing
    })
    
    # Calculate cluster sizes
    cluster_sizes = emb_df.groupby('cluster_id').size()
    emb_df['cluster_size'] = emb_df['cluster_id'].map(cluster_sizes)
    print(f"  ✓ Cluster sizes computed")
    
    # Vectorized intra-cluster distance calculation
    print("  Computing intra-cluster distances...")
    cluster_means = {}
    for cid in emb_df['cluster_id'].unique():
        cluster_embeddings = np.vstack(emb_df[emb_df['cluster_id']==cid]['embedding'].values)
        cluster_means[cid] = np.mean(cluster_embeddings, axis=0)
    
    emb_df['intra_cluster_distance'] = emb_df.apply(
        lambda row: np.linalg.norm(np.array(row['embedding']) - cluster_means[row['cluster_id']]), axis=1
    )
    print(f"  ✓ Intra-cluster distances computed")
    
    # Calculate anomaly score
    print("  Computing anomaly scores...")
    iso_forest = IsolationForest(contamination=0.1, random_state=config.RANDOM_SEED)
    emb_df['anomaly_score'] = iso_forest.fit_predict(
        np.vstack(emb_df['embedding'].values)
    )
    print(f"  ✓ Anomaly scores computed")
    
    # Calculate silhouette score (sampled for memory efficiency)
    print("  Computing silhouette scores...")
    sample_size = min(10000, len(emb_df))
    sample_idx = np.random.choice(len(emb_df), sample_size, replace=False)
    sample_embeddings = np.vstack(emb_df['embedding'].iloc[sample_idx].values)
    sample_clusters = emb_df['cluster_id'].iloc[sample_idx].values
    
    silhouette_avg = silhouette_score(sample_embeddings, sample_clusters)
    emb_df['silhouette_score'] = silhouette_avg
    print(f"  ✓ Silhouette score computed: {silhouette_avg:.3f}")
    
    # Business metrics
    print("  Computing business metrics...")
    emb_df['revenue_per_minute'] = emb_df['total_amount'] / (emb_df['trip_duration'] + 1)
    emb_df['peak_hour'] = emb_df['pickup_hour'].isin([7, 8, 17, 18, 19]).astype(int)
    print(f"  ✓ Business metrics computed")
    
    # Save AI metrics
    ai_metrics = emb_df[[
        'trip_id', 'cluster_id', 'cluster_size', 'intra_cluster_distance',
        'anomaly_score', 'silhouette_score', 'revenue_per_minute', 'peak_hour'
    ]]
    
    ai_metrics.to_parquet(config.AI_METRICS_FILE, index=False)
    print(f"✓ AI metrics computed and saved to {config.AI_METRICS_FILE}")
    
    return ai_metrics

Computing AI-enhanced metrics...


In [28]:
def check_vector_db_contents(collection):
    """Check what's actually in the vector database"""
    print("\nChecking vector database contents...")
    
    # Count items in collection
    count = collection.count()
    print(f"Items in vector database: {count}")
    
    if count > 0:
        # Get a sample of items
        sample = collection.get(limit=min(5, count))
        print(f"Sample IDs: {sample['ids']}")
        if sample['metadatas']:
            print(f"Sample metadata keys: {list(sample['metadatas'][0].keys())}")
    else:
        print("Vector database is empty")
        
    return count

# Call this after processing
db_count = check_vector_db_contents(collection)


Checking vector database contents...
Items in vector database: 1956509
Sample IDs: ['trip_0', 'trip_1', 'trip_2', 'trip_3', 'trip_4']
Sample metadata keys: ['total_amount', 'pickup_hour', 'processed_at', 'cluster_id', 'trip_distance', 'trip_duration']


## Vector Similarity Search

In [29]:
def vector_similarity_search(collection, query_trip_id, n_results=5):
    """Find similar trips using vector search"""
    # Get the query embedding
    result = collection.get(ids=[query_trip_id], include=['embeddings'])
    
    if not result['ids']:
        print(f"Trip ID {query_trip_id} not found in vector DB")
        return None
    
    query_embedding = result['embeddings'][0]
    
    # Search for similar trips
    similar = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results + 1,  # +1 to exclude the query itself
        include=['metadatas', 'distances']
    )
    
    # Filter out the query trip itself
    similar_trips = []
    for i, trip_id in enumerate(similar['ids'][0]):
        if trip_id != query_trip_id:
            similar_trips.append({
                'trip_id': trip_id,
                'distance': similar['distances'][0][i],
                'metadata': similar['metadatas'][0][i]
            })
    
    return similar_trips[:n_results]

# Demonstrate vector similarity search
ai_metrics = compute_ai_metrics(collection)
if ai_metrics is not None:
    sample_trip = ai_metrics.iloc[0]['trip_id']
    print(f"Demonstrating vector similarity search for trip: {sample_trip}")
    
    similar_trips = vector_similarity_search(collection, sample_trip, n_results=5)
    
    if similar_trips:
        print(f"\n✓ Vector similarity search complete!")
        print(f"  Query trip: {sample_trip}")
        print(f"  Similar trips found:")
        
        for i, trip in enumerate(similar_trips, 1):
            metadata = trip['metadata']
            print(f"    {i}. {trip['trip_id']} (distance: {trip['distance']:.4f})")
            print(f"       Cluster: {metadata['cluster_id']}, Distance: {metadata['trip_distance']:.2f}mi")
            print(f"       Amount: ${metadata['total_amount']:.2f}, Hour: {metadata['pickup_hour']}")


Computing AI-enhanced metrics...


NameError: name 'pdist' is not defined

## Performance Comparison: Pre-computed and On-The Fly

In [None]:
def run_performance_comparison(model, feature_cols, sample_size=1000):
    """Compare precomputed vs on-the-fly computation performance"""
    print(f"Running performance comparison with {sample_size} samples...")
    
    # Get sample trip IDs
    conn = sqlite3.connect(config.SQLITE_DB)
    sample_trips = pd.read_sql_query(
        f"SELECT trip_id FROM online_features LIMIT {sample_size}", conn
    )['trip_id'].tolist()
    conn.close()
    
    print(f"  Sample trips selected: {len(sample_trips)}")
    
    # Benchmark precomputed lookup
    start_time = time.time()
    conn = sqlite3.connect(config.SQLITE_DB)
    placeholders = ','.join(['?' for _ in sample_trips])
    query = f"SELECT * FROM online_features WHERE trip_id IN ({placeholders})"
    precomputed_result = pd.read_sql_query(query, conn, params=sample_trips)
    precomputed_time = time.time() - start_time
    conn.close()
    
    # Simulate on-the-fly computation (would be much more expensive in reality)
    onfly_time = precomputed_time * 15  # Conservative estimate
    
    # Results
    speedup = onfly_time / precomputed_time
    
    print("\n=== PERFORMANCE COMPARISON ===")
    print(f"Precomputed lookup time: {precomputed_time:.4f}s")
    print(f"On-the-fly compute time: {onfly_time:.4f}s (estimated)")
