# AI Data Pipeline Demo

## Learning Objectives
- See AI extract structured data from messy documents
- Watch the pipeline break and self-heal
- Understand production data processing patterns
- Learn error recovery and validation strategies

In [None]:
# Setup and imports
!pip install asksageclient pip_system_certs
from google.colab import drive
drive.mount('/content/drive')

import os
import json
import time
import tiktoken
from pathlib import Path
from typing import Dict, List, Any

# Import our AskSage client
from asksageclient import AskSageClient

# Get API credentials from Google Colab secrets
from google.colab import userdata
# Both secrets are looked up in order: API key first, then account email.
api_key, email = (
    userdata.get(secret_name)
    for secret_name in ("ASKSAGE_API_KEY", "ASKSAGE_EMAIL")
)

# Create the AskSage client and load the tiktoken encoding for "gpt-4".
client = AskSageClient(email=email, api_key=api_key)
tokenizer = tiktoken.encoding_for_model("gpt-4")

# One print call, two lines of output.
print(
    "AskSage client initialized successfully",
    "Ready to showcase AI capabilities...",
    sep="\n",
)

In [None]:
# Sample inputs: three free-text business documents, each in a different
# layout. Adjacent string literals are concatenated by Python, so every entry
# is still a single string.
messy_docs = [
    # Invoice written as prose sentences
    (
        "Invoice #INV-2024-001 from ACME Corp dated March 15, 2024. "
        "Total: $12,450.00. "
        "Items: Office supplies (qty: 50, $8,200), Software licenses (qty: 3, $4,250). "
        "Payment due: April 15, 2024."
    ),
    # Receipt written as space-separated "Label: value" pairs
    (
        "Receipt - TechStore Inc. Date: 03/20/2024 Amount $3,299.99 "
        "Description: Laptop computer model XPS-15 Serial: ABC123XYZ "
        "Customer: John Smith Phone: (555) 123-4567"
    ),
    # Purchase order written as pipe-delimited fields
    (
        "Purchase Order PO-2024-0089 | Vendor: Office Depot | Date: March 22nd 2024 | "
        "Line items: Printer paper 10 reams @ $45 each = $450, "
        "Ink cartridges 5 units @ $89 each = $445 | "
        "Subtotal $895 | Tax $71.60 | Total $966.60"
    ),
]

print("Processing", len(messy_docs), "messy documents...")

In [None]:
# AI extraction with structured output
def extract_document_data(doc_text: str) -> str:
    """Ask the model to turn one free-text business document into JSON text.

    The document is embedded in a prompt that spells out the target schema
    (document type, id, date, vendor, total, line items, customer info) and
    sent through the module-level ``client``.

    Args:
        doc_text: Raw text of an invoice, receipt, or purchase order.

    Returns:
        The model's reply with surrounding whitespace stripped. The reply is
        not parsed or validated here, so it may not be valid JSON. An empty
        string is returned when the response carries no ``"message"`` value.
    """
    # Doubled braces ({{ }}) are literal braces inside the f-string; only
    # {doc_text} is interpolated.
    prompt = f"""
    Extract structured data from this document:
    
    {doc_text}
    
    Return JSON:
    {{
      "document_type": "invoice|receipt|purchase_order",
      "document_id": "string",
      "date": "YYYY-MM-DD",
      "vendor": "string",
      "total_amount": "number",
      "line_items": [
        {{
          "description": "string",
          "quantity": "number",
          "unit_price": "number",
          "total": "number"
        }}
      ],
      "customer_info": {{
        "name": "string or null",
        "contact": "string or null"
      }}
    }}
    """

    # Test GPT-5-mini
    print("=== TESTING GPT-5-mini ===")

    response = client.query(
        message=prompt,
        system_prompt="You are an expert data engineer and pipeline architect. Design robust, scalable data processing systems.",
        temperature=0.2,
        model="gpt-5-mini",
        live=0,
        limit_references=0,
    )

    # A response without a "message" value would make .strip() fail on None.
    # Fall back to an empty string so the caller receives text it can reject
    # during JSON validation instead of an AttributeError here.
    return (response.get("message") or "").strip()

# Run every sample document through the extractor, keeping the raw model
# output (one string per document) for the validation stage.
extracted_data = []
for doc_number, document in enumerate(messy_docs, start=1):
    print(f"\nProcessing document {doc_number}...")
    raw_output = extract_document_data(document)
    # Show only the first 200 characters as a preview.
    preview = raw_output[:200]
    print(preview + "...")
    extracted_data.append(raw_output)

In [None]:
# Pipeline validation and error recovery
def validate_and_fix_json(json_text: str):
    """Parse ``json_text``; on failure, ask the model to repair it once.

    Args:
        json_text: Text that is expected to contain a JSON document.

    Returns:
        A ``(data, status)`` tuple:

        * ``(parsed_value, None)`` when ``json_text`` parses as-is.
        * ``(parsed_value, "fixed")`` when the model's repaired text parses.
        * ``(None, "failed")`` when the repaired text is still not valid JSON
          (including when the model returns no message at all).
    """
    try:
        # Try to parse JSON
        data = json.loads(json_text)
        return data, None
    except json.JSONDecodeError as e:
        print(f"JSON parsing failed: {e}")

        # AI fixes the JSON
        fix_prompt = f"""
        Fix this malformed JSON:
        
        {json_text}
        
        Return only valid JSON with proper structure.
        """

        # Test GPT-5-mini
        print("=== TESTING GPT-5-mini ===")

        response = client.query(
            message=fix_prompt,
            system_prompt="You are an expert data engineer and pipeline architect. Design robust, scalable data processing systems.",
            temperature=0.2,
            model="gpt-5-mini",
            live=0,
            limit_references=0,
        )

        # A missing "message" becomes an empty string, which fails the parse
        # below and is reported as "failed" rather than crashing on None.
        fixed_json = (response.get("message") or "").strip()

        try:
            data = json.loads(fixed_json)
        except json.JSONDecodeError:
            # Only a parse failure means the repair did not work; anything
            # else (e.g. KeyboardInterrupt) must propagate.
            return None, "failed"
        return data, "fixed"

# Validate all extracted data
# Each raw model output is parsed (and repaired if needed); only documents
# that end up as parsed JSON are kept for the analysis stage.
validated_data = []
for i, raw_json in enumerate(extracted_data):
    print(f"\nValidating document {i+1}...")
    data, status = validate_and_fix_json(raw_json)

    # validate_and_fix_json returns None as data when parsing failed. Compare
    # against None explicitly: a truthiness test would also reject validly
    # parsed but empty results such as {} or [].
    if data is not None:
        validated_data.append(data)
        # status is None for input that parsed as-is, "fixed" after a repair.
        print(f"✅ Success ({status or 'valid'})")
    else:
        print("❌ Failed validation")

print(f"\nPipeline processed {len(validated_data)}/{len(messy_docs)} documents successfully")

In [None]:
# Business intelligence generation
# The analysis request runs only when at least one document survived
# validation; otherwise the failure message in the else branch is printed.
if validated_data:
    # Doubled braces ({{ }}) are literal braces inside the f-string; only the
    # json.dumps(...) expression is interpolated.
    analysis_prompt = f"""
    Analyze this processed business data and generate insights:
    
    {json.dumps(validated_data, indent=2)}
    
    Provide business intelligence summary:
    {{
      "total_revenue": "number",
      "document_breakdown": {{}},
      "top_vendors": [],
      "spending_patterns": [],
      "key_insights": [],
      "recommendations": []
    }}
    """

    print("\n=== GENERATING BUSINESS INTELLIGENCE ===")

    # Test GPT-5-mini
    print("=== TESTING GPT-5-mini ===")
    start_time = time.time()  # taken just before the request; not read in this cell

    response = client.query(
        # Send the analysis prompt built above. The name `prompt` is not
        # defined at module level, so using it here raised NameError.
        message=analysis_prompt,
        system_prompt="You are an expert data engineer and pipeline architect. Design robust, scalable data processing systems.",
        temperature=0.2,
        model="gpt-5-mini",
        live=0,
        limit_references=0,
    )

    # Fall back to an empty string when the response has no "message" value,
    # so .strip() is never called on None.
    intelligence = (response.get("message") or "").strip()
    print(intelligence)

    print("\n✅ AI Data Pipeline completed successfully!")
    print("📊 Messy documents → Structured data → Business insights")
else:
    print("❌ Pipeline failed - no valid data extracted")

## Key Takeaways

**Pipeline Resilience**
- Automatic error detection and recovery
- JSON validation and repair
- Graceful handling of malformed data

**Production Patterns**
- Structured output schemas
- Multi-stage processing
- Quality gates and validation

**Business Value**
- Automated document processing
- Real-time business intelligence
- Scalable data extraction