In [None]:
import psycopg2
import json
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import pandas as pd

## Registry Discovery Functions

In [None]:
def list_available_embeddings(db_connection) -> pd.DataFrame:
    """Return every row of ``embedding_registry`` as a DataFrame, newest first.

    Args:
        db_connection: Open database connection, passed straight to
            ``pandas.read_sql``.

    Returns:
        DataFrame with columns: model_alias, model_name, dimension,
        embedding_count, chunk_source_dataset, chunk_size_config,
        created_at, last_accessed (ordered by created_at descending).
    """
    registry_sql = '''
        SELECT 
            model_alias,
            model_name,
            dimension,
            embedding_count,
            chunk_source_dataset,
            chunk_size_config,
            created_at,
            last_accessed
        FROM embedding_registry
        ORDER BY created_at DESC
    '''
    registry_frame = pd.read_sql(registry_sql, db_connection)
    return registry_frame

In [None]:
def get_embedding_metadata(db_connection, model_alias: str) -> Optional[Dict]:
    """Look up the registry row for one model alias.

    Args:
        db_connection: Database connection whose ``cursor()`` can be used as
            a context manager.
        model_alias: The model alias (e.g., 'bge_base_en_v1_5')

    Returns:
        None when no row matches the alias. Otherwise a dict with keys:
        dimension, embedding_count, chunk_source_dataset, chunk_size_config,
        created_at, metadata_json. A falsy metadata_json (e.g. NULL) is
        replaced by an empty dict.
    """
    field_names = (
        'dimension',
        'embedding_count',
        'chunk_source_dataset',
        'chunk_size_config',
        'created_at',
        'metadata_json',
    )

    with db_connection.cursor() as cursor:
        cursor.execute('''
            SELECT 
                dimension,
                embedding_count,
                chunk_source_dataset,
                chunk_size_config,
                created_at,
                metadata_json
            FROM embedding_registry
            WHERE model_alias = %s
        ''', (model_alias,))
        row = cursor.fetchone()

    if not row:
        return None

    # Pair each selected column with its value, in SELECT order.
    info = dict(zip(field_names, row))
    info['metadata_json'] = info['metadata_json'] or {}
    return info

In [None]:
def register_embedding(db_connection, model_alias: str, model_name: str,
                       dimension: int, embedding_count: int,
                       chunk_source_dataset: Optional[str] = None,
                       chunk_size_config: Optional[int] = None,
                       metadata: Optional[Dict] = None) -> bool:
    """Register or update an embedding model in the registry.

    Call this after generating embeddings to enable discovery by other notebooks.

    The row is upserted on ``model_alias``. When the alias already exists,
    model_name, dimension, embedding_count and metadata_json are overwritten
    with the new values, last_accessed is set to the current timestamp, and
    chunk_source_dataset / chunk_size_config are only replaced when a
    non-None value is supplied.

    Args:
        db_connection: PostgreSQL connection
        model_alias: Identifier for the model (e.g., 'bge_base_en_v1_5')
        model_name: Human-readable model name (e.g., 'BGE Base EN v1.5')
        dimension: Embedding vector dimension
        embedding_count: Number of embeddings stored
        chunk_source_dataset: Description of source data
        chunk_size_config: MAX_CHUNK_SIZE used during generation
        metadata: Optional dict with notes, URLs, training_date, etc.
            Serialised with ``json.dumps``; defaults to an empty dict.

    Returns:
        True if the row was written and committed; False if any error
        occurred (the error is printed and the transaction rolled back).
    """
    if metadata is None:
        metadata = {}

    try:
        with db_connection.cursor() as cur:
            # Upsert: model_name and dimension are refreshed too, so that
            # re-registering an alias never leaves stale descriptive fields.
            cur.execute('''
                INSERT INTO embedding_registry (
                    model_alias, model_name, dimension, embedding_count,
                    chunk_source_dataset, chunk_size_config, metadata_json
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (model_alias) DO UPDATE SET
                    model_name = EXCLUDED.model_name,
                    dimension = EXCLUDED.dimension,
                    embedding_count = EXCLUDED.embedding_count,
                    chunk_source_dataset = COALESCE(EXCLUDED.chunk_source_dataset, embedding_registry.chunk_source_dataset),
                    chunk_size_config = COALESCE(EXCLUDED.chunk_size_config, embedding_registry.chunk_size_config),
                    metadata_json = EXCLUDED.metadata_json,
                    last_accessed = CURRENT_TIMESTAMP
            ''', (
                model_alias, model_name, dimension, embedding_count,
                chunk_source_dataset, chunk_size_config, json.dumps(metadata)
            ))
        db_connection.commit()
        print(f"✓ Registered embedding: {model_alias} ({embedding_count} chunks)")
        return True
    except Exception as e:
        # Best-effort helper: report the failure and leave the connection usable.
        print(f"✗ Failed to register embedding: {e}")
        db_connection.rollback()
        return False

## Experiment Tracking Functions

In [None]:
def compute_config_hash(config_dict: Dict) -> str:
    """Return a short, deterministic fingerprint of a configuration dict.

    The dict is serialised to JSON with sorted keys, so two configurations
    with the same content produce the same fingerprint regardless of key
    insertion order. This enables finding all experiments with identical
    configurations.

    Args:
        config_dict: Configuration parameters (must be JSON-serialisable)

    Returns:
        The first 12 hexadecimal characters of the SHA256 digest.
    """
    canonical_bytes = json.dumps(config_dict, sort_keys=True).encode()
    full_digest = hashlib.sha256(canonical_bytes).hexdigest()
    return full_digest[:12]

In [None]:
def start_experiment(db_connection, experiment_name: str,
                     notebook_path: Optional[str] = None,
                     embedding_model_alias: Optional[str] = None,
                     config: Optional[Dict] = None,
                     techniques: Optional[List[str]] = None,
                     notes: Optional[str] = None) -> int:
    """Start a new experiment and return its ID for tracking.

    Inserts a row into ``experiments`` with status 'running' and commits it.
    The configuration is stored as JSON together with its hash from
    ``compute_config_hash``.

    Args:
        db_connection: PostgreSQL connection
        experiment_name: Human-readable experiment name
        notebook_path: Path to the notebook running this experiment
        embedding_model_alias: Which embedding model is being used
        config: Dict of configuration parameters (defaults to an empty dict)
        techniques: List of techniques being applied
            (e.g., ['reranking', 'query_expansion']); defaults to an empty list
        notes: Optional notes about the experiment

    Returns:
        Experiment ID for use in save_metrics() and complete_experiment()

    Raises:
        Exception: Any error raised while inserting or committing is
            re-raised after the transaction has been rolled back.
    """
    if config is None:
        config = {}
    if techniques is None:
        techniques = []

    config_hash = compute_config_hash(config)

    try:
        with db_connection.cursor() as cur:
            cur.execute('''
                INSERT INTO experiments (
                    experiment_name, notebook_path, embedding_model_alias,
                    config_hash, config_json, techniques_applied, notes, status
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, 'running')
                RETURNING id
            ''', (
                experiment_name,
                notebook_path,
                embedding_model_alias,
                config_hash,
                json.dumps(config),
                techniques,
                notes
            ))
            exp_id = cur.fetchone()[0]
        db_connection.commit()
    except Exception:
        # Roll back so the shared connection is not left in an aborted
        # transaction; there is no ID to return, so the error propagates.
        db_connection.rollback()
        raise

    print(f"✓ Started experiment #{exp_id}: {experiment_name}")
    return exp_id

In [None]:
def complete_experiment(db_connection, experiment_id: int,
                       status: str = 'completed',
                       notes: Optional[str] = None) -> bool:
    """Mark an experiment as complete.

    Sets ``status`` and ``completed_at`` (current timestamp) on the matching
    ``experiments`` row, and ``notes`` as well when a non-empty value is given.

    Args:
        db_connection: PostgreSQL connection
        experiment_id: ID returned from start_experiment()
        status: 'completed' or 'failed'
        notes: Optional update to notes field; None or an empty string
            leaves the stored notes untouched

    Returns:
        True if a row was updated and committed. False if no experiment has
        that ID or an error occurred (the transaction is rolled back).
    """
    # Build the SET clause from fixed fragments only; all values are bound
    # as parameters.
    assignments = ['status = %s']
    params: List = [status]
    if notes:
        assignments.append('notes = %s')
        params.append(notes)
    assignments.append('completed_at = CURRENT_TIMESTAMP')
    params.append(experiment_id)
    sql = f"UPDATE experiments SET {', '.join(assignments)} WHERE id = %s"

    try:
        with db_connection.cursor() as cur:
            cur.execute(sql, params)
            updated_rows = cur.rowcount

        if updated_rows == 0:
            # Nothing matched: do not claim success for an unknown ID.
            db_connection.rollback()
            print(f"✗ Failed to complete experiment: no experiment with id {experiment_id}")
            return False

        db_connection.commit()
        print(f"✓ Experiment #{experiment_id} marked as {status}")
        return True
    except Exception as e:
        print(f"✗ Failed to complete experiment: {e}")
        db_connection.rollback()
        return False

In [None]:
def save_metrics(db_connection, experiment_id: int, metrics_dict: Dict,
                 export_to_file: bool = True,
                 export_dir: str = 'data/experiment_results') -> Tuple[bool, str]:
    """Save experiment metrics to database and optionally to JSON file.

    All metric values are converted and the export payload is serialised
    before any row is written, so a bad value aborts the call without
    leaving committed rows behind. The JSON file is written only after the
    database commit has succeeded.

    Args:
        db_connection: PostgreSQL connection
        experiment_id: ID from start_experiment()
        metrics_dict: Dict of {metric_name: value, ...}
                      Can also nest details: {metric_name: {value: X, details: {...}}}
                      (a nested dict without 'value' is stored as 0.0)
        export_to_file: Whether to also save to filesystem JSON
        export_dir: Directory for JSON exports (created if missing)

    Returns:
        Tuple of (success: bool, message: str). success is False when the
        database write failed (nothing is committed) or when the rows were
        committed but the JSON file could not be written; the message says
        which of the two happened.
    """
    import os

    try:
        # Normalise every metric first so conversion errors surface before
        # the database is touched.
        rows = []
        for metric_name, metric_data in metrics_dict.items():
            # Handle both simple floats and nested dicts with details
            if isinstance(metric_data, dict):
                metric_value = metric_data.get('value', 0.0)
                metric_details = metric_data.get('details', {})
            else:
                metric_value = metric_data
                metric_details = {}
            rows.append((
                experiment_id,
                metric_name,
                float(metric_value),
                json.dumps(metric_details) if metric_details else '{}'
            ))

        # Serialise the export up front as well: an unserialisable metric
        # must fail the whole call rather than fail after the commit.
        file_path = None
        export_payload = None
        if export_to_file:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            file_path = os.path.join(export_dir, f'experiment_{experiment_id}_{timestamp}.json')
            export_payload = json.dumps({
                'experiment_id': experiment_id,
                'timestamp': timestamp,
                'metrics': metrics_dict
            }, indent=2)

        with db_connection.cursor() as cur:
            for row in rows:
                cur.execute('''
                    INSERT INTO evaluation_results (
                        experiment_id, metric_name, metric_value, metric_details_json
                    )
                    VALUES (%s, %s, %s, %s)
                ''', row)
        db_connection.commit()
    except Exception as e:
        msg = f"✗ Failed to save metrics: {e}"
        print(msg)
        db_connection.rollback()
        return False, msg

    msg = f"✓ Saved {len(metrics_dict)} metrics for experiment #{experiment_id}"

    if export_payload is not None:
        try:
            os.makedirs(export_dir, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(export_payload)
        except OSError as e:
            # The rows are already committed; say so instead of implying
            # that nothing was saved.
            msg = (f"✗ Saved {len(metrics_dict)} metrics for experiment "
                   f"#{experiment_id} to database, but file export failed: {e}")
            print(msg)
            return False, msg
        msg += f" to {file_path}"

    print(msg)
    return True, msg

## Query & Analysis Functions

In [None]:
def get_experiment(db_connection, experiment_id: int) -> Optional[Dict]:
    """Fetch experiment details and associated metrics.

    Columns are selected by name rather than with ``SELECT *``, so the
    result does not depend on the physical column order of ``experiments``.

    Args:
        db_connection: PostgreSQL connection
        experiment_id: Experiment ID

    Returns:
        None if no experiment has that ID. Otherwise a dict with keys: id,
        name, notebook, embedding_model, config_hash, config, techniques,
        started_at, completed_at, status, notes and metrics, where metrics
        maps each metric name to {'value': ..., 'details': ...}. If a metric
        name occurs more than once, the last row returned wins.
    """
    # (table column, key in the returned dict), in SELECT order.
    column_to_key = (
        ('id', 'id'),
        ('experiment_name', 'name'),
        ('notebook_path', 'notebook'),
        ('embedding_model_alias', 'embedding_model'),
        ('config_hash', 'config_hash'),
        ('config_json', 'config'),
        ('techniques_applied', 'techniques'),
        ('started_at', 'started_at'),
        ('completed_at', 'completed_at'),
        ('status', 'status'),
        ('notes', 'notes'),
    )
    select_list = ', '.join(column for column, _ in column_to_key)

    with db_connection.cursor() as cur:
        # Get experiment
        cur.execute(
            f'SELECT {select_list} FROM experiments WHERE id = %s',
            (experiment_id,)
        )
        exp = cur.fetchone()

        if not exp:
            return None

        # Get metrics for this experiment
        cur.execute('''
            SELECT metric_name, metric_value, metric_details_json
            FROM evaluation_results
            WHERE experiment_id = %s
            ORDER BY metric_name
        ''', (experiment_id,))
        metrics = {
            name: {'value': value, 'details': details}
            for name, value, details in cur.fetchall()
        }

    experiment = {key: field for (_, key), field in zip(column_to_key, exp)}
    experiment['metrics'] = metrics
    return experiment

In [None]:
def list_experiments(db_connection, limit: int = 20,
                    status: Optional[str] = None,
                    embedding_model: Optional[str] = None) -> pd.DataFrame:
    """List recent experiments with optional filtering.

    Every caller-supplied value, including ``limit``, is passed to the
    driver as a bound parameter rather than interpolated into the SQL text.

    Args:
        db_connection: PostgreSQL connection
        limit: Max number of results
        status: Filter by status ('running', 'completed', 'failed');
            falsy values disable the filter
        embedding_model: Filter by embedding model alias; falsy values
            disable the filter

    Returns:
        DataFrame of experiments (all columns), ordered by started_at
        descending
    """
    conditions = []
    params: List = []

    if status:
        conditions.append('status = %s')
        params.append(status)

    if embedding_model:
        conditions.append('embedding_model_alias = %s')
        params.append(embedding_model)

    query = 'SELECT * FROM experiments'
    if conditions:
        query += ' WHERE ' + ' AND '.join(conditions)
    query += ' ORDER BY started_at DESC LIMIT %s'
    params.append(limit)

    return pd.read_sql(query, db_connection, params=params)

In [None]:
def compare_experiments(db_connection, experiment_ids: List[int],
                       metric_names: Optional[List[str]] = None) -> pd.DataFrame:
    """Compare metrics across multiple experiments side-by-side.

    Args:
        db_connection: PostgreSQL connection
        experiment_ids: Experiment IDs to compare (list or tuple). An empty
            sequence returns an empty DataFrame without querying.
        metric_names: Specific metrics to compare (if None or empty, all
            metrics)

    Returns:
        DataFrame with experiments as rows (id, experiment_name,
        embedding_model_alias) and one column per metric name. The pivot
        uses ``pivot_table`` with its default aggregation, so repeated
        values for the same experiment and metric are combined by it.
    """
    index_columns = ['id', 'experiment_name', 'embedding_model_alias']

    # Accept any sequence; tuple + list concatenation would otherwise fail.
    params: List = list(experiment_ids)
    if not params:
        # "IN ()" is not valid SQL, so short-circuit instead of querying.
        return pd.DataFrame(columns=index_columns)

    placeholders = ','.join(['%s'] * len(params))

    query = f'''
        SELECT 
            e.id,
            e.experiment_name,
            e.embedding_model_alias,
            r.metric_name,
            r.metric_value
        FROM experiments e
        LEFT JOIN evaluation_results r ON e.id = r.experiment_id
        WHERE e.id IN ({placeholders})
    '''

    if metric_names:
        wanted_metrics = list(metric_names)
        placeholders_metrics = ','.join(['%s'] * len(wanted_metrics))
        query += f' AND r.metric_name IN ({placeholders_metrics})'
        params += wanted_metrics

    df = pd.read_sql(query, db_connection, params=params)
    # Pivot to get metrics as columns
    return df.pivot_table(
        index=index_columns,
        columns='metric_name',
        values='metric_value'
    ).reset_index()