# Simple ETL Pipeline for DPO Training Data

This notebook implements a straightforward ETL process to prepare preference data for OpenAI DPO fine-tuning.

## Process Overview
1. **Extract** data from `feedback_comparisons` table
2. **Transform** HTML to text and apply preference logic
3. **Load** into new `dpo_training_data` table


## 1. Setup and Dependencies


In [9]:
# Import required libraries
import os
import json
import uuid
from datetime import datetime
from typing import List, Dict, Any
import warnings
warnings.filterwarnings('ignore')

# Data processing
from bs4 import BeautifulSoup
import html

# Database
from supabase import create_client, Client

# Environment
from dotenv import load_dotenv

# Load environment variables
load_dotenv('.env.local')

print("✅ All libraries imported successfully")


✅ All libraries imported successfully


## 2. Initialize Supabase Connection


In [10]:
# Initialize Supabase client
def initialize_supabase():
    """Initialize Supabase client using environment variables"""
    url = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
    key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    
    if not url or not key:
        raise ValueError("Missing Supabase credentials in .env.local")
    
    supabase: Client = create_client(url, key)
    return supabase

# Initialize connection
try:
    supabase = initialize_supabase()
    print("✅ Supabase client initialized successfully")
except Exception as e:
    print(f"❌ Error initializing Supabase: {e}")


✅ Supabase client initialized successfully


## 3. HTML Cleaning Function


In [11]:
# HTML cleaning function (Option A: Simple Strip)
def clean_html(content):
    """
    Clean HTML content using simple strip approach
    
    Args:
        content: HTML content string
    
    Returns:
        Cleaned plain text
    """
    if not content:
        return ""
    
    try:
        # Decode HTML entities (&amp; → &, &lt; → <, etc.)
        decoded = html.unescape(str(content))
        
        # Remove all HTML tags using BeautifulSoup
        soup = BeautifulSoup(decoded, 'html.parser')
        
        # Get plain text and clean up whitespace
        clean_text = soup.get_text().strip()
        
        return clean_text
    except Exception as e:
        print(f"⚠️ Error cleaning HTML: {e}")
        return str(content)  # Return original if cleaning fails

# Test the HTML cleaning function
test_html = "<p>This is a <strong>test</strong> with <em>HTML</em> content.</p>"
cleaned = clean_html(test_html)
print(f"✅ HTML cleaning test:")
print(f"  Original: {test_html}")
print(f"  Cleaned:  {cleaned}")


✅ HTML cleaning test:
  Original: <p>This is a <strong>test</strong> with <em>HTML</em> content.</p>
  Cleaned:  This is a test with HTML content.


## 4. Create DPO Training Data Table


In [12]:
# Verify dpo_training_data table is accessible
def verify_dpo_table():
    """Verify that the dpo_training_data table is accessible"""
    
    try:
        # Try to query the table to verify it exists and is accessible
        response = supabase.table('dpo_training_data').select('uuid').limit(1).execute()
        print("✅ dpo_training_data table is accessible and ready")
        return True
        
    except Exception as e:
        print(f"❌ Error accessing dpo_training_data table: {e}")
        print("Please ensure the table exists in your Supabase database")
        return False

# Verify the table
table_ready = verify_dpo_table()


✅ dpo_training_data table is accessible and ready


## 5. Extract Data from feedback_comparisons


In [13]:
# Extract data from feedback_comparisons with filtering
def extract_feedback_data():
    """
    Extract data from feedback_comparisons table with basic filtering
    
    Returns:
        List of valid feedback records
    """
    try:
        # Query feedback_comparisons with basic filtering
        response = supabase.table('feedback_comparisons').select('*').execute()
        
        if not response.data:
            print("⚠️ No data found in feedback_comparisons table")
            return []
        
        print(f"📊 Extracted {len(response.data)} total records")
        
        # Apply filtering logic
        filtered_records = []
        skipped_count = 0
        
        for record in response.data:
            # Check required fields
            if (record.get('user_preference') and 
                record.get('current_summary') and 
                record.get('advanced_summary') and 
                record.get('article_id')):
                
                # Check preference is valid
                if record['user_preference'] in ['A', 'B']:
                    filtered_records.append(record)
                else:
                    skipped_count += 1
                    print(f"⚠️ Skipped record - invalid preference: {record.get('user_preference')}")
            else:
                skipped_count += 1
                print(f"⚠️ Skipped record - missing required fields")
        
        print(f"✅ {len(filtered_records)} valid records after filtering")
        print(f"⚠️ {skipped_count} records skipped")
        
        return filtered_records
        
    except Exception as e:
        print(f"❌ Error extracting data: {e}")
        return []

# Extract the data
feedback_data = extract_feedback_data()

# Show sample record structure
if feedback_data:
    print(f"\n📝 Sample record structure:")
    sample_record = feedback_data[0]
    for key, value in sample_record.items():
        print(f"  {key}: {str(value)[:100]}{'...' if len(str(value)) > 100 else ''}")
else:
    print("❌ No valid data extracted")


📊 Extracted 50 total records
⚠️ Skipped record - missing required fields
⚠️ Skipped record - missing required fields
⚠️ Skipped record - missing required fields
⚠️ Skipped record - missing required fields
✅ 46 valid records after filtering
⚠️ 4 records skipped

📝 Sample record structure:
  id: 9e57bad2-75fd-4e1e-b35c-7cf5daeae67f
  session_id: 0748b433-dcfc-42d0-bca2-05c5650331d7
  recipient_id: c2f3f6cf-658e-4602-972b-1105ae245df5
  summary_id: 2bead982-11c5-42a6-9c70-08391a63296c
  article_id: 9d820575-fcd3-4523-b097-9b393815c245
  current_summary: <!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content...
  advanced_summary: <div>
  <p><strong>How to create state-of-the-art genetic model systems: strategies for …</strong></...
  current_model: gpt-4o-mini
  advanced_model: gpt-5
  user_preference: B
  comparison_order: 1
  extraction_method: generated
  created_at: 2025-08-28T00:03:34.433573+00:00


## 6. Transform Data


In [14]:
# Transform data: Clean HTML and apply preference logic
def get_article_content(article_id):
    """Get article content by article_id"""
    try:
        response = supabase.table('articles').select('content').eq('id', article_id).execute()
        if response.data:
            return response.data[0]['content']
        else:
            print(f"⚠️ Article not found for ID: {article_id}")
            return None
    except Exception as e:
        print(f"❌ Error fetching article {article_id}: {e}")
        return None

def transform_feedback_data(feedback_records):
    """
    Transform feedback data into DPO training format
    
    Args:
        feedback_records: List of feedback comparison records
    
    Returns:
        List of transformed DPO training records
    """
    transformed_records = []
    errors = []
    
    for i, record in enumerate(feedback_records):
        try:
            # Get article content
            article_content = get_article_content(record['article_id'])
            if not article_content:
                errors.append(f"Record {i}: Article not found")
                continue
            
            # Clean HTML summaries
            current_clean = clean_html(record['current_summary'])
            advanced_clean = clean_html(record['advanced_summary'])
            
            # Apply preference logic
            if record['user_preference'] == 'A':
                preferred_output = current_clean
                non_preferred_output = advanced_clean
            elif record['user_preference'] == 'B':
                preferred_output = advanced_clean
                non_preferred_output = current_clean
            else:
                errors.append(f"Record {i}: Invalid preference")
                continue
            
            # Create DPO training record
            dpo_record = {
                'uuid': str(uuid.uuid4()),
                'input_message': article_content,
                'preferred_output': preferred_output,
                'non_preferred_output': non_preferred_output
            }
            
            transformed_records.append(dpo_record)
            
        except Exception as e:
            errors.append(f"Record {i}: {str(e)}")
            continue
    
    print(f"✅ Transformed {len(transformed_records)} records successfully")
    if errors:
        print(f"⚠️ {len(errors)} records failed transformation:")
        for error in errors[:5]:  # Show first 5 errors
            print(f"  {error}")
        if len(errors) > 5:
            print(f"  ... and {len(errors) - 5} more errors")
    
    return transformed_records

# Transform the data
if feedback_data:
    print("🔄 Transforming feedback data...")
    dpo_records = transform_feedback_data(feedback_data)
    
    # Show sample transformed record
    if dpo_records:
        print(f"\n📝 Sample transformed record:")
        sample = dpo_records[0]
        for key, value in sample.items():
            if key == 'input_message':
                print(f"  {key}: {value[:100]}... (length: {len(value)})")
            else:
                print(f"  {key}: {value[:100]}{'...' if len(value) > 100 else ''}")
else:
    print("❌ No data to transform")


🔄 Transforming feedback data...
✅ Transformed 46 records successfully

📝 Sample transformed record:
  uuid: 8e99580f-45ee-432d-93c2-55200a6a67f4
  input_message: Jun 28, 2018 ... Engineering CRISPR/Cpf1 with tRNA promotes genome ... molecules enhance CRISPR/Cas9... (length: 160)
  preferred_output: How to create state-of-the-art genetic model systems: strategies for …
This article outlines practic...
  non_preferred_output: Summary of Synthetic Biology Article


Summary of "How to create state-of-the-art genetic model syst...


## 7. Load Data into DPO Training Table


In [15]:
# Load transformed data into dpo_training_data table
def load_dpo_data(dpo_records, batch_size=50):
    """
    Load DPO training records into the database
    
    Args:
        dpo_records: List of transformed DPO records
        batch_size: Number of records to insert per batch
    
    Returns:
        Success count and error details
    """
    if not dpo_records:
        print("❌ No records to load")
        return 0, []
    
    success_count = 0
    errors = []
    
    # Process in batches
    for i in range(0, len(dpo_records), batch_size):
        batch = dpo_records[i:i + batch_size]
        batch_num = (i // batch_size) + 1
        total_batches = (len(dpo_records) + batch_size - 1) // batch_size
        
        print(f"📤 Loading batch {batch_num}/{total_batches} ({len(batch)} records)...")
        
        try:
            # Insert batch
            response = supabase.table('dpo_training_data').insert(batch).execute()
            
            if response.data:
                success_count += len(batch)
                print(f"✅ Batch {batch_num} loaded successfully")
            else:
                errors.append(f"Batch {batch_num}: No data returned from insert")
                
        except Exception as e:
            error_msg = f"Batch {batch_num}: {str(e)}"
            errors.append(error_msg)
            print(f"❌ {error_msg}")
    
    print(f"\n📊 Load Summary:")
    print(f"  ✅ Successfully loaded: {success_count} records")
    print(f"  ❌ Failed: {len(errors)} batches")
    
    if errors:
        print(f"  ⚠️ Errors:")
        for error in errors:
            print(f"    {error}")
    
    return success_count, errors

# Load the data
if 'dpo_records' in locals() and dpo_records:
    print("🚀 Loading DPO training data...")
    success_count, load_errors = load_dpo_data(dpo_records)
    
    if success_count > 0:
        print(f"\n🎉 ETL Pipeline Complete!")
        print(f"✅ {success_count} records ready for DPO fine-tuning")
        
        # Verify the data was loaded
        try:
            verify_response = supabase.table('dpo_training_data').select('count').execute()
            print(f"📊 Total records in dpo_training_data table: {len(verify_response.data)}")
        except:
            print("📊 Verification query failed, but data was loaded")
    else:
        print("❌ No data was loaded successfully")
else:
    print("❌ No transformed data available to load")


🚀 Loading DPO training data...
📤 Loading batch 1/1 (46 records)...
✅ Batch 1 loaded successfully

📊 Load Summary:
  ✅ Successfully loaded: 46 records
  ❌ Failed: 0 batches

🎉 ETL Pipeline Complete!
✅ 46 records ready for DPO fine-tuning
📊 Total records in dpo_training_data table: 1


## 8. Complete ETL Pipeline Function


In [16]:
# Complete ETL pipeline function
def run_complete_etl():
    """
    Run the complete ETL pipeline from start to finish
    
    Returns:
        Dictionary with pipeline results
    """
    print("🚀 Starting Complete ETL Pipeline")
    print("=" * 50)
    
    results = {
        'extracted': 0,
        'transformed': 0,
        'loaded': 0,
        'errors': []
    }
    
    try:
        # Step 1: Extract
        print("\n📊 Step 1: Extracting data...")
        feedback_data = extract_feedback_data()
        results['extracted'] = len(feedback_data)
        
        if not feedback_data:
            results['errors'].append("No data extracted")
            return results
        
        # Step 2: Transform
        print("\n🔄 Step 2: Transforming data...")
        dpo_records = transform_feedback_data(feedback_data)
        results['transformed'] = len(dpo_records)
        
        if not dpo_records:
            results['errors'].append("No data transformed")
            return results
        
        # Step 3: Load
        print("\n📤 Step 3: Loading data...")
        success_count, load_errors = load_dpo_data(dpo_records)
        results['loaded'] = success_count
        results['errors'].extend(load_errors)
        
        print(f"\n🎉 ETL Pipeline Complete!")
        print(f"✅ Extracted: {results['extracted']} records")
        print(f"✅ Transformed: {results['transformed']} records")
        print(f"✅ Loaded: {results['loaded']} records")
        
        if results['errors']:
            print(f"⚠️ Errors: {len(results['errors'])}")
        
        return results
        
    except Exception as e:
        error_msg = f"Pipeline failed: {str(e)}"
        results['errors'].append(error_msg)
        print(f"❌ {error_msg}")
        return results

# Run the complete pipeline
print("Ready to run the complete ETL pipeline!")
print("Execute: run_complete_etl()")
print("\nOr run individual steps:")
print("- extract_feedback_data()")
print("- transform_feedback_data(feedback_data)")
print("- load_dpo_data(dpo_records)")


Ready to run the complete ETL pipeline!
Execute: run_complete_etl()

Or run individual steps:
- extract_feedback_data()
- transform_feedback_data(feedback_data)
- load_dpo_data(dpo_records)
