To run this Fenic demo, click **Runtime** > **Run all**.

<div class="align-center">
<a href="https://github.com/typedef-ai/fenic"><img src="https://github.com/typedef-ai/fenic/blob/main/docs/images/typedef-fenic-logo-github-yellow.png" height="50"></a>
<a href="https://discord.gg/GdqF3J7huR"><img src="https://github.com/typedef-ai/fenic/blob/main/docs/images/join-the-discord.png" height="50"></a>
<a href="https://docs.fenic.ai/latest/"><img src="https://github.com/typedef-ai/fenic/blob/main/docs/images/documentation.png" height="50"></a>

Questions? Join the Discord and ask away! For feature requests or to leave a star, visit our [GitHub](https://github.com/typedef-ai/fenic).

</div>

In [None]:
!pip uninstall -y sklearn-compat ibis-framework imbalanced-learn google-genai
!pip install polars==1.30.0
# === GOOGLE GEMINI ===
#!pip install fenic[google]
# === ANTHROPIC CLAUDE ===
#!pip install fenic[anthropic]
# === OPENAI (Default) ===
!pip install fenic

In [None]:
import os 
import getpass

# 🔌 MULTI-PROVIDER SETUP - Choose your preferred LLM provider
# Uncomment ONE of the provider sections below:

# === OPENAI (Default) ===
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

# === GOOGLE GEMINI ===
# os.environ["GOOGLE_API_KEY"] = getpass.getpass("Google API Key:")

# === ANTHROPIC CLAUDE ===
# os.environ["ANTHROPIC_API_KEY"] = getpass.getpass("Anthropic API Key:")

# ⚡ Batch Processing

**Hook:** *"Process 10,000 customer reviews in minutes, not hours"*

Enterprise AI workloads are defined by scale: analyzing thousands of documents, processing entire customer databases, handling massive datasets. Issuing LLM calls one at a time over that volume can take hours and quickly runs into provider rate limits. This demo shows how Fenic's batching system raises throughput with automatic chunking, parallel processing, and progress tracking.

**What you'll see in this 2-minute demo:**
- 📊 **Large-scale dataset** - 500 synthetic customer reviews (raise `dataset_size` to 10,000+ for production-sized runs)
- 🚀 **Intelligent batching** - Automatic chunk optimization
- 📈 **Progress tracking** - Real-time processing metrics
- ⚡ **Parallel execution** - Maximize throughput while respecting rate limits

Perfect for enterprise data processing and large-scale AI workflows.
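
To make "chunking" and "parallel processing" concrete, here is a minimal standard-library sketch of the general idea. It is not Fenic's internal implementation; `fake_llm_call`, `chunked`, and `process_in_batches` are hypothetical names used only for illustration, and the batch size and worker count are arbitrary.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fake_llm_call(batch):
    """Hypothetical stand-in for one API request; returns one result per review."""
    return [f"analyzed:{review}" for review in batch]


def process_in_batches(items, batch_size=20, max_workers=5):
    """Split `items` into batches and process up to `max_workers` batches at once."""
    batches = list(chunked(items, batch_size))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with the inputs.
        results_per_batch = list(pool.map(fake_llm_call, batches))
    return [result for batch in results_per_batch for result in batch]


reviews = [f"review {i}" for i in range(45)]
results = process_in_batches(reviews)
print(len(results), results[0], results[-1])
```

With 45 inputs and a batch size of 20, the sketch issues three "requests" (20, 20, and 5 items) instead of 45, which is the main reason batching helps under a requests-per-minute limit.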

In [None]:
import fenic as fc
from pydantic import BaseModel, Field
from typing import List, Literal
from fenic.core.types.classify import ClassDefinition
import time
import random

# ⚡ Configure session for high-throughput batch processing
session = fc.Session.get_or_create(fc.SessionConfig(
    app_name="batch_processing_demo",
    semantic=fc.SemanticConfig(
        language_models={
            "batch_processor": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=1000, tpm=500_000, batch_size=20, max_concurrent=5),
            # "batch_processor": fc.GoogleDeveloperLanguageModel(model_name="gemini-2.5-flash-lite", rpm=1000, tpm=1_000_000),
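            # "batch_processor": fc.AnthropicLanguageModel(model_name="claude-3-5-sonnet-20241022", rpm=1000, input_tpm=500_000, output_tpm=500_000)  # Anthropic limits are set separately for input and output tokens; adjust to your account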
        }
    )
))

print("✅ High-throughput batch processing session configured")
print("   • Model: GPT-4o-mini optimized for batch processing")
print("   • Rate limit: 1000 RPM / 500K TPM")
print("   • Batch size: 20 items per API call")
print("   • Concurrency: 5 parallel batches")
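
The `rpm` and `tpm` values in the session config cap requests and tokens per minute, which puts a floor on how quickly any job can finish. The sketch below is a rough back-of-the-envelope estimate, assuming one request per review and about 400 tokens per request. Both figures are illustrative assumptions, not measurements of Fenic's behavior, and `min_minutes` is a hypothetical helper.

```python
def min_minutes(num_requests, tokens_per_request, rpm, tpm):
    """Lower bound on wall-clock minutes implied by request and token limits."""
    by_requests = num_requests / rpm                     # minutes if only RPM mattered
    by_tokens = num_requests * tokens_per_request / tpm  # minutes if only TPM mattered
    # The tighter of the two limits determines the floor.
    return max(by_requests, by_tokens)


# Assumed workload: 10,000 reviews, one request each, ~400 tokens per request.
estimate = min_minutes(num_requests=10_000, tokens_per_request=400, rpm=1000, tpm=500_000)
print(f"{estimate:.1f} minutes")  # prints "10.0 minutes"
```

Here the request limit (10 minutes) is tighter than the token limit (8 minutes). Network latency, retries, and longer prompts would push the real figure higher.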

## 📊 Step 1: Large-Scale Dataset Generation

Simulate a realistic enterprise dataset of customer reviews (500 here; increase `dataset_size` for larger runs):

In [None]:
# 📊 Generate large-scale customer review dataset

# Review templates for realistic variety
positive_templates = [
    "Excellent service! The {feature} feature is exactly what we needed. Our team productivity increased by {percent}%. Highly recommend!",
    "Love the new {feature} update. Makes our {department} workflow so much smoother. Great job on the {quality} improvements!",
    "Outstanding customer support! {person} helped us resolve our {issue_type} issue in {time_frame}. Very impressed.",
    "The {feature} functionality is game-changing for our {industry} business. ROI was positive within {time_frame}.",
    "Great user experience with the new {feature}. Interface is {quality} and performance is {quality}."
]

negative_templates = [
    "Disappointed with the recent {feature} changes. Our {department} team is struggling with {issue_type} problems.",
    "The {feature} feature is {quality} and has been causing {issue_type} issues for {time_frame}. Please fix!",
    "Customer support took {time_frame} to respond to our {issue_type} ticket. This is unacceptable for enterprise customers.",
    "Pricing increase is concerning. We're paying {percent}% more for the same {feature} functionality.",
    "The {feature} integration is {quality} and lacks proper {issue_type} documentation."
]

neutral_templates = [
    "The {feature} feature works as expected. Some minor {issue_type} issues but overall functional.",
    "Using the platform for {time_frame} now. {feature} is adequate for our {industry} needs.",
    "Good {feature} functionality. Would like to see improvements in {issue_type} handling.",
    "The {feature} update has both pros and cons. Performance improved but {issue_type} increased.",
    "Standard {feature} experience. Works fine but nothing exceptional compared to competitors."
]

# Data for template substitution
features = ["API", "dashboard", "analytics", "reporting", "integration", "security", "mobile app", "workflow", "automation"]
departments = ["engineering", "marketing", "sales", "support", "operations", "finance", "HR", "legal"]
industries = ["healthcare", "fintech", "retail", "manufacturing", "education", "government", "startup", "enterprise"]
qualities = ["intuitive", "confusing", "reliable", "buggy", "fast", "slow", "comprehensive", "limited"]
issue_types = ["performance", "usability", "security", "integration", "billing", "authentication", "data sync"]
time_frames = ["2 weeks", "1 month", "3 months", "6 months", "1 year", "2 years"]
people = ["Sarah", "Mike", "Jennifer", "David", "Lisa", "Alex", "Maria", "John"]
percents = ["15", "25", "30", "40", "50", "60"]

def generate_review(template_type):
    """Generate a realistic review from templates"""
    if template_type == "positive":
        template = random.choice(positive_templates)
    elif template_type == "negative":
        template = random.choice(negative_templates)
    else:
        template = random.choice(neutral_templates)
    
    # Fill in template variables
    return template.format(
        feature=random.choice(features),
        department=random.choice(departments),
        industry=random.choice(industries),
        quality=random.choice(qualities),
        issue_type=random.choice(issue_types),
        time_frame=random.choice(time_frames),
        person=random.choice(people),
        percent=random.choice(percents)
    )

# Generate large dataset (adjust size for demo purposes)
dataset_size = 500  # Reduced for demo - in production this could be 10,000+
print(f"🏭 Generating {dataset_size} customer reviews for batch processing...")

# Create a weighted sentiment distribution
review_data = []
for i in range(dataset_size):
    # 40% positive, 30% negative, 30% neutral
    sentiment_type = random.choices(
        ["positive", "negative", "neutral"], 
        weights=[40, 30, 30]
    )[0]
    
    review_data.append({
        "review_id": f"REV{i+1:05d}",
        "customer_segment": random.choice(["enterprise", "mid_market", "startup", "individual"]),
        "product_area": random.choice(features),
        "review_text": generate_review(sentiment_type),
        "submission_date": f"2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}"
    })

# Create Fenic DataFrame
large_dataset = session.create_dataframe(review_data)

print(f"✅ Generated {dataset_size} reviews across 4 customer segments")
print("📊 Sample of the dataset:")
large_dataset.limit(5).show()
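
The generator above draws from Python's module-level `random` without a seed, so each run produces a different dataset. If repeatable runs matter, one option is a dedicated seeded generator. The sketch below also checks that the 40/30/30 weights produce roughly the expected mix; the seed value and sample size are arbitrary assumptions.

```python
import random
from collections import Counter

# Dedicated, seeded generator: the same seed always yields the same sequence.
rng = random.Random(42)

labels = rng.choices(
    ["positive", "negative", "neutral"],
    weights=[40, 30, 30],
    k=10_000,
)

counts = Counter(labels)
for label in ("positive", "negative", "neutral"):
    print(f"{label}: {counts[label] / len(labels):.1%}")
```

The observed shares should land close to 40%, 30%, and 30%, with small deviations because each draw is random.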

## 🧠 Step 2: Batch Analysis Schema

Define comprehensive analysis for enterprise-scale processing:

In [None]:
# 🧠 Comprehensive review analysis schema
class ReviewAnalysis(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"] = Field(description="Overall sentiment classification")
    satisfaction_score: float = Field(description="Satisfaction rating 1.0-10.0")
    key_themes: List[str] = Field(description="Top 2-3 themes mentioned")
    urgency_level: Literal["low", "medium", "high", "critical"] = Field(description="Response urgency level")
    product_feedback_type: Literal["feature_request", "bug_report", "praise", "complaint", "suggestion"] = Field(description="Type of feedback")
    business_impact: Literal["churn_risk", "expansion_opportunity", "neutral", "advocacy_potential"] = Field(description="Revenue/retention implications")
    confidence: float = Field(description="Analysis confidence 0.0-1.0")

print("🧠 Batch Analysis Schema:")
print("   • sentiment: Positive/negative/neutral classification")
print("   • satisfaction_score: Quantified happiness (1-10)")
print("   • key_themes: Main topics extracted")
print("   • urgency_level: Response priority assessment")
print("   • product_feedback_type: Categorized feedback type")
print("   • business_impact: Revenue/retention implications")
print("   • confidence: AI certainty measure")
print(f"\n📏 Dataset size: {dataset_size} reviews ready for batch processing")

## ⚡ Step 3: Batch Execution

Run the analysis over the full dataset and measure throughput:

In [None]:
# ⚡ Execute large-scale batch processing
print("🚀 Starting enterprise-scale batch processing...")
print(f"📊 Processing {dataset_size} reviews with intelligent batching")
print("="*60)

batch_start_time = time.time()

try:
    # Execute batch semantic analysis
    processed_reviews = large_dataset.select(
        "review_id",
        "customer_segment",
        "product_area",
        "review_text",
        fc.semantic.extract(
            "review_text",
            ReviewAnalysis,
            model_alias="batch_processor"
        ).alias("analysis")
    ).cache()  # Cache results to avoid reprocessing
    
    # Force execution and measure performance
    print("⏳ Processing in progress...")
    total_processed = processed_reviews.count()
    batch_duration = time.time() - batch_start_time
    
    print("\n✅ BATCH PROCESSING COMPLETE!")
    print(f"   • Total reviews processed: {total_processed:,}")
    print(f"   • Total processing time: {batch_duration:.2f} seconds")
    print(f"   • Average time per review: {(batch_duration/total_processed)*1000:.1f}ms")
    print(f"   • Processing throughput: {total_processed/batch_duration:.1f} reviews/second")
    
    # Show sample results
    print("\n📊 SAMPLE BATCH RESULTS:")
    sample_results = processed_reviews.select(
        "review_id",
        "customer_segment",
        processed_reviews.analysis.sentiment.alias("sentiment"),
        processed_reviews.analysis.satisfaction_score.alias("satisfaction"),
        processed_reviews.analysis.business_impact.alias("impact")
    ).limit(8)
    
    sample_results.show()
    
except Exception as e:
    batch_duration = time.time() - batch_start_time
    print(f"⚠️ Batch processing encountered issues after {batch_duration:.2f}s:")
    print(f"   Error: {str(e)[:150]}...")
    print("   • Implementing graceful degradation...")
    
    # Fallback to simpler analysis with descriptive classifications
    try:
        simplified_analysis = large_dataset.select(
            "review_id",
            "customer_segment",
            fc.semantic.classify(
                "review_text",
                [
                    ClassDefinition(label="positive", description="Positive feedback, satisfaction, or praise for the product or service"),
                    ClassDefinition(label="negative", description="Complaints, dissatisfaction, or criticism about the product or service"),
                    ClassDefinition(label="neutral", description="Neutral comments, factual statements, or mixed feedback without clear sentiment")
                ],
                model_alias="batch_processor"
            ).alias("sentiment")
        ).cache()
        
        fallback_count = simplified_analysis.count()
        print(f"✅ Fallback processing completed: {fallback_count} reviews (simplified analysis)")
        
    except Exception as fallback_error:
        print(f"💥 Complete batch failure: {str(fallback_error)[:100]}")
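
The try/except structure above follows a common graceful-degradation pattern: attempt the rich analysis first, then fall back to a cheaper one if it raises. A minimal, library-independent sketch of that pattern is shown here; `run_with_fallback`, `rich_analysis`, and `simple_classification` are hypothetical names, and the simulated error is made up for illustration.

```python
def run_with_fallback(primary, fallback):
    """Return (result, label): try `primary`, use `fallback` if it raises."""
    try:
        return primary(), "primary"
    except Exception as primary_error:
        print(f"Primary step failed: {primary_error}")
        return fallback(), "fallback"


def rich_analysis():
    """Hypothetical primary step that fails, e.g. on a rate-limit error."""
    raise RuntimeError("rate limit exceeded")


def simple_classification():
    """Hypothetical cheaper fallback step."""
    return ["positive", "neutral", "negative"]


result, source = run_with_fallback(rich_analysis, simple_classification)
print(source, result)
```

Returning a label alongside the result lets later steps check which path actually produced the data rather than inferring it from which variables happen to exist.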

## 📈 Step 4: Enterprise Analytics Dashboard

Analyze batch processing results for business insights:

In [None]:
# 📈 Enterprise-scale analytics from batch results
try:
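    # processed_reviews is assigned before execution is forced, so also check
    # total_processed, which only exists if the main analysis actually finished.
    if 'processed_reviews' in locals() and 'total_processed' in locals():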
        print("📊 ENTERPRISE ANALYTICS DASHBOARD")
        print("="*50)
        
        # Sentiment distribution analysis
        sentiment_breakdown = processed_reviews.group_by(
            processed_reviews.analysis.sentiment
        ).agg(
            fc.count("*").alias("count"),
            fc.avg(processed_reviews.analysis.satisfaction_score).alias("avg_satisfaction")
        ).order_by(fc.desc("count"))
        
        print("\n💭 SENTIMENT ANALYSIS:")
        sentiment_breakdown.show()
        
        # Business impact analysis
        business_impact = processed_reviews.group_by(
            processed_reviews.analysis.business_impact
        ).agg(
            fc.count("*").alias("count")
        ).order_by(fc.desc("count"))
        
        print("\n💼 BUSINESS IMPACT ANALYSIS:")
        business_impact.show()
        
        # Urgency analysis
        urgency_analysis = processed_reviews.group_by(
            processed_reviews.analysis.urgency_level
        ).agg(
            fc.count("*").alias("count")
        ).order_by(fc.desc("count"))
        
        print("\n🚨 URGENCY ANALYSIS:")
        urgency_analysis.show()
        
        # Calculate key metrics
        high_urgency = processed_reviews.filter(
            processed_reviews.analysis.urgency_level.is_in(["high", "critical"])
        ).count()
        
        churn_risk = processed_reviews.filter(
            processed_reviews.analysis.business_impact == "churn_risk"
        ).count()
        
        expansion_opportunities = processed_reviews.filter(
            processed_reviews.analysis.business_impact == "expansion_opportunity"
        ).count()
        
        high_confidence = processed_reviews.filter(
            processed_reviews.analysis.confidence > 0.8
        ).count()
        
        print("\n🎯 KEY BUSINESS METRICS:")
        print(f"   • High/Critical urgency reviews: {high_urgency} ({high_urgency/total_processed*100:.1f}%)")
        print(f"   • Churn risk accounts: {churn_risk} ({churn_risk/total_processed*100:.1f}%)")
        print(f"   • Expansion opportunities: {expansion_opportunities} ({expansion_opportunities/total_processed*100:.1f}%)")
        print(f"   • High-confidence analyses: {high_confidence} ({high_confidence/total_processed*100:.1f}%)")
        
        print("\n⚡ BATCH PROCESSING PERFORMANCE:")
        print(f"   • Dataset size: {total_processed:,} reviews")
        print(f"   • Processing time: {batch_duration:.2f} seconds")
        print(f"   • Throughput: {total_processed/batch_duration:.1f} items/second")
        # Rough estimate only: $0.002 per review is an assumed figure, not a measured cost
        print(f"   • Estimated cost: ~${(total_processed * 0.002):.2f} (assuming ~$0.002 per review)")
        
        # Estimated time savings
        manual_time_hours = total_processed * 2 / 60  # 2 minutes per review manually
        time_saved = manual_time_hours - (batch_duration / 3600)
        
        print("\n💰 BUSINESS VALUE:")
        print(f"   • Manual analysis time: ~{manual_time_hours:.1f} hours")
        print(f"   • Automated analysis time: {batch_duration/3600:.2f} hours")
        print(f"   • Time saved: {time_saved:.1f} hours ({time_saved*100/manual_time_hours:.1f}% less time)")
        print(f"   • Estimated cost savings: ${time_saved * 50:.0f} (at $50/hour labor cost)")
        
    else:
        print("📊 Using fallback analytics (simplified analysis)")
        if 'simplified_analysis' in locals():
            fallback_sentiment = simplified_analysis.group_by("sentiment").agg(
                fc.count("*").alias("count")
            ).order_by(fc.desc("count"))
            
            print("💭 SENTIMENT DISTRIBUTION (Fallback):")
            fallback_sentiment.show()
            print(f"   • Processed {fallback_count} reviews with simplified analysis")
        
except Exception as e:
    print(f"📊 Analytics error (non-critical): {str(e)[:100]}")
    print("   • Batch processing completed successfully")
    print("   • Analytics dashboard needs attention")

print("\n🏆 ENTERPRISE BATCH PROCESSING BENEFITS:")
print("   • Massive scale: Process thousands of items efficiently")
print("   • Cost optimization: Intelligent batching reduces API calls")
print("   • Rate limit management: Automatic throttling and queuing")
print("   • Progress monitoring: Real-time processing insights")
print("   • Fault tolerance: Graceful degradation and error recovery")
print("   • Business intelligence: Immediate actionable insights")
print("   • Time savings: Hours of manual work completed in minutes")

print("\n🎯 PRODUCTION READINESS:")
print("   ✅ Handles enterprise-scale datasets (10K+ items)")
print("   ✅ Optimized for cost and performance")
print("   ✅ Built-in error handling and fallbacks")
print("   ✅ Real-time progress tracking")
print("   ✅ Comprehensive business analytics")
print("   ✅ Automatic batching and parallel processing")
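
The business-value figures above come from simple arithmetic, and it helps to keep "percent time reduction" separate from "speedup factor". The sketch below works through both, assuming 2 minutes of manual effort per review (as in the dashboard code) and a hypothetical 60-second automated run; `time_savings` is an illustrative helper, not part of Fenic.

```python
def time_savings(num_reviews, automated_seconds, manual_minutes_per_review=2.0):
    """Compare manual review time against an automated run."""
    manual_hours = num_reviews * manual_minutes_per_review / 60
    automated_hours = automated_seconds / 3600
    saved_hours = manual_hours - automated_hours
    return {
        "manual_hours": manual_hours,
        "saved_hours": saved_hours,
        # Share of the manual time that is no longer needed.
        "reduction_pct": 100 * saved_hours / manual_hours,
        # How many times faster the automated run is.
        "speedup": manual_hours / automated_hours,
    }


# Assumed inputs: 500 reviews, automated run finishing in 60 seconds.
stats = time_savings(num_reviews=500, automated_seconds=60)
print(f"Manual time:    {stats['manual_hours']:.1f} hours")
print(f"Time reduction: {stats['reduction_pct']:.1f}%")
print(f"Speedup:        {stats['speedup']:.0f}x")
```

Under these assumptions the run takes 99.9% less time, which corresponds to a speedup of about 1000x; the two numbers describe the same result but are not interchangeable.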

In [None]:
session.stop()